diff --git a/caimira/models.py b/caimira/models.py index 9ef7eece..4a80307e 100644 --- a/caimira/models.py +++ b/caimira/models.py @@ -1494,9 +1494,17 @@ def __post_init__(self): c_model.ventilation.air_exchange(c_model.room, time)) for time in c_model.state_change_times()))): raise ValueError("If the diameter is an array, none of the ventilation parameters " "or virus decay constant can be arrays at the same time.") - if not isinstance(self.exposed.number, int): - raise NotImplementedError("Cannot use dynamic occupancy for" - " the exposed population") + + @method_cache + def population_state_change_times(self) -> typing.List[float]: + """ + All time dependent population entities on this model must provide information + about the times at which their state changes. + """ + state_change_times = set(self.concentration_model.infected.presence_interval().transition_times()) + state_change_times.update(self.exposed.presence_interval().transition_times()) + + return sorted(state_change_times) def long_range_fraction_deposited(self) -> _VectorisedFloat: """ @@ -1629,29 +1637,40 @@ def deposited_exposure_between_bounds(self, time1: float, time2: float) -> _Vect deposited_exposure += self.long_range_deposited_exposure_between_bounds(time1, time2) return deposited_exposure + + def _deposited_exposure_list(self): + """ + The number of virus per m^3 deposited on the respiratory tract. + """ + population_change_times = self.population_state_change_times() + deposited_exposure = [] + for start, stop in zip(population_change_times[:-1], population_change_times[1:]): + deposited_exposure.append(self.deposited_exposure_between_bounds(start, stop)) + + return deposited_exposure + def deposited_exposure(self) -> _VectorisedFloat: """ The number of virus per m^3 deposited on the respiratory tract. """ - deposited_exposure: _VectorisedFloat = 0.0 - for start, stop in self.exposed.presence_interval().boundaries(): - deposited_exposure += self.deposited_exposure_between_bounds(start, stop) - - return deposited_exposure * self.repeats - - @method_cache - def infection_probability(self) -> _VectorisedFloat: + return np.sum(self._deposited_exposure_list(), axis=0) * self.repeats + + def _infection_probability_list(self): # Viral dose (vD) - vD = self.deposited_exposure() + vD_list = self._deposited_exposure_list() # oneoverln2 multiplied by ID_50 corresponds to ID_63. infectious_dose = oneoverln2 * self.concentration_model.virus.infectious_dose - # Probability of infection. - return (1 - np.exp(-((vD * (1 - self.exposed.host_immunity))/(infectious_dose * - self.concentration_model.virus.transmissibility_factor)))) * 100 - + # Probability of infection. + return [(1 - np.exp(-((vD * (1 - self.exposed.host_immunity))/(infectious_dose * + self.concentration_model.virus.transmissibility_factor)))) for vD in vD_list] + + @method_cache + def infection_probability(self) -> _VectorisedFloat: + return (1 - np.prod([1 - prob for prob in self._infection_probability_list()], axis = 0)) * 100 + def total_probability_rule(self) -> _VectorisedFloat: if (isinstance(self.concentration_model.infected.number, IntPiecewiseConstant) or isinstance(self.exposed.number, IntPiecewiseConstant)): @@ -1682,15 +1701,12 @@ def total_probability_rule(self) -> _VectorisedFloat: return 0 def expected_new_cases(self) -> _VectorisedFloat: - # Create an equivalent exposure model without short-range interactions, if any. - if (len(self.short_range) == 0): - exposure_model = nested_replace(self, {'short_range': ()}) - prob = exposure_model.infection_probability() - else: - prob = self.infection_probability() + if (isinstance(self.concentration_model.infected.number, IntPiecewiseConstant) or + isinstance(self.exposed.number, IntPiecewiseConstant)): + raise NotImplementedError("Cannot compute expected new cases " + "with dynamic occupancy") - exposed_occupants = self.exposed.number - return prob * exposed_occupants / 100 + return self.infection_probability() * self.exposed.number / 100 def reproduction_number(self) -> _VectorisedFloat: """ @@ -1698,13 +1714,19 @@ def reproduction_number(self) -> _VectorisedFloat: cases directly generated by one infected case in a population. """ + if (isinstance(self.concentration_model.infected.number, IntPiecewiseConstant) or + isinstance(self.exposed.number, IntPiecewiseConstant)): + raise NotImplementedError("Cannot compute reproduction number " + "with dynamic occupancy") + if self.concentration_model.infected.number == 1: return self.expected_new_cases() # Create an equivalent exposure model but with precisely # one infected case. single_exposure_model = nested_replace( - self, {'concentration_model.infected.number': 1} + self, { + 'concentration_model.infected.number': 1} ) return single_exposure_model.expected_new_cases() diff --git a/caimira/tests/models/test_dynamic_population.py b/caimira/tests/models/test_dynamic_population.py index e5bf9805..8fe6b891 100644 --- a/caimira/tests/models/test_dynamic_population.py +++ b/caimira/tests/models/test_dynamic_population.py @@ -27,7 +27,7 @@ def full_exposure_model(): short_range=(), exposed=models.Population( number=10, - presence=models.SpecificInterval(((8, 12), (13, 17), )), + presence=models.SpecificInterval(((8, 12), (13, 17), )), mask=models.Mask.types['No mask'], activity=models.Activity.types['Seated'], host_immunity=0. @@ -51,11 +51,37 @@ def baseline_infected_population_number(): @pytest.fixture -def dynamic_single_exposure_model(full_exposure_model, baseline_infected_population_number): +def baseline_exposed_population_number(): + return models.Population( + number=models.IntPiecewiseConstant( + (8, 12, 13, 17), (10, 0, 10)), + presence=None, + mask=models.Mask.types['No mask'], + activity=models.Activity.types['Seated'], + host_immunity=0., + ) + + +@pytest.fixture +def dynamic_infected_single_exposure_model(full_exposure_model, baseline_infected_population_number): return dc_utils.nested_replace(full_exposure_model, {'concentration_model.infected': baseline_infected_population_number, }) +@pytest.fixture +def dynamic_exposed_single_exposure_model(full_exposure_model, baseline_exposed_population_number): + return dc_utils.nested_replace(full_exposure_model, + {'exposed': baseline_exposed_population_number, }) + + +@pytest.fixture +def dynamic_population_exposure_model(full_exposure_model, baseline_infected_population_number ,baseline_exposed_population_number): + return dc_utils.nested_replace(full_exposure_model, { + 'concentration_model.infected': baseline_infected_population_number, + 'exposed': baseline_exposed_population_number, + }) + + @pytest.mark.parametrize( "time", [4., 8., 10., 12., 13., 14., 16., 20., 24.], @@ -91,16 +117,16 @@ def test_population_number(full_exposure_model: models.ExposureModel, [4., 8., 10., 12., 13., 14., 16., 20., 24.], ) def test_concentration_model_dynamic_population(full_exposure_model: models.ExposureModel, - dynamic_single_exposure_model: models.ExposureModel, + dynamic_infected_single_exposure_model: models.ExposureModel, time: float): - assert full_exposure_model.concentration(time) == dynamic_single_exposure_model.concentration(time) + assert full_exposure_model.concentration(time) == dynamic_infected_single_exposure_model.concentration(time) @pytest.mark.parametrize("number_of_infected",[1, 2, 3, 4, 5]) @pytest.mark.parametrize("time",[9., 12.5, 16.]) def test_linearity_with_number_of_infected(full_exposure_model: models.ExposureModel, - dynamic_single_exposure_model: models.ExposureModel, + dynamic_infected_single_exposure_model: models.ExposureModel, time: float, number_of_infected: int): @@ -112,14 +138,14 @@ def test_linearity_with_number_of_infected(full_exposure_model: models.ExposureM } ) - npt.assert_almost_equal(static_multiple_exposure_model.concentration(time), dynamic_single_exposure_model.concentration(time) * number_of_infected) - npt.assert_almost_equal(static_multiple_exposure_model.deposited_exposure(), dynamic_single_exposure_model.deposited_exposure() * number_of_infected) + npt.assert_almost_equal(static_multiple_exposure_model.concentration(time), dynamic_infected_single_exposure_model.concentration(time) * number_of_infected) + npt.assert_almost_equal(static_multiple_exposure_model.deposited_exposure(), dynamic_infected_single_exposure_model.deposited_exposure() * number_of_infected) @pytest.mark.parametrize( "time", (8., 9., 10., 11., 12., 13., 14.), ) -def test_dynamic_dose(full_exposure_model, time): +def test_dynamic_dose(full_exposure_model: models.ExposureModel, time: float): dynamic_infected: models.ExposureModel = dc_utils.nested_replace( full_exposure_model, @@ -171,10 +197,59 @@ def test_dynamic_dose(full_exposure_model, time): npt.assert_almost_equal(dynamic_exposure, np.sum(static_exposure)) -def test_dynamic_total_probability_rule(dynamic_single_exposure_model: models.ExposureModel): - with pytest.raises( - NotImplementedError, - match=re.escape("Cannot compute total probability " - "(including incidence rate) with dynamic occupancy") - ): - dynamic_single_exposure_model.total_probability_rule() +def test_infection_probability( + full_exposure_model: models.ExposureModel, + dynamic_infected_single_exposure_model: models.ExposureModel, + dynamic_exposed_single_exposure_model: models.ExposureModel, + dynamic_population_exposure_model: models.ExposureModel): + + base_infection_probability = full_exposure_model.infection_probability() + npt.assert_almost_equal(base_infection_probability, dynamic_infected_single_exposure_model.infection_probability()) + npt.assert_almost_equal(base_infection_probability, dynamic_exposed_single_exposure_model.infection_probability()) + npt.assert_almost_equal(base_infection_probability, dynamic_population_exposure_model.infection_probability()) + + +def test_dynamic_total_probability_rule( + dynamic_infected_single_exposure_model: models.ExposureModel, + dynamic_exposed_single_exposure_model: models.ExposureModel, + dynamic_population_exposure_model: models.ExposureModel): + + with pytest.raises(NotImplementedError, match=re.escape("Cannot compute total probability " + "(including incidence rate) with dynamic occupancy")): + dynamic_infected_single_exposure_model.total_probability_rule() + with pytest.raises(NotImplementedError, match=re.escape("Cannot compute total probability " + "(including incidence rate) with dynamic occupancy")): + dynamic_exposed_single_exposure_model.total_probability_rule() + with pytest.raises(NotImplementedError, match=re.escape("Cannot compute total probability " + "(including incidence rate) with dynamic occupancy")): + dynamic_population_exposure_model.total_probability_rule() + +def test_dynamic_expected_new_cases( + dynamic_infected_single_exposure_model: models.ExposureModel, + dynamic_exposed_single_exposure_model: models.ExposureModel, + dynamic_population_exposure_model: models.ExposureModel): + + with pytest.raises(NotImplementedError, match=re.escape("Cannot compute expected new cases " + "with dynamic occupancy")): + dynamic_infected_single_exposure_model.expected_new_cases() + with pytest.raises(NotImplementedError, match=re.escape("Cannot compute expected new cases " + "with dynamic occupancy")): + dynamic_exposed_single_exposure_model.expected_new_cases() + with pytest.raises(NotImplementedError, match=re.escape("Cannot compute expected new cases " + "with dynamic occupancy")): + dynamic_population_exposure_model.expected_new_cases() + +def test_dynamic_reproduction_number( + dynamic_infected_single_exposure_model: models.ExposureModel, + dynamic_exposed_single_exposure_model: models.ExposureModel, + dynamic_population_exposure_model: models.ExposureModel): + + with pytest.raises(NotImplementedError, match=re.escape("Cannot compute reproduction number " + "with dynamic occupancy")): + dynamic_infected_single_exposure_model.reproduction_number() + with pytest.raises(NotImplementedError, match=re.escape("Cannot compute reproduction number " + "with dynamic occupancy")): + dynamic_exposed_single_exposure_model.reproduction_number() + with pytest.raises(NotImplementedError, match=re.escape("Cannot compute reproduction number " + "with dynamic occupancy")): + dynamic_population_exposure_model.reproduction_number()