From ee8ce9f4048bc5db08101e0d79d1ed19ea78a81c Mon Sep 17 00:00:00 2001 From: Luis Aleixo Date: Tue, 4 Apr 2023 11:35:40 +0200 Subject: [PATCH 1/8] added expert apps validation for presence times --- caimira/apps/calculator/report_generator.py | 22 ++- caimira/apps/expert.py | 60 ++++--- caimira/apps/expert_co2.py | 20 ++- caimira/models.py | 157 +++++++++++++----- caimira/monte_carlo/models.py | 8 + .../tests/models/test_concentration_model.py | 3 +- 6 files changed, 190 insertions(+), 80 deletions(-) diff --git a/caimira/apps/calculator/report_generator.py b/caimira/apps/calculator/report_generator.py index bd6d273f..6b122948 100644 --- a/caimira/apps/calculator/report_generator.py +++ b/caimira/apps/calculator/report_generator.py @@ -20,10 +20,18 @@ def model_start_end(model: models.ExposureModel): - t_start = min(model.exposed.presence.boundaries()[0][0], - model.concentration_model.infected.presence.boundaries()[0][0]) - t_end = max(model.exposed.presence.boundaries()[-1][1], - model.concentration_model.infected.presence.boundaries()[-1][1]) + if (isinstance(model.exposed.number, int) and isinstance(model.exposed.presence, models.Interval) + and isinstance(model.concentration_model.infected.number, int) and isinstance(model.concentration_model.infected.presence, models.Interval)): + t_start = min(model.exposed.presence.boundaries()[0][0], + model.concentration_model.infected.presence.boundaries()[0][0]) + t_end = max(model.exposed.presence.boundaries()[-1][1], + model.concentration_model.infected.presence.boundaries()[-1][1]) + elif (isinstance(model.exposed.number, models.IntPiecewiseContant) + and isinstance(model.concentration_model.infected.number, models.IntPiecewiseContant)): + t_start = min(model.exposed.number.interval().boundaries()[0][0], + model.concentration_model.infected.number.interval().boundaries()[0][0]) + t_end = max(model.exposed.number.interval().boundaries()[-1][1], + model.concentration_model.infected.number.interval().boundaries()[-1][1]) return t_start, t_end @@ -141,11 +149,15 @@ def calculate_report_data(form: FormData, model: models.ExposureModel) -> typing exposed_occupants = model.exposed.number expected_new_cases = np.array(model.expected_new_cases()).mean() uncertainties_plot_src = img2base64(_figure2bytes(uncertainties_plot(model))) if form.conditional_probability_plot else None + if isinstance(model.exposed.number, int) and isinstance(model.exposed.presence, models.Interval): + exposed_presence_intervals = [list(interval) for interval in model.exposed.presence.boundaries()] + elif isinstance(model.exposed.number, models.IntPiecewiseContant): + exposed_presence_intervals = [list(interval) for interval in model.exposed.number.interval().boundaries()] return { "model_repr": repr(model), "times": list(times), - "exposed_presence_intervals": [list(interval) for interval in model.exposed.presence.boundaries()], + "exposed_presence_intervals": exposed_presence_intervals, "short_range_intervals": short_range_intervals, "short_range_expirations": short_range_expirations, "concentrations": concentrations, diff --git a/caimira/apps/expert.py b/caimira/apps/expert.py index 77e7e82b..34a6a062 100644 --- a/caimira/apps/expert.py +++ b/caimira/apps/expert.py @@ -148,8 +148,12 @@ def update(self, model: models.ExposureModel): def update_plot(self, model: models.ExposureModel): resolution = 600 - ts = np.linspace(sorted(model.concentration_model.infected.presence.transition_times())[0], - sorted(model.concentration_model.infected.presence.transition_times())[-1], resolution) + if isinstance(model.concentration_model.infected.number, int) and isinstance(model.concentration_model.infected.presence, models.Interval): + infected_presence = model.concentration_model.infected.presence + elif isinstance(model.concentration_model.infected.number, models.IntPiecewiseContant): + infected_presence = model.concentration_model.infected.number.interval() + ts = np.linspace(sorted(infected_presence.transition_times())[0], + sorted(infected_presence.transition_times())[-1], resolution) concentration = [model.concentration(t) for t in ts] cumulative_doses = np.cumsum([ @@ -164,16 +168,21 @@ def update_plot(self, model: models.ExposureModel): self.ax.ignore_existing_data_limits = False self.concentration_line.set_data(ts, concentration) + if isinstance(model.exposed.number, int) and isinstance(model.exposed.presence, models.Interval): + exposed_presence = model.exposed.presence + elif isinstance(model.exposed.number, models.IntPiecewiseContant): + exposed_presence = model.exposed.number.interval() + if self.concentration_area is None: self.concentration_area = self.ax.fill_between(x = ts, y1=0, y2=concentration, color="#96cbff", - where = ((model.exposed.presence.boundaries()[0][0] < ts) & (ts < model.exposed.presence.boundaries()[0][1]) | - (model.exposed.presence.boundaries()[1][0] < ts) & (ts < model.exposed.presence.boundaries()[1][1]))) + where = ((exposed_presence.boundaries()[0][0] < ts) & (ts < exposed_presence.boundaries()[0][1]) | + (exposed_presence.boundaries()[1][0] < ts) & (ts < exposed_presence.boundaries()[1][1]))) else: self.concentration_area.remove() self.concentration_area = self.ax.fill_between(x = ts, y1=0, y2=concentration, color="#96cbff", - where = ((model.exposed.presence.boundaries()[0][0] < ts) & (ts < model.exposed.presence.boundaries()[0][1]) | - (model.exposed.presence.boundaries()[1][0] < ts) & (ts < model.exposed.presence.boundaries()[1][1]))) + where = ((exposed_presence.boundaries()[0][0] < ts) & (ts < exposed_presence.boundaries()[0][1]) | + (exposed_presence.boundaries()[1][0] < ts) & (ts < exposed_presence.boundaries()[1][1]))) if self.cumulative_line is None: [self.cumulative_line] = self.ax2.plot(ts[:-1], cumulative_doses, color='#0000c8', linestyle='dotted') @@ -187,8 +196,8 @@ def update_plot(self, model: models.ExposureModel): cumulative_top = max(cumulative_doses) self.ax2.set_ylim(bottom=0., top=cumulative_top) - self.ax.set_xlim(left = min(min(model.concentration_model.infected.presence.boundaries()[0]), min(model.exposed.presence.boundaries()[0])), - right = max(max(model.concentration_model.infected.presence.boundaries()[1]), max(model.exposed.presence.boundaries()[1]))) + self.ax.set_xlim(left = min(min(infected_presence.boundaries()[0]), min(exposed_presence.boundaries()[0])), + right = max(max(infected_presence.boundaries()[1]), max(exposed_presence.boundaries()[1]))) figure_legends = [mlines.Line2D([], [], color='#3530fe', markersize=15, label='Mean concentration'), mlines.Line2D([], [], color='#0000c8', markersize=15, ls="dotted", label='Cumulative dose'), @@ -649,21 +658,10 @@ def on_exposed_number_change(change): number.observe(on_exposed_number_change, names=['value']) return widgets.HBox([widgets.Label('Number of exposed people in the room '), number], layout=widgets.Layout(justify_content='space-between')) - - def generate_presence_widget(self, min, max, node): - options = list(pd.date_range(min, max, freq="1min").strftime('%H:%M')) - start_hour = float(node[0]) - end_hour = float(node[1]) - start_hour_datetime = datetime.time(hour = int(start_hour), minute=int(start_hour%1*60)) - end_hour_datetime = datetime.time(hour = int(end_hour), minute=int(end_hour%1*60)) - return widgets.SelectionRangeSlider( - options=options, - index=(options.index(str(start_hour_datetime)[:-3]), options.index(str(end_hour_datetime)[:-3])), - ) def _build_exposed_presence(self, node): - presence_start = self.generate_presence_widget(min='00:00', max='13:00', node=node.present_times[0]) - presence_finish = self.generate_presence_widget(min='13:00', max='23:59', node=node.present_times[1]) + presence_start = generate_presence_widget(min='00:00', max='13:00', node=node.present_times[0]) + presence_finish = generate_presence_widget(min='13:00', max='23:59', node=node.present_times[1]) def on_presence_start_change(change): new_value = tuple([int(time[:-3])+float(time[3:])/60 for time in change['new']]) @@ -719,8 +717,8 @@ def on_viral_load_change(change): return widgets.HBox([widgets.Label("Viral load (copies/ml)"), viral_load_in_sputum], layout=widgets.Layout(justify_content='space-between')) def _build_infected_presence(self, node, ventilation_node): - presence_start = self.generate_presence_widget(min='00:00', max='13:00', node=node.present_times[0]) - presence_finish = self.generate_presence_widget(min='13:00', max='23:59', node=node.present_times[1]) + presence_start = generate_presence_widget(min='00:00', max='13:00', node=node.present_times[0]) + presence_finish = generate_presence_widget(min='13:00', max='23:59', node=node.present_times[1]) def on_presence_start_change(change): new_value = tuple([int(time[:-3])+float(time[3:])/60 for time in change['new']]) @@ -1119,6 +1117,18 @@ def models_start_end(models: typing.Sequence[models.ExposureModel]) -> typing.Tu Returns the earliest start and latest end time of a collection of ConcentrationModel objects """ - infected_start = min(model.concentration_model.infected.presence.boundaries()[0][0] for model in models) - infected_finish = min(model.concentration_model.infected.presence.boundaries()[-1][1] for model in models) + infected_start = min(model.concentration_model.infected.presence.boundaries()[0][0] for model in models) # type: ignore + infected_finish = min(model.concentration_model.infected.presence.boundaries()[-1][1] for model in models) # type: ignore return infected_start, infected_finish + + +def generate_presence_widget(min, max, node): + options = list(pd.date_range(min, max, freq="1min").strftime('%H:%M')) + start_hour = float(node[0]) + end_hour = float(node[1]) + start_hour_datetime = datetime.time(hour = int(start_hour), minute=int(start_hour%1*60)) + end_hour_datetime = datetime.time(hour = int(end_hour), minute=int(end_hour%1*60)) + return widgets.SelectionRangeSlider( + options=options, + index=(options.index(str(start_hour_datetime)[:-3]), options.index(str(end_hour_datetime)[:-3])), + ) diff --git a/caimira/apps/expert_co2.py b/caimira/apps/expert_co2.py index c78a5b10..505870d9 100644 --- a/caimira/apps/expert_co2.py +++ b/caimira/apps/expert_co2.py @@ -8,7 +8,7 @@ import matplotlib.figure import matplotlib.lines as mlines import matplotlib.patches as patches -from .expert import collapsible, ipympl_canvas, WidgetGroup, CAIMIRAStateBuilder +from .expert import generate_presence_widget, collapsible, ipympl_canvas, WidgetGroup, CAIMIRAStateBuilder baseline_model = models.CO2ConcentrationModel( @@ -368,20 +368,26 @@ def on_population_number_change(change): return widgets.HBox([widgets.Label('Number of people in the room '), number], layout=widgets.Layout(justify_content='space-between')) def _build_population_presence(self, node, ventilation_node): - presence_start = widgets.FloatRangeSlider(value = node.present_times[0], min = 8., max=13., step=0.1) - presence_finish = widgets.FloatRangeSlider(value = node.present_times[1], min = 13., max=18., step=0.1) + presence_start = generate_presence_widget(min='00:00', max='13:00', node=node.present_times[0]) + presence_finish = generate_presence_widget(min='13:00', max='23:59', node=node.present_times[1]) def on_presence_start_change(change): - ventilation_node.active.start = change['new'][0] - ventilation_node.active.duration / 60 - node.present_times = (change['new'], presence_finish.value) + new_value = tuple([int(time[:-3])+float(time[3:])/60 for time in change['new']]) + ventilation_node.active.start = new_value[0] - ventilation_node.active.duration / 60 + node.present_times = (new_value, node.present_times[1]) def on_presence_finish_change(change): - node.present_times = (presence_start.value, change['new']) + new_value = tuple([int(time[:-3])+float(time[3:])/60 for time in change['new']]) + node.present_times = (node.present_times[0], new_value) presence_start.observe(on_presence_start_change, names=['value']) presence_finish.observe(on_presence_finish_change, names=['value']) - return widgets.HBox([widgets.Label('Population presence'), presence_start, presence_finish], layout = widgets.Layout(justify_content='space-between')) + return widgets.VBox([ + widgets.Label('Exposed presence:'), + widgets.HBox([widgets.Label('Morning:', layout=widgets.Layout(width='15%')), presence_start]), + widgets.HBox([widgets.Label('Afternoon:', layout=widgets.Layout(width='15%')), presence_finish]) + ]) def present(self): return self.widget diff --git a/caimira/models.py b/caimira/models.py index 5483596b..2698ef02 100644 --- a/caimira/models.py +++ b/caimira/models.py @@ -186,6 +186,23 @@ def refine(self, refine_factor=10) -> "PiecewiseConstant": ) +@dataclass(frozen=True) +class IntPiecewiseContant(PiecewiseConstant): + + #: values of the function between transitions + values: typing.Tuple[int, ...] + + def value(self, time) -> _VectorisedFloat: + if time <= self.transition_times[0] or time > self.transition_times[-1]: + return 0 + + for t1, t2, value in zip(self.transition_times[:-1], + self.transition_times[1:], self.values): + if t1 < time <= t2: + break + return value + + @dataclass(frozen=True) class Room: #: The total volume of the room @@ -779,7 +796,7 @@ class Population: """ #: How many in the population. - number: int + number: typing.Union[int, IntPiecewiseContant] #: The times in which the people are in the room. presence: Interval @@ -795,9 +812,46 @@ class Population: # which might render inactive some RNA copies (virions). host_immunity: float - def person_present(self, time): - return self.presence.triggered(time) - + def __post_init__(self): + if isinstance(self.number, int): + if not isinstance(self.presence, Interval): + raise TypeError(f'The presence argument must be an "Interval". Got {type(self.presence)}') + else: + if self.presence is not None: + raise TypeError(f'The presence argument must be None for a IntPiecewiseConstant number.') + + def _last_presence_state(self, time: float): + """ + Find the most recent/previous presence state change. + """ + times = set(self.number.transition_times) # type: ignore + t_index: int = np.searchsorted(sorted(times), time) # type: ignore + t_index = max([t_index - 1, 0]) + return sorted(times)[t_index] + + def person_present(self, time: float): + # Allow back-compatibility + if isinstance(self.number, int) and isinstance(self.presence, Interval): + return self.presence.triggered(time) + elif isinstance(self.number, IntPiecewiseContant): + return self.number.value(time) != 0 + + def people_present(self, time: typing.Optional[float] = None): + # Allow back-compatibility + if isinstance(self.number, int) and isinstance(self.presence, Interval): + return self.number + + elif isinstance(self.number, IntPiecewiseContant): + # For a given scenario, time falls within a break, + # we should get the previous state occupancy profile. + if time and not self.person_present(time): + if time <= self.number.transition_times[0]: + number = self.number.values[0] + else: + number = int(self.number.value(self._last_presence_state(time))) + else: + number = int(self.number.value(time)) + return number @dataclass(frozen=True) class _PopulationWithVirus(Population): @@ -818,7 +872,7 @@ def aerosols(self): """ raise NotImplementedError("Subclass must implement") - def emission_rate_per_aerosol_when_present(self) -> _VectorisedFloat: + def emission_rate_per_aerosol_when_present(self, time: typing.Optional[float] = None) -> _VectorisedFloat: """ The emission rate of virions in the expired air per mL of respiratory fluid, if the infected population is present, in (virion.cm^3)/(mL.h). @@ -828,12 +882,12 @@ def emission_rate_per_aerosol_when_present(self) -> _VectorisedFloat: raise NotImplementedError("Subclass must implement") @method_cache - def emission_rate_when_present(self) -> _VectorisedFloat: + def emission_rate_when_present(self, time: typing.Optional[float] = None) -> _VectorisedFloat: """ The emission rate if the infected population is present (in virions / h). """ - return (self.emission_rate_per_aerosol_when_present() * + return (self.emission_rate_per_aerosol_when_present(time) * self.aerosols()) def emission_rate(self, time) -> _VectorisedFloat: @@ -851,7 +905,7 @@ def emission_rate(self, time) -> _VectorisedFloat: # with a declaration of state change time, as is the case for things # like Ventilation. - return self.emission_rate_when_present() + return self.emission_rate_when_present(time) @property def particle(self) -> Particle: @@ -875,14 +929,14 @@ def aerosols(self): return 1. @method_cache - def emission_rate_per_aerosol_when_present(self) -> _VectorisedFloat: + def emission_rate_per_aerosol_when_present(self, time: typing.Optional[float] = None) -> _VectorisedFloat: """ The emission rate of virions in the expired air per mL of respiratory fluid, if the infected population is present, in (virion.cm^3)/(mL.h). This method includes only the diameter-independent variables within the emission rate. It should not be a function of time. """ - return self.known_individual_emission_rate * self.number + return self.known_individual_emission_rate * self.people_present(time) @dataclass(frozen=True) @@ -904,7 +958,7 @@ def aerosols(self): return self.expiration.aerosols(self.mask) @method_cache - def emission_rate_per_aerosol_when_present(self) -> _VectorisedFloat: + def emission_rate_per_aerosol_when_present(self, time: typing.Optional[float] = None) -> _VectorisedFloat: """ The emission rate of virions in the expired air per mL of respiratory fluid, if the infected population is present, in (virion.cm^3)/(mL.h). @@ -917,8 +971,11 @@ def emission_rate_per_aerosol_when_present(self) -> _VectorisedFloat: ER = (self.virus.viral_load_in_sputum * self.activity.exhalation_rate * 10 ** 6) - - return ER * self.number + + # if self.people_present(time) == 0: + # return ER + # else: + return ER * self.people_present(time) @property def particle(self) -> Particle: @@ -986,7 +1043,7 @@ def min_background_concentration(self) -> _VectorisedFloat: """ return 0. - def normalization_factor(self) -> _VectorisedFloat: + def normalization_factor(self, time: typing.Optional[float] = None) -> _VectorisedFloat: """ Normalization factor (in the same unit as the concentration). This factor is applied to the normalized concentration only @@ -1006,12 +1063,12 @@ def _normed_concentration_limit(self, time: float) -> _VectorisedFloat: dependence has been solved for. """ if not self.population.person_present(time): - return self.min_background_concentration()/self.normalization_factor() + return self.min_background_concentration()/self.normalization_factor(time) V = self.room.volume RR = self.removal_rate(time) return (1. / (RR * V) + self.min_background_concentration()/ - self.normalization_factor()) + self.normalization_factor(time)) @method_cache def state_change_times(self) -> typing.List[float]: @@ -1020,7 +1077,10 @@ def state_change_times(self) -> typing.List[float]: the times at which their state changes. """ state_change_times = {0.} - state_change_times.update(self.population.presence.transition_times()) + if isinstance(self.population.number, int) and isinstance(self.population.presence, Interval): + state_change_times.update(self.population.presence.transition_times()) + elif isinstance(self.population.number, IntPiecewiseContant): + state_change_times.update(self.population.number.interval().transition_times()) state_change_times.update(self.ventilation.transition_times(self.room)) return sorted(state_change_times) @@ -1029,7 +1089,11 @@ def _first_presence_time(self) -> float: """ First presence time. Before that, the concentration is zero. """ - return self.population.presence.boundaries()[0][0] + if isinstance(self.population.number, int) and isinstance(self.population.presence, Interval): + first_presence = self.population.presence.boundaries()[0][0] + elif isinstance(self.population.number, IntPiecewiseContant): + first_presence = self.population.number.interval().boundaries()[0][0] + return first_presence def last_state_change(self, time: float) -> float: """ @@ -1082,7 +1146,11 @@ def _normed_concentration(self, time: float) -> _VectorisedFloat: # The model always starts at t=0, but we avoid running concentration calculations # before the first presence as an optimisation. if time <= self._first_presence_time(): - return self.min_background_concentration()/self.normalization_factor() + try: + return self.min_background_concentration()/self.normalization_factor(time) + except ZeroDivisionError: + return 0. + next_state_change_time = self._next_state_change(time) RR = self.removal_rate(next_state_change_time) # If RR is 0, conc_limit does not play a role but its computation @@ -1108,7 +1176,7 @@ def concentration(self, time: float) -> _VectorisedFloat: to this method. """ return (self._normed_concentration_cached(time) * - self.normalization_factor()) + self.normalization_factor(time)) @method_cache def normed_integrated_concentration(self, start: float, stop: float) -> _VectorisedFloat: @@ -1145,7 +1213,7 @@ def integrated_concentration(self, start: float, stop: float) -> _VectorisedFloa Get the integrated concentration of viruses in the air between the times start and stop. """ return (self.normed_integrated_concentration(start, stop) * - self.normalization_factor()) + self.normalization_factor(start)) @dataclass(frozen=True) @@ -1170,9 +1238,9 @@ def population(self) -> InfectedPopulation: def virus(self) -> Virus: return self.infected.virus - def normalization_factor(self) -> _VectorisedFloat: + def normalization_factor(self, time: typing.Optional[float] = None) -> _VectorisedFloat: # we normalize by the emission rate - return self.infected.emission_rate_when_present() + return self.infected.emission_rate_when_present(time) def removal_rate(self, time: float) -> _VectorisedFloat: # Equilibrium velocity of particle motion toward the floor @@ -1220,10 +1288,10 @@ def min_background_concentration(self) -> _VectorisedFloat: """ return self.CO2_atmosphere_concentration - def normalization_factor(self) -> _VectorisedFloat: + def normalization_factor(self, time: typing.Optional[float] = None) -> _VectorisedFloat: # normalization by the CO2 exhaled. # CO2 concentration given in ppm, hence the 1e6 factor. - return (1e6*self.population.number + return (1e6*self.population.people_present(time) *self.population.activity.exhalation_rate *self.CO2_fraction_exhaled) @@ -1469,19 +1537,21 @@ def _long_range_normed_exposure_between_bounds(self, time1: float, time2: float) by the emission rate of the infected population """ exposure = 0. - for start, stop in self.exposed.presence.boundaries(): - if stop < time1: - continue - elif start > time2: - break - elif start <= time1 and time2<= stop: - exposure += self.concentration_model.normed_integrated_concentration(time1, time2) - elif start <= time1 and stop < time2: - exposure += self.concentration_model.normed_integrated_concentration(time1, stop) - elif time1 < start and time2 <= stop: - exposure += self.concentration_model.normed_integrated_concentration(start, time2) - elif time1 <= start and stop < time2: - exposure += self.concentration_model.normed_integrated_concentration(start, stop) + if isinstance(self.exposed.number, int) and isinstance(self.exposed.presence, Interval): + # TODO: Implement the case for the IntPiecewiseConstant + for start, stop in self.exposed.presence.boundaries(): + if stop < time1: + continue + elif start > time2: + break + elif start <= time1 and time2<= stop: + exposure += self.concentration_model.normed_integrated_concentration(time1, time2) + elif start <= time1 and stop < time2: + exposure += self.concentration_model.normed_integrated_concentration(time1, stop) + elif time1 < start and time2 <= stop: + exposure += self.concentration_model.normed_integrated_concentration(start, time2) + elif time1 <= start and stop < time2: + exposure += self.concentration_model.normed_integrated_concentration(start, stop) return exposure def concentration(self, time: float) -> _VectorisedFloat: @@ -1589,9 +1659,10 @@ def deposited_exposure(self) -> _VectorisedFloat: The number of virus per m^3 deposited on the respiratory tract. """ deposited_exposure: _VectorisedFloat = 0.0 - - for start, stop in self.exposed.presence.boundaries(): - deposited_exposure += self.deposited_exposure_between_bounds(start, stop) + if isinstance(self.exposed.number, int) and isinstance(self.exposed.presence, Interval): + # TODO: Implement the case for the IntPiecewiseConstant + for start, stop in self.exposed.presence.boundaries(): + deposited_exposure += self.deposited_exposure_between_bounds(start, stop) return deposited_exposure * self.repeats @@ -1610,7 +1681,9 @@ def total_probability_rule(self) -> _VectorisedFloat: if (self.geographical_data.geographic_population != 0 and self.geographical_data.geographic_cases != 0): sum_probability = 0.0 # Create an equivalent exposure model but changing the number of infected cases. - total_people = self.concentration_model.infected.number + self.exposed.number + if isinstance(self.concentration_model.infected.number, int) and isinstance(self.exposed.number, int): + # TODO: Implement the case for the IntPiecewiseConstant + total_people = self.concentration_model.infected.number + self.exposed.number max_num_infected = (total_people if total_people < 10 else 10) # The influence of a higher number of simultainious infected people (> 4 - 5) yields an almost negligible contirbution to the total probability. # To be on the safe side, a hard coded limit with a safety margin of 2x was set. diff --git a/caimira/monte_carlo/models.py b/caimira/monte_carlo/models.py index 47e8ebab..ccaafc59 100644 --- a/caimira/monte_carlo/models.py +++ b/caimira/monte_carlo/models.py @@ -74,6 +74,14 @@ def _build_mc_model(model: dataclass_instance) -> typing.Type[MCModelBase[_Model elif new_field.type == typing.Tuple[caimira.models.SpecificInterval, ...]: SI = getattr(sys.modules[__name__], "SpecificInterval") field_type = typing.Tuple[typing.Union[caimira.models.SpecificInterval, SI], ...] + + elif new_field.type == typing.Union[int, caimira.models.IntPiecewiseContant]: + IPC = getattr(sys.modules[__name__], "IntPiecewiseContant") + field_type = typing.Union[caimira.models.IntPiecewiseContant, IPC] + elif new_field.type == typing.Union[caimira.models.Interval, None]: + I = getattr(sys.modules[__name__], "Interval") + field_type = typing.Union[caimira.models.Interval, I] + else: # Check that we don't need to do anything with this type. for item in new_field.type.__args__: diff --git a/caimira/tests/models/test_concentration_model.py b/caimira/tests/models/test_concentration_model.py index 568e6757..5d152b52 100644 --- a/caimira/tests/models/test_concentration_model.py +++ b/caimira/tests/models/test_concentration_model.py @@ -4,6 +4,7 @@ import numpy.testing as npt import pytest from dataclasses import dataclass +import typing from caimira import models @@ -32,7 +33,7 @@ def removal_rate(self, time: float) -> float: def min_background_concentration(self) -> float: return self.known_min_background_concentration - def normalization_factor(self) -> float: + def normalization_factor(self, time: typing.Optional[float] = None) -> float: return self.known_normalization_factor From aea8ee2c39c8ed27536cbe5d29d53b97e09caff3 Mon Sep 17 00:00:00 2001 From: Luis Aleixo Date: Wed, 15 Mar 2023 15:42:57 +0100 Subject: [PATCH 2/8] test for dynamic occupants --- caimira/models.py | 4 - .../tests/models/test_dynamic_population.py | 98 +++++++++++++++++++ 2 files changed, 98 insertions(+), 4 deletions(-) create mode 100644 caimira/tests/models/test_dynamic_population.py diff --git a/caimira/models.py b/caimira/models.py index 2698ef02..71fbdec3 100644 --- a/caimira/models.py +++ b/caimira/models.py @@ -971,10 +971,6 @@ def emission_rate_per_aerosol_when_present(self, time: typing.Optional[float] = ER = (self.virus.viral_load_in_sputum * self.activity.exhalation_rate * 10 ** 6) - - # if self.people_present(time) == 0: - # return ER - # else: return ER * self.people_present(time) @property diff --git a/caimira/tests/models/test_dynamic_population.py b/caimira/tests/models/test_dynamic_population.py new file mode 100644 index 00000000..80808524 --- /dev/null +++ b/caimira/tests/models/test_dynamic_population.py @@ -0,0 +1,98 @@ +import numpy as np +import numpy.testing as npt +import pytest + +from caimira import models +import caimira.dataclass_utils as dc_utils + +@pytest.fixture +def full_exposure_model(): + return models.ExposureModel( + concentration_model=models.ConcentrationModel( + room=models.Room(volume=100), + ventilation=models.AirChange( + active=models.PeriodicInterval(120, 120), air_exch=0.25), + infected=models.InfectedPopulation( + number=1, + presence=models.SpecificInterval(((8, 12), (13, 17), )), + mask=models.Mask.types['No mask'], + activity=models.Activity.types['Seated'], + expiration=models.Expiration.types['Breathing'], + virus=models.Virus.types['SARS_CoV_2'], + host_immunity=0. + ), + ), + short_range=(), + exposed=models.Population( + number=10, + presence=models.SpecificInterval(((8, 12), (13, 17), )), + mask=models.Mask.types['No mask'], + activity=models.Activity.types['Seated'], + host_immunity=0. + ), + geographical_data=(), + ) + + +@pytest.fixture +def baseline_infected_population_number(): + return models.InfectedPopulation( + number=models.IntPiecewiseContant( + (8, 12, 13, 17), (1, 0, 1)), + presence=None, + mask=models.Mask.types['No mask'], + activity=models.Activity.types['Seated'], + virus=models.Virus.types['SARS_CoV_2'], + expiration=models.Expiration.types['Breathing'], + host_immunity=0., + ) + + +@pytest.fixture +def baseline_exposed_population_number(): + return models.Population( + number=models.IntPiecewiseContant( + (8, 12, 13, 17), (1, 0, 1)), + presence=None, + mask=models.Mask.types['No mask'], + activity=models.Activity.types['Seated'], + host_immunity=0., + ) + +@pytest.mark.parametrize( + "time", + [4., 8., 10., 12., 13., 14., 16., 20., 24.], +) +def test_population_number(full_exposure_model: models.ExposureModel, + baseline_infected_population_number: models.InfectedPopulation, time: float): + + int_population_number: models.InfectedPopulation = full_exposure_model.concentration_model.infected + assert isinstance(int_population_number.number, int) + assert isinstance(int_population_number.presence, models.Interval) + + piecewise_population_number: models.InfectedPopulation = baseline_infected_population_number + assert isinstance(piecewise_population_number.number, + models.IntPiecewiseContant) + assert piecewise_population_number.presence is None + + assert int_population_number.person_present(time) == piecewise_population_number.person_present(time) + assert int_population_number.people_present(time) == piecewise_population_number.people_present(time) + + +@pytest.mark.parametrize( + "time", + [4., 8., 10., 12., 13., 14., 16., 20., 24.], +) +def test_concentration_model_dynamic_population(full_exposure_model: models.ExposureModel, + baseline_infected_population_number: models.InfectedPopulation, + baseline_exposed_population_number: models.Population, + time: float): + + dynamic_model: models.ExposureModel = dc_utils.nested_replace( + full_exposure_model, + { + 'concentration_model.infected': baseline_infected_population_number, + 'exposed': baseline_exposed_population_number, + } + ) + assert full_exposure_model.concentration(time) == dynamic_model.concentration(time) From 392b3d04212a2c7fcff2a1a50662e3a74b7f7cf6 Mon Sep 17 00:00:00 2001 From: Luis Aleixo Date: Thu, 16 Mar 2023 16:07:35 +0100 Subject: [PATCH 3/8] added verifications for unexpected jumps --- caimira/models.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/caimira/models.py b/caimira/models.py index 71fbdec3..8fa1a7d4 100644 --- a/caimira/models.py +++ b/caimira/models.py @@ -847,6 +847,8 @@ def people_present(self, time: typing.Optional[float] = None): if time and not self.person_present(time): if time <= self.number.transition_times[0]: number = self.number.values[0] + elif time >= self.number.transition_times[-1]: + number = self.number.values[-1] else: number = int(self.number.value(self._last_presence_state(time))) else: @@ -904,7 +906,6 @@ def emission_rate(self, time) -> _VectorisedFloat: # itself a function of time. Any change in rate must be accompanied # with a declaration of state change time, as is the case for things # like Ventilation. - return self.emission_rate_when_present(time) @property @@ -1142,12 +1143,10 @@ def _normed_concentration(self, time: float) -> _VectorisedFloat: # The model always starts at t=0, but we avoid running concentration calculations # before the first presence as an optimisation. if time <= self._first_presence_time(): - try: - return self.min_background_concentration()/self.normalization_factor(time) - except ZeroDivisionError: - return 0. - + return self.min_background_concentration()/self.normalization_factor(time) + next_state_change_time = self._next_state_change(time) + RR = self.removal_rate(next_state_change_time) # If RR is 0, conc_limit does not play a role but its computation # would raise an error -> we set it to zero. @@ -1157,11 +1156,12 @@ def _normed_concentration(self, time: float) -> _VectorisedFloat: conc_limit = 0. t_last_state_change = self.last_state_change(time) - conc_at_last_state_change = self._normed_concentration_cached(t_last_state_change) + conc_at_last_state_change = self._normed_concentration_cached(t_last_state_change)*self.normalization_factor(t_last_state_change) delta_time = time - t_last_state_change fac = np.exp(-RR * delta_time) - return conc_limit * (1 - fac) + conc_at_last_state_change * fac + + return conc_limit * (1 - fac) + conc_at_last_state_change/self.normalization_factor(time) * fac def concentration(self, time: float) -> _VectorisedFloat: """ From f3f8d459397efe02e81cf4bc4107e8627b79c77e Mon Sep 17 00:00:00 2001 From: Luis Aleixo Date: Wed, 29 Mar 2023 12:04:36 +0200 Subject: [PATCH 4/8] added test for dynamic and static population --- .../tests/models/test_dynamic_population.py | 80 +++++++++++++++++++ 1 file changed, 80 insertions(+) diff --git a/caimira/tests/models/test_dynamic_population.py b/caimira/tests/models/test_dynamic_population.py index 80808524..b1c139e9 100644 --- a/caimira/tests/models/test_dynamic_population.py +++ b/caimira/tests/models/test_dynamic_population.py @@ -96,3 +96,83 @@ def test_concentration_model_dynamic_population(full_exposure_model: models.Expo } ) assert full_exposure_model.concentration(time) == dynamic_model.concentration(time) + + +@pytest.mark.parametrize( + "time, number_of_infected", + [ + [8., 1], + [10., 2], + [12., 3], + [13., 4], + [15., 5], + ]) +def test_sum_of_models(full_exposure_model: models.ExposureModel, + baseline_infected_population_number: models.InfectedPopulation, + time: float, + number_of_infected: int): + + + static_multiple_exposure_model: models.ExposureModel = dc_utils.nested_replace( + full_exposure_model, + { + 'concentration_model.infected.number': number_of_infected, + } + ) + + dynamic_single_exposure_model: models.ExposureModel = dc_utils.nested_replace( + full_exposure_model, + { + 'concentration_model.infected': baseline_infected_population_number, + } + ) + + npt.assert_almost_equal(static_multiple_exposure_model.concentration(time), dynamic_single_exposure_model.concentration(time) * number_of_infected) + + +def test_dynamic_dose(full_exposure_model): + + dynamic_infected: models.ExposureModel = dc_utils.nested_replace( + full_exposure_model, + { + 'concentration_model.infected': models.InfectedPopulation( + number=models.IntPiecewiseContant( + (8, 10, 12, 13, 17), (1, 2, 0, 3)), + presence=None, + mask=models.Mask.types['No mask'], + activity=models.Activity.types['Seated'], + virus=models.Virus.types['SARS_CoV_2'], + expiration=models.Expiration.types['Breathing'], + host_immunity=0., + ), + } + ) + + single_infected: models.ExposureModel = dc_utils.nested_replace( + full_exposure_model, + { + 'concentration_model.infected.number': 1, + 'concentration_model.infected.presence': models.SpecificInterval(((8, 10), )), + } + ) + + two_infected: models.ExposureModel = dc_utils.nested_replace( + full_exposure_model, + { + 'concentration_model.infected.number': 2, + 'concentration_model.infected.presence': models.SpecificInterval(((10, 12), )), + } + ) + + three_infected: models.ExposureModel = dc_utils.nested_replace( + full_exposure_model, + { + 'concentration_model.infected.number': 3, + 'concentration_model.infected.presence': models.SpecificInterval(((13, 17), )), + } + ) + + dynamic_exposure = dynamic_infected.deposited_exposure() + static_exposure = np.sum([model.deposited_exposure() for model in (single_infected, two_infected, three_infected)]) + + npt.assert_almost_equal(dynamic_exposure, static_exposure) \ No newline at end of file From 5481bffffefcbebc2f6d44115817eca1779c5098 Mon Sep 17 00:00:00 2001 From: Nicolas Mounet Date: Thu, 6 Apr 2023 10:22:53 +0200 Subject: [PATCH 5/8] Change all normalization factors and emission rates (when present) into 'per person' quantities (i.e. do not normalize anymore by the number of infected people) - to allow dynamic number of infected people. Changing the test accordingly, and the documentation. Adding a __post_init__ check in ExposureModel to forbid the use of dynamic number of exposed people, for now. --- caimira/apps/calculator/report_generator.py | 6 +- caimira/apps/expert.py | 2 +- caimira/docs/full_diameter_dependence.rst | 26 +-- caimira/models.py | 150 ++++++++---------- .../tests/models/test_concentration_model.py | 6 +- .../tests/models/test_dynamic_population.py | 2 +- caimira/tests/models/test_exposure_model.py | 7 +- caimira/tests/test_monte_carlo_full_models.py | 12 +- 8 files changed, 98 insertions(+), 113 deletions(-) diff --git a/caimira/apps/calculator/report_generator.py b/caimira/apps/calculator/report_generator.py index 6b122948..0630c1cb 100644 --- a/caimira/apps/calculator/report_generator.py +++ b/caimira/apps/calculator/report_generator.py @@ -145,7 +145,7 @@ def calculate_report_data(form: FormData, model: models.ExposureModel) -> typing prob = np.array(model.infection_probability()) prob_dist_count, prob_dist_bins = np.histogram(prob/100, bins=100, density=True) prob_probabilistic_exposure = np.array(model.total_probability_rule()).mean() - er = np.array(model.concentration_model.infected.emission_rate_when_present()).mean() + er = np.array(model.concentration_model.infected.emission_rate_per_person_when_present()).mean() exposed_occupants = model.exposed.number expected_new_cases = np.array(model.expected_new_cases()).mean() uncertainties_plot_src = img2base64(_figure2bytes(uncertainties_plot(model))) if form.conditional_probability_plot else None @@ -166,12 +166,12 @@ def calculate_report_data(form: FormData, model: models.ExposureModel) -> typing "cumulative_doses": list(cumulative_doses), "long_range_cumulative_doses": list(long_range_cumulative_doses), "prob_inf": prob.mean(), - "prob_inf_sd": np.std(prob), + "prob_inf_sd": prob.std(), "prob_dist": list(prob), "prob_hist_count": list(prob_dist_count), "prob_hist_bins": list(prob_dist_bins), "prob_probabilistic_exposure": prob_probabilistic_exposure, - "emission_rate": er, + "emission_rate_per_person": er, "exposed_occupants": exposed_occupants, "expected_new_cases": expected_new_cases, "uncertainties_plot_src": uncertainties_plot_src, diff --git a/caimira/apps/expert.py b/caimira/apps/expert.py index 34a6a062..d32b1dca 100644 --- a/caimira/apps/expert.py +++ b/caimira/apps/expert.py @@ -209,7 +209,7 @@ def update_plot(self, model: models.ExposureModel): def update_textual_result(self, model: models.ExposureModel): lines = [] P = np.array(model.infection_probability()).mean() - lines.append(f'Emission rate (virus/hr): {np.round(model.concentration_model.infected.emission_rate_when_present(),0)}') + lines.append(f'Emission rate per infected person (virus/hr): {np.round(model.concentration_model.infected.emission_rate_per_person_when_present(),0)}') lines.append(f'Probability of infection: {np.round(P, 0)}%') lines.append(f'Number of exposed: {model.exposed.number}') diff --git a/caimira/docs/full_diameter_dependence.rst b/caimira/docs/full_diameter_dependence.rst index 1e684ebb..219eb78f 100644 --- a/caimira/docs/full_diameter_dependence.rst +++ b/caimira/docs/full_diameter_dependence.rst @@ -26,7 +26,7 @@ provided the sample size is large enough. Example of the MC integration over the It is important to distinguish between 1) Monte-Carlo random variables (which are vectorised independently on its diameter-dependence) and 2) numerical Monte-Carlo integration for the diameter-dependence. Since the integral of the diameter-dependent variables are solved when computing the dose -- :math:`\mathrm{vD^{total}}` -- while performing some of the intermediate calculations, -we normalize the results by *dividing* by the Monte-Carlo variables that are diameter-independent, so that they are not considered in the Monte-Carlo integration (e.g. the **viral load** parameter, or the result of the :meth:`caimira.models.InfectedPopulation.emission_rate_per_aerosol_when_present` method). +we normalize the results by *dividing* by the Monte-Carlo variables that are diameter-independent, so that they are not considered in the Monte-Carlo integration (e.g. the **viral load** parameter, or the result of the :meth:`caimira.models.InfectedPopulation.emission_rate_per_aerosol_per_person_when_present` method). Expiration ========== @@ -62,9 +62,9 @@ Note that :math:`D_{\mathrm{max}}` value will differ, depending on the type of e In the code, for a given Expiration, we use different methods to perform the calculations *step-by-step*: -1. Calculate the non aerosol-dependent quantities in the emission rate, which is the multiplication of the diameter-**independent** variables: :meth:`caimira.models.InfectedPopulation.emission_rate_per_aerosol_when_present`. This corresponds to the :math:`\mathrm{vl_{in}} \cdot \mathrm{BR_{k}}` part of the :math:`\mathrm{vR}(D)` equation. +1. Calculate the non aerosol-dependent quantities in the emission rate per person infected, which is the multiplication of the diameter-**independent** variables: :meth:`caimira.models.InfectedPopulation.emission_rate_per_aerosol_per_person_when_present`. This corresponds to the :math:`\mathrm{vl_{in}} \cdot \mathrm{BR_{k}}` part of the :math:`\mathrm{vR}(D)` equation. 2. Calculate the diameter-**dependent** variable :meth:`caimira.models.InfectedPopulation.aerosols`, which is the result of :math:`E_{c,j}(D) = N_p(D) \cdot V_p(D) \cdot (1 − η_\mathrm{out}(D))` (in mL/(m\ :sup:`3` \.µm)), with :math:`N_p(D)` being the product of the BLO distribution by the scaling factor :math:`cn`. Note that this result is not integrated over the diameters at this stage, thus the units are still *'per aerosol diameter'*. -3. Calculate the full emission rate, which is the multiplication of the two previous methods, and corresponds to :math:`\mathrm{vR(D)}`: :meth:`caimira.models._PopulationWithVirus.emission_rate_when_present`. +3. Calculate the full emission rate (per person infected), which is the multiplication of the two previous methods, and corresponds to :math:`\mathrm{vR(D)}`: :meth:`caimira.models._PopulationWithVirus.emission_rate_per_person_when_present`. Note that the diameter-dependence is kept at this stage. Since other parameters downstream in code are also diameter-dependent, the Monte-Carlo integration over the aerosol sizes is computed at the level of the dose :math:`\mathrm{vD^{total}}`. In case one would like to have intermediate results for emission rate, perform the Monte-Carlo integration of :math:`E_{c, j}^{\mathrm{total}}` and compute :math:`\mathrm{vR^{total}} =\mathrm{vl_{in}} \cdot E_{c, j}^{\mathrm{total}} \cdot \mathrm{BR_k}`. @@ -97,8 +97,8 @@ the code computes first a normalized version of the concentration, i.e. divided To summarize, we can split the concentration in two different formulations: -* Normalized concentration :meth:`caimira.models._ConcentrationModelBase._normed_concentration`: :math:`\mathrm{C_\mathrm{LR, normed}}(t, D)` that computes the concentration without including the emission rate. -* Concentration :meth:`caimira.models._ConcentrationModelBase.concentration` : :math:`C_{\mathrm{LR}}(t, D) = \mathrm{C_\mathrm{LR, normed}}(t, D) \cdot \mathrm{vR}(D)`, where :math:`\mathrm{vR}(D)` is the result of the :meth:`caimira.models._PopulationWithVirus.emission_rate_when_present` method. +* Normalized concentration :meth:`caimira.models._ConcentrationModelBase._normed_concentration`: :math:`\mathrm{C_\mathrm{LR, normed}}(t, D)` that computes the concentration without including the emission rate per person infected. +* Concentration :meth:`caimira.models._ConcentrationModelBase.concentration` : :math:`C_{\mathrm{LR}}(t, D) = \mathrm{C_\mathrm{LR, normed}}(t, D) \cdot \mathrm{vR}(D)`, where :math:`\mathrm{vR}(D)` is the result of the :meth:`caimira.models._PopulationWithVirus.emission_rate_per_person_when_present` method. Note that in order to get the total concentration value in this stage, the final result should be averaged over the particle diameters (i.e. Monte-Carlo integration over diameters, see above). For the calculator app report, the total concentration (MC integral over the diameter) is performed only when generating the plot. @@ -106,8 +106,8 @@ Otherwise, the diameter-dependence continues until we compute the inhaled dose i The following methods calculate the integrated concentration between two times. They are mostly used when calculating the **dose**: -* :meth:`caimira.models._ConcentrationModelBase.normed_integrated_concentration`, :math:`\mathrm{C_\mathrm{normed}}(D)` that returns the integrated long-range concentration of viruses in the air, between any two times, normalized by the emission rate. Note that this method performs the integral between any two times of the previously mentioned :meth:`caimira.models._ConcentrationModelBase._normed_concentration` method. -* :meth:`caimira.models._ConcentrationModelBase.integrated_concentration`, :math:`C(D)`, that returns the same result as the previous one, but multiplied by the emission rate. +* :meth:`caimira.models._ConcentrationModelBase.normed_integrated_concentration`, :math:`\mathrm{C_\mathrm{normed}}(D)` that returns the integrated long-range concentration of viruses in the air, between any two times, normalized by the emission rate per person infected. Note that this method performs the integral between any two times of the previously mentioned :meth:`caimira.models._ConcentrationModelBase._normed_concentration` method. +* :meth:`caimira.models._ConcentrationModelBase.integrated_concentration`, :math:`C(D)`, that returns the same result as the previous one, but multiplied by the emission rate (per person infected). The integral over the exposure times is calculated directly in the class (integrated methods). @@ -221,14 +221,14 @@ Long-range approach ******************* Regarding the concentration part of the long-range exposure (concentration integrated over time, :math:`\int_{t1}^{t2}C_{\mathrm{LR}}(t, D)\;\mathrm{d}t`), the respective method is :meth:`caimira.models.ExposureModel._long_range_normed_exposure_between_bounds`, -which uses the long-range exposure (concentration) between two bounds (time1 and time2), normalized by the emission rate of the infected population, calculated from :meth:`caimira.models._ConcentrationModelBase.normed_integrated_concentration`. +which uses the long-range exposure (concentration) between two bounds (time1 and time2), normalized by the emission rate of the infected population (per person infected), calculated from :meth:`caimira.models._ConcentrationModelBase.normed_integrated_concentration`. The former method filters out the given bounds considering the breaks through the day (i.e. the time intervals during which there is no exposition to the virus) and retrieves the integrated long-range concentration of viruses in the air between any two times. After the calculations of the integrated concentration over the time, in order to calculate the final dose, we have to compute the remaining factors in the above equation. Note that the **Monte-Carlo integration over the diameters is performed at this stage**, where all the diameter-dependent parameters are grouped together to calculate the final average (:code:`np.mean()`). -Since, in the previous chapters, the quantities where normalised by the emission rate, one will need to re-incorporate it in the equations before performing the MC integrations over :math:`D`. -For that we need to split :math:`\mathrm{vR}(D)` (:meth:`caimira.models._PopulationWithVirus.emission_rate_when_present`) in diameter-dependent and diameter-independent quantities: +Since, in the previous chapters, the quantities where normalised by the emission rate per person infected, one will need to re-incorporate it in the equations before performing the MC integrations over :math:`D`. +For that we need to split :math:`\mathrm{vR}(D)` (:meth:`caimira.models._PopulationWithVirus.emission_rate_per_person_when_present`) in diameter-dependent and diameter-independent quantities: :math:`\mathrm{vR}(D) = \mathrm{vR}(D-\mathrm{dependent}) \times \mathrm{vR}(D-\mathrm{independent})` @@ -236,7 +236,7 @@ with :math:`\mathrm{vR}(D-\mathrm{dependent}) = \mathrm{cn} \cdot V_p(D) \cdot (1 − \mathrm{η_{out}}(D))` - :meth:`caimira.models.InfectedPopulation.aerosols` -:math:`\mathrm{vR}(D-\mathrm{independent}) = \mathrm{vl_{in}} \cdot \mathrm{BR_{k}}` - :meth:`caimira.models.InfectedPopulation.emission_rate_per_aerosol_when_present` +:math:`\mathrm{vR}(D-\mathrm{independent}) = \mathrm{vl_{in}} \cdot \mathrm{BR_{k}}` - :meth:`caimira.models.InfectedPopulation.emission_rate_per_aerosol_per_person_when_present` In other words, in the code the procedure is the following (all performed in :meth:`caimira.models.ExposureModel.long_range_deposited_exposure_between_bounds` method): @@ -248,7 +248,7 @@ In other words, in the code the procedure is the following (all performed in :me * in order to complete the equation, multiply by the remaining diameter-independent variables in :math:`\mathrm{vD}` to obtain the total value: :math:`\mathrm{vD^{total}} = \mathrm{vD_{emission-rate}} \cdot \mathrm{BR}_{\mathrm{k}} \cdot (1-\eta_{\mathrm{in}}) \cdot f_{\mathrm{inf}}`; * in the end, the dose is a vectorized float used in the probability of infection formula. -**Note**: The aerosol volume concentration (*aerosols*) is introduced because the integrated concentration over the time was previously normalized by the emission rate. +**Note**: The aerosol volume concentration (*aerosols*) is introduced because the integrated concentration over the time was previously normalized by the emission rate (per person). Here, to calculate the integral over the diameters we also need to consider the diameter-dependent variables that are on the emission rate, represented by the aerosol volume concentration which depends on the diameter and on the mask type: :math:`\mathrm{aerosols} = \mathrm{cn} \cdot V_p(D) \cdot (1 − \mathrm{η_{out}}(D))` . @@ -339,4 +339,4 @@ REFERENCES ========== .. [1] Jia, Wei, et al. "Exposure and respiratory infection risk via the short-range airborne route." Building and environment 219 (2022): 109166. `doi.org/10.1016/j.buildenv.2022.109166 `_ -.. [2] Henriques, Andre, et al. "Modelling airborne transmission of SARS-CoV-2 using CARA: risk assessment for enclosed spaces." Interface Focus 12.2 (2022): 20210076. `doi.org/10.1098/rsfs.2021.0076 `_ \ No newline at end of file +.. [2] Henriques, Andre, et al. "Modelling airborne transmission of SARS-CoV-2 using CARA: risk assessment for enclosed spaces." Interface Focus 12.2 (2022): 20210076. `doi.org/10.1098/rsfs.2021.0076 `_ diff --git a/caimira/models.py b/caimira/models.py index 8fa1a7d4..38924f8c 100644 --- a/caimira/models.py +++ b/caimira/models.py @@ -819,15 +819,6 @@ def __post_init__(self): else: if self.presence is not None: raise TypeError(f'The presence argument must be None for a IntPiecewiseConstant number.') - - def _last_presence_state(self, time: float): - """ - Find the most recent/previous presence state change. - """ - times = set(self.number.transition_times) # type: ignore - t_index: int = np.searchsorted(sorted(times), time) # type: ignore - t_index = max([t_index - 1, 0]) - return sorted(times)[t_index] def person_present(self, time: float): # Allow back-compatibility @@ -835,25 +826,15 @@ def person_present(self, time: float): return self.presence.triggered(time) elif isinstance(self.number, IntPiecewiseContant): return self.number.value(time) != 0 - - def people_present(self, time: typing.Optional[float] = None): + + def people_present(self, time: float): # Allow back-compatibility - if isinstance(self.number, int) and isinstance(self.presence, Interval): - return self.number - - elif isinstance(self.number, IntPiecewiseContant): - # For a given scenario, time falls within a break, - # we should get the previous state occupancy profile. - if time and not self.person_present(time): - if time <= self.number.transition_times[0]: - number = self.number.values[0] - elif time >= self.number.transition_times[-1]: - number = self.number.values[-1] - else: - number = int(self.number.value(self._last_presence_state(time))) - else: - number = int(self.number.value(time)) - return number + if isinstance(self.number, int): + return self.number * self.person_present(time) + + else: + return int(self.number.value(time)) + @dataclass(frozen=True) class _PopulationWithVirus(Population): @@ -874,22 +855,22 @@ def aerosols(self): """ raise NotImplementedError("Subclass must implement") - def emission_rate_per_aerosol_when_present(self, time: typing.Optional[float] = None) -> _VectorisedFloat: + def emission_rate_per_aerosol_per_person_when_present(self) -> _VectorisedFloat: """ The emission rate of virions in the expired air per mL of respiratory fluid, - if the infected population is present, in (virion.cm^3)/(mL.h). + per person, if the infected population is present, in (virion.cm^3)/(mL.h). This method includes only the diameter-independent variables within the emission rate. It should not be a function of time. """ raise NotImplementedError("Subclass must implement") @method_cache - def emission_rate_when_present(self, time: typing.Optional[float] = None) -> _VectorisedFloat: + def emission_rate_per_person_when_present(self) -> _VectorisedFloat: """ - The emission rate if the infected population is present + The emission rate if the infected population is present, per person (in virions / h). """ - return (self.emission_rate_per_aerosol_when_present(time) * + return (self.emission_rate_per_aerosol_per_person_when_present() * self.aerosols()) def emission_rate(self, time) -> _VectorisedFloat: @@ -906,7 +887,7 @@ def emission_rate(self, time) -> _VectorisedFloat: # itself a function of time. Any change in rate must be accompanied # with a declaration of state change time, as is the case for things # like Ventilation. - return self.emission_rate_when_present(time) + return self.emission_rate_per_person_when_present() * self.people_present(time) @property def particle(self) -> Particle: @@ -930,14 +911,14 @@ def aerosols(self): return 1. @method_cache - def emission_rate_per_aerosol_when_present(self, time: typing.Optional[float] = None) -> _VectorisedFloat: + def emission_rate_per_aerosol_per_person_when_present(self) -> _VectorisedFloat: """ The emission rate of virions in the expired air per mL of respiratory fluid, - if the infected population is present, in (virion.cm^3)/(mL.h). + per person, if the infected population is present, in (virion.cm^3)/(mL.h). This method includes only the diameter-independent variables within the emission rate. It should not be a function of time. """ - return self.known_individual_emission_rate * self.people_present(time) + return self.known_individual_emission_rate @dataclass(frozen=True) @@ -959,7 +940,7 @@ def aerosols(self): return self.expiration.aerosols(self.mask) @method_cache - def emission_rate_per_aerosol_when_present(self, time: typing.Optional[float] = None) -> _VectorisedFloat: + def emission_rate_per_aerosol_per_person_when_present(self) -> _VectorisedFloat: """ The emission rate of virions in the expired air per mL of respiratory fluid, if the infected population is present, in (virion.cm^3)/(mL.h). @@ -972,7 +953,7 @@ def emission_rate_per_aerosol_when_present(self, time: typing.Optional[float] = ER = (self.virus.viral_load_in_sputum * self.activity.exhalation_rate * 10 ** 6) - return ER * self.people_present(time) + return ER @property def particle(self) -> Particle: @@ -1040,7 +1021,7 @@ def min_background_concentration(self) -> _VectorisedFloat: """ return 0. - def normalization_factor(self, time: typing.Optional[float] = None) -> _VectorisedFloat: + def normalization_factor(self) -> _VectorisedFloat: """ Normalization factor (in the same unit as the concentration). This factor is applied to the normalized concentration only @@ -1059,13 +1040,14 @@ def _normed_concentration_limit(self, time: float) -> _VectorisedFloat: can be put back in front of the concentration after the time dependence has been solved for. """ - if not self.population.person_present(time): - return self.min_background_concentration()/self.normalization_factor(time) + #if not self.population.person_present(time): + # return self.min_background_concentration()/self.normalization_factor() + V = self.room.volume RR = self.removal_rate(time) - return (1. / (RR * V) + self.min_background_concentration()/ - self.normalization_factor(time)) + return (self.population.people_present(time) / (RR * V) + + self.min_background_concentration()/self.normalization_factor()) @method_cache def state_change_times(self) -> typing.List[float]: @@ -1143,7 +1125,7 @@ def _normed_concentration(self, time: float) -> _VectorisedFloat: # The model always starts at t=0, but we avoid running concentration calculations # before the first presence as an optimisation. if time <= self._first_presence_time(): - return self.min_background_concentration()/self.normalization_factor(time) + return self.min_background_concentration()/self.normalization_factor() next_state_change_time = self._next_state_change(time) @@ -1156,12 +1138,12 @@ def _normed_concentration(self, time: float) -> _VectorisedFloat: conc_limit = 0. t_last_state_change = self.last_state_change(time) - conc_at_last_state_change = self._normed_concentration_cached(t_last_state_change)*self.normalization_factor(t_last_state_change) + conc_at_last_state_change = self._normed_concentration_cached(t_last_state_change) delta_time = time - t_last_state_change fac = np.exp(-RR * delta_time) - return conc_limit * (1 - fac) + conc_at_last_state_change/self.normalization_factor(time) * fac + return conc_limit * (1 - fac) + conc_at_last_state_change * fac def concentration(self, time: float) -> _VectorisedFloat: """ @@ -1172,7 +1154,7 @@ def concentration(self, time: float) -> _VectorisedFloat: to this method. """ return (self._normed_concentration_cached(time) * - self.normalization_factor(time)) + self.normalization_factor()) @method_cache def normed_integrated_concentration(self, start: float, stop: float) -> _VectorisedFloat: @@ -1209,7 +1191,7 @@ def integrated_concentration(self, start: float, stop: float) -> _VectorisedFloa Get the integrated concentration of viruses in the air between the times start and stop. """ return (self.normed_integrated_concentration(start, stop) * - self.normalization_factor(start)) + self.normalization_factor()) @dataclass(frozen=True) @@ -1234,9 +1216,9 @@ def population(self) -> InfectedPopulation: def virus(self) -> Virus: return self.infected.virus - def normalization_factor(self, time: typing.Optional[float] = None) -> _VectorisedFloat: + def normalization_factor(self) -> _VectorisedFloat: # we normalize by the emission rate - return self.infected.emission_rate_when_present(time) + return self.infected.emission_rate_per_person_when_present() def removal_rate(self, time: float) -> _VectorisedFloat: # Equilibrium velocity of particle motion toward the floor @@ -1284,11 +1266,10 @@ def min_background_concentration(self) -> _VectorisedFloat: """ return self.CO2_atmosphere_concentration - def normalization_factor(self, time: typing.Optional[float] = None) -> _VectorisedFloat: - # normalization by the CO2 exhaled. + def normalization_factor(self) -> _VectorisedFloat: + # normalization by the CO2 exhaled per person. # CO2 concentration given in ppm, hence the 1e6 factor. - return (1e6*self.population.people_present(time) - *self.population.activity.exhalation_rate + return (1e6*self.population.activity.exhalation_rate *self.CO2_fraction_exhaled) @@ -1515,8 +1496,11 @@ def __post_init__(self): and not ( all(np.isscalar(c_model.virus.decay_constant(c_model.room.humidity, c_model.room.inside_temp.value(time)) + c_model.ventilation.air_exchange(c_model.room, time)) for time in c_model.state_change_times()))): - raise ValueError("If the diameter is an array, none of the ventilation parameters " - "or virus decay constant can be arrays at the same time.") + raise ValueError("If the diameter is an array, none of the ventilation parameters " + "or virus decay constant can be arrays at the same time.") + if not isinstance(self.exposed.number, int): + raise NotImplementedError("Cannot use dynamic occupancy for" + " the exposed population") def long_range_fraction_deposited(self) -> _VectorisedFloat: """ @@ -1533,21 +1517,19 @@ def _long_range_normed_exposure_between_bounds(self, time1: float, time2: float) by the emission rate of the infected population """ exposure = 0. - if isinstance(self.exposed.number, int) and isinstance(self.exposed.presence, Interval): - # TODO: Implement the case for the IntPiecewiseConstant - for start, stop in self.exposed.presence.boundaries(): - if stop < time1: - continue - elif start > time2: - break - elif start <= time1 and time2<= stop: - exposure += self.concentration_model.normed_integrated_concentration(time1, time2) - elif start <= time1 and stop < time2: - exposure += self.concentration_model.normed_integrated_concentration(time1, stop) - elif time1 < start and time2 <= stop: - exposure += self.concentration_model.normed_integrated_concentration(start, time2) - elif time1 <= start and stop < time2: - exposure += self.concentration_model.normed_integrated_concentration(start, stop) + for start, stop in self.exposed.presence.boundaries(): + if stop < time1: + continue + elif start > time2: + break + elif start <= time1 and time2<= stop: + exposure += self.concentration_model.normed_integrated_concentration(time1, time2) + elif start <= time1 and stop < time2: + exposure += self.concentration_model.normed_integrated_concentration(time1, stop) + elif time1 < start and time2 <= stop: + exposure += self.concentration_model.normed_integrated_concentration(start, time2) + elif time1 <= start and stop < time2: + exposure += self.concentration_model.normed_integrated_concentration(start, stop) return exposure def concentration(self, time: float) -> _VectorisedFloat: @@ -1565,7 +1547,8 @@ def concentration(self, time: float) -> _VectorisedFloat: def long_range_deposited_exposure_between_bounds(self, time1: float, time2: float) -> _VectorisedFloat: deposited_exposure = 0. - emission_rate_per_aerosol = self.concentration_model.infected.emission_rate_per_aerosol_when_present() + emission_rate_per_aerosol_per_person = \ + self.concentration_model.infected.emission_rate_per_aerosol_per_person_when_present() aerosols = self.concentration_model.infected.aerosols() f_inf = self.concentration_model.infected.fraction_of_infectious_virus() fdep = self.long_range_fraction_deposited() @@ -1585,10 +1568,11 @@ def long_range_deposited_exposure_between_bounds(self, time1: float, time2: floa # one should not take any mean at this stage. dep_exposure_integrated = self._long_range_normed_exposure_between_bounds(time1, time2)*aerosols*fdep - # Then we multiply by the diameter-independent quantity emission_rate_per_aerosol, + # Then we multiply by the diameter-independent quantity emission_rate_per_aerosol_per_person, # and parameters of the vD equation (i.e. BR_k and n_in). - deposited_exposure += (dep_exposure_integrated * emission_rate_per_aerosol * - self.exposed.activity.inhalation_rate * + deposited_exposure += (dep_exposure_integrated * + emission_rate_per_aerosol_per_person * + self.exposed.activity.inhalation_rate * (1 - self.exposed.mask.inhale_efficiency())) # In the end we multiply the final results by the fraction of infectious virus of the vD equation. @@ -1655,10 +1639,8 @@ def deposited_exposure(self) -> _VectorisedFloat: The number of virus per m^3 deposited on the respiratory tract. """ deposited_exposure: _VectorisedFloat = 0.0 - if isinstance(self.exposed.number, int) and isinstance(self.exposed.presence, Interval): - # TODO: Implement the case for the IntPiecewiseConstant - for start, stop in self.exposed.presence.boundaries(): - deposited_exposure += self.deposited_exposure_between_bounds(start, stop) + for start, stop in self.exposed.presence.boundaries(): + deposited_exposure += self.deposited_exposure_between_bounds(start, stop) return deposited_exposure * self.repeats @@ -1676,11 +1658,13 @@ def infection_probability(self) -> _VectorisedFloat: def total_probability_rule(self) -> _VectorisedFloat: if (self.geographical_data.geographic_population != 0 and self.geographical_data.geographic_cases != 0): sum_probability = 0.0 + if not isinstance(self.concentration_model.infected.number, int): + raise NotImplementedError("Cannot compute total probability " + "(including incidence rate) with dynamic occupancy") + # Create an equivalent exposure model but changing the number of infected cases. - if isinstance(self.concentration_model.infected.number, int) and isinstance(self.exposed.number, int): - # TODO: Implement the case for the IntPiecewiseConstant - total_people = self.concentration_model.infected.number + self.exposed.number - max_num_infected = (total_people if total_people < 10 else 10) + total_people = self.concentration_model.infected.number + self.exposed.number + max_num_infected = (total_people if total_people < 10 else 10) # The influence of a higher number of simultainious infected people (> 4 - 5) yields an almost negligible contirbution to the total probability. # To be on the safe side, a hard coded limit with a safety margin of 2x was set. # Therefore we decided a hard limit of 10 infected people. diff --git a/caimira/tests/models/test_concentration_model.py b/caimira/tests/models/test_concentration_model.py index 5d152b52..f8f42106 100644 --- a/caimira/tests/models/test_concentration_model.py +++ b/caimira/tests/models/test_concentration_model.py @@ -33,7 +33,7 @@ def removal_rate(self, time: float) -> float: def min_background_concentration(self) -> float: return self.known_min_background_concentration - def normalization_factor(self, time: typing.Optional[float] = None) -> float: + def normalization_factor(self) -> float: return self.known_normalization_factor @@ -107,7 +107,7 @@ def simple_conc_model(): @pytest.fixture def dummy_population(simple_conc_model) -> models.Population: return models.Population( - number=10, + number=1, presence=simple_conc_model.infected.presence, mask=models.Mask.types['Type I'], activity=models.Activity.types['Seated'], @@ -268,7 +268,7 @@ def test_zero_ventilation_rate( ventilation = simple_conc_model.ventilation, known_population = dummy_population, known_removal_rate = known_removal_rate, - known_normalization_factor=10., + known_normalization_factor=1., known_min_background_concentration = known_min_background_concentration) normed_concentration = known_conc_model.concentration(1) diff --git a/caimira/tests/models/test_dynamic_population.py b/caimira/tests/models/test_dynamic_population.py index b1c139e9..e2737e10 100644 --- a/caimira/tests/models/test_dynamic_population.py +++ b/caimira/tests/models/test_dynamic_population.py @@ -175,4 +175,4 @@ def test_dynamic_dose(full_exposure_model): dynamic_exposure = dynamic_infected.deposited_exposure() static_exposure = np.sum([model.deposited_exposure() for model in (single_infected, two_infected, three_infected)]) - npt.assert_almost_equal(dynamic_exposure, static_exposure) \ No newline at end of file + npt.assert_almost_equal(dynamic_exposure, static_exposure) diff --git a/caimira/tests/models/test_exposure_model.py b/caimira/tests/models/test_exposure_model.py index ef9d0c4e..4c48b480 100644 --- a/caimira/tests/models/test_exposure_model.py +++ b/caimira/tests/models/test_exposure_model.py @@ -24,7 +24,7 @@ def removal_rate(self, time: float) -> models._VectorisedFloat: return 1.e50 def _normed_concentration_limit(self, time: float) -> models._VectorisedFloat: - return self.normed_concentration_function(time) + return self.normed_concentration_function(time) * self.infected.number def state_change_times(self): return [0., 24.] @@ -33,7 +33,7 @@ def _next_state_change(self, time: float): return 24. def _normed_concentration(self, time: float) -> models._VectorisedFloat: # noqa - return self.normed_concentration_function(time) + return self.normed_concentration_function(time) * self.infected.number halftime = models.PeriodicInterval(120, 60) @@ -67,7 +67,8 @@ def known_concentrations(func): expiration=models.Expiration.types['Speaking'], host_immunity=0., ) - normed_func = lambda x: func(x) / dummy_infected_population.emission_rate_when_present() + normed_func = lambda x: (func(x) / + dummy_infected_population.emission_rate_per_person_when_present()) return KnownNormedconcentration(dummy_room, dummy_ventilation, dummy_infected_population, 0.3, normed_func) diff --git a/caimira/tests/test_monte_carlo_full_models.py b/caimira/tests/test_monte_carlo_full_models.py index 9ce540e0..b6e3948c 100644 --- a/caimira/tests/test_monte_carlo_full_models.py +++ b/caimira/tests/test_monte_carlo_full_models.py @@ -313,19 +313,19 @@ def waiting_room_mc(): @retry(tries=10) @pytest.mark.parametrize( - "mc_model, expected_pi, expected_new_cases, expected_dose, expected_ER", + "mc_model, expected_pi, expected_new_cases, expected_dose, expected_ER_per_person", [ ["shared_office_mc", 5.38, 0.16, 3.350, 1056], ["classroom_mc", 8.21, 1.56, 11.356, 7416], ["ski_cabin_mc", 12.92, 0.39, 21.796, 10231], ["skagit_chorale_mc",61.01, 36.53, 84.730, 190422], ["bus_ride_mc", 10.59, 7.06, 6.650, 5419], - ["gym_mc", 0.52, 0.14, 0.249, 1450], + ["gym_mc", 0.52, 0.14, 0.249, 1450/2.], # there are two infected in this case ["waiting_room_mc", 1.53, 0.21, 0.844, 929], ] ) def test_report_models(mc_model, expected_pi, expected_new_cases, - expected_dose, expected_ER, request): + expected_dose, expected_ER_per_person, request): mc_model = request.getfixturevalue(mc_model) exposure_model = mc_model.build_model(size=SAMPLE_SIZE) npt.assert_allclose(exposure_model.infection_probability().mean(), @@ -335,8 +335,8 @@ def test_report_models(mc_model, expected_pi, expected_new_cases, npt.assert_allclose(exposure_model.deposited_exposure().mean(), expected_dose, rtol=TOLERANCE) npt.assert_allclose( - exposure_model.concentration_model.infected.emission_rate_when_present().mean(), - expected_ER, rtol=TOLERANCE) + exposure_model.concentration_model.infected.emission_rate_per_person_when_present().mean(), + expected_ER_per_person, rtol=TOLERANCE) @retry(tries=10) @@ -397,5 +397,5 @@ def test_small_shared_office_Geneva(mask_type, month, expected_pi, npt.assert_allclose(exposure_model.deposited_exposure().mean(), expected_dose, rtol=TOLERANCE) npt.assert_allclose( - exposure_model.concentration_model.infected.emission_rate_when_present().mean(), + exposure_model.concentration_model.infected.emission_rate_per_person_when_present().mean(), expected_ER, rtol=TOLERANCE) From 6e6e3d4ca66e8355789ab5bf2f94a8bae5133e46 Mon Sep 17 00:00:00 2001 From: Luis Aleixo Date: Thu, 6 Apr 2023 12:27:39 +0200 Subject: [PATCH 6/8] modified dynamic tests for InfectedPopulation --- caimira/models.py | 10 +-- .../tests/models/test_dynamic_population.py | 62 ++++++++++++++----- 2 files changed, 51 insertions(+), 21 deletions(-) diff --git a/caimira/models.py b/caimira/models.py index 38924f8c..98e3e9a5 100644 --- a/caimira/models.py +++ b/caimira/models.py @@ -818,7 +818,7 @@ def __post_init__(self): raise TypeError(f'The presence argument must be an "Interval". Got {type(self.presence)}') else: if self.presence is not None: - raise TypeError(f'The presence argument must be None for a IntPiecewiseConstant number.') + raise TypeError(f'The presence argument must be None for a IntPiecewiseConstant number') def person_present(self, time: float): # Allow back-compatibility @@ -1656,11 +1656,13 @@ def infection_probability(self) -> _VectorisedFloat: self.concentration_model.virus.transmissibility_factor)))) * 100 def total_probability_rule(self) -> _VectorisedFloat: - if (self.geographical_data.geographic_population != 0 and self.geographical_data.geographic_cases != 0): - sum_probability = 0.0 - if not isinstance(self.concentration_model.infected.number, int): + if (isinstance(self.concentration_model.infected.number, IntPiecewiseContant) or + isinstance(self.exposed.number, IntPiecewiseContant)): raise NotImplementedError("Cannot compute total probability " "(including incidence rate) with dynamic occupancy") + + if (self.geographical_data.geographic_population != 0 and self.geographical_data.geographic_cases != 0): + sum_probability = 0.0 # Create an equivalent exposure model but changing the number of infected cases. total_people = self.concentration_model.infected.number + self.exposed.number diff --git a/caimira/tests/models/test_dynamic_population.py b/caimira/tests/models/test_dynamic_population.py index e2737e10..41137c34 100644 --- a/caimira/tests/models/test_dynamic_population.py +++ b/caimira/tests/models/test_dynamic_population.py @@ -1,3 +1,5 @@ +import re + import numpy as np import numpy.testing as npt import pytest @@ -48,17 +50,6 @@ def baseline_infected_population_number(): ) -@pytest.fixture -def baseline_exposed_population_number(): - return models.Population( - number=models.IntPiecewiseContant( - (8, 12, 13, 17), (1, 0, 1)), - presence=None, - mask=models.Mask.types['No mask'], - activity=models.Activity.types['Seated'], - host_immunity=0., - ) - @pytest.mark.parametrize( "time", [4., 8., 10., 12., 13., 14., 16., 20., 24.], @@ -74,6 +65,22 @@ def test_population_number(full_exposure_model: models.ExposureModel, assert isinstance(piecewise_population_number.number, models.IntPiecewiseContant) assert piecewise_population_number.presence is None + + with pytest.raises( + TypeError, + match=f'The presence argument must be an "Interval". Got {type(None)}' + ): + dc_utils.nested_replace( + int_population_number, {'presence': None} + ) + + with pytest.raises( + TypeError, + match="The presence argument must be None for a IntPiecewiseConstant number" + ): + dc_utils.nested_replace( + piecewise_population_number, {'presence': models.SpecificInterval(((8, 12), ))} + ) assert int_population_number.person_present(time) == piecewise_population_number.person_present(time) assert int_population_number.people_present(time) == piecewise_population_number.people_present(time) @@ -85,14 +92,12 @@ def test_population_number(full_exposure_model: models.ExposureModel, ) def test_concentration_model_dynamic_population(full_exposure_model: models.ExposureModel, baseline_infected_population_number: models.InfectedPopulation, - baseline_exposed_population_number: models.Population, time: float): dynamic_model: models.ExposureModel = dc_utils.nested_replace( full_exposure_model, { 'concentration_model.infected': baseline_infected_population_number, - 'exposed': baseline_exposed_population_number, } ) assert full_exposure_model.concentration(time) == dynamic_model.concentration(time) @@ -107,7 +112,7 @@ def test_concentration_model_dynamic_population(full_exposure_model: models.Expo [13., 4], [15., 5], ]) -def test_sum_of_models(full_exposure_model: models.ExposureModel, +def test_linearity_with_number_of_infected(full_exposure_model: models.ExposureModel, baseline_infected_population_number: models.InfectedPopulation, time: float, number_of_infected: int): @@ -128,9 +133,13 @@ def test_sum_of_models(full_exposure_model: models.ExposureModel, ) npt.assert_almost_equal(static_multiple_exposure_model.concentration(time), dynamic_single_exposure_model.concentration(time) * number_of_infected) + npt.assert_almost_equal(static_multiple_exposure_model.deposited_exposure(), dynamic_single_exposure_model.deposited_exposure() * number_of_infected) -def test_dynamic_dose(full_exposure_model): +@pytest.mark.parametrize( + "time", (8., 9., 10., 11., 12., 13., 14.), +) +def test_dynamic_dose(full_exposure_model, time): dynamic_infected: models.ExposureModel = dc_utils.nested_replace( full_exposure_model, @@ -172,7 +181,26 @@ def test_dynamic_dose(full_exposure_model): } ) + dynamic_concentration = dynamic_infected.concentration(time) dynamic_exposure = dynamic_infected.deposited_exposure() - static_exposure = np.sum([model.deposited_exposure() for model in (single_infected, two_infected, three_infected)]) - npt.assert_almost_equal(dynamic_exposure, static_exposure) + static_concentration, static_exposure = [], [] + for model in (single_infected, two_infected, three_infected): + static_concentration.append(model.concentration(time)) + static_exposure.append(model.deposited_exposure()) + + npt.assert_almost_equal(dynamic_concentration, np.sum(np.array(static_concentration))) + npt.assert_almost_equal(dynamic_exposure, np.sum(np.array(static_exposure))) + + +def test_dynamic_total_probability_rule(full_exposure_model: models.ExposureModel, + baseline_infected_population_number: models.InfectedPopulation): + + model = dc_utils.nested_replace(full_exposure_model, + {'concentration_model.infected': baseline_infected_population_number }) + with pytest.raises( + NotImplementedError, + match=re.escape("Cannot compute total probability " + "(including incidence rate) with dynamic occupancy") + ): + model.total_probability_rule() From f11fee1268c5505c084cffe2ce42df0df51f9566 Mon Sep 17 00:00:00 2001 From: Luis Aleixo Date: Fri, 28 Apr 2023 12:05:30 +0200 Subject: [PATCH 7/8] updated branch according to review: - new presence_interval() method in the population class that avoids repetition of if conditions through the code. - typo fix. -tests were updated accordingly --- caimira/apps/calculator/report_generator.py | 21 ++---- caimira/apps/expert.py | 14 ++-- caimira/apps/expert_co2.py | 25 ++++---- caimira/models.py | 40 ++++++------ caimira/monte_carlo/models.py | 8 +-- .../tests/models/test_dynamic_population.py | 64 ++++++------------- 6 files changed, 64 insertions(+), 108 deletions(-) diff --git a/caimira/apps/calculator/report_generator.py b/caimira/apps/calculator/report_generator.py index 0630c1cb..432cf286 100644 --- a/caimira/apps/calculator/report_generator.py +++ b/caimira/apps/calculator/report_generator.py @@ -20,18 +20,10 @@ def model_start_end(model: models.ExposureModel): - if (isinstance(model.exposed.number, int) and isinstance(model.exposed.presence, models.Interval) - and isinstance(model.concentration_model.infected.number, int) and isinstance(model.concentration_model.infected.presence, models.Interval)): - t_start = min(model.exposed.presence.boundaries()[0][0], - model.concentration_model.infected.presence.boundaries()[0][0]) - t_end = max(model.exposed.presence.boundaries()[-1][1], - model.concentration_model.infected.presence.boundaries()[-1][1]) - elif (isinstance(model.exposed.number, models.IntPiecewiseContant) - and isinstance(model.concentration_model.infected.number, models.IntPiecewiseContant)): - t_start = min(model.exposed.number.interval().boundaries()[0][0], - model.concentration_model.infected.number.interval().boundaries()[0][0]) - t_end = max(model.exposed.number.interval().boundaries()[-1][1], - model.concentration_model.infected.number.interval().boundaries()[-1][1]) + t_start = min(model.exposed.presence_interval().boundaries()[0][0], + model.concentration_model.infected.presence_interval().boundaries()[0][0]) + t_end = max(model.exposed.presence_interval().boundaries()[-1][1], + model.concentration_model.infected.presence_interval().boundaries()[-1][1]) return t_start, t_end @@ -149,10 +141,7 @@ def calculate_report_data(form: FormData, model: models.ExposureModel) -> typing exposed_occupants = model.exposed.number expected_new_cases = np.array(model.expected_new_cases()).mean() uncertainties_plot_src = img2base64(_figure2bytes(uncertainties_plot(model))) if form.conditional_probability_plot else None - if isinstance(model.exposed.number, int) and isinstance(model.exposed.presence, models.Interval): - exposed_presence_intervals = [list(interval) for interval in model.exposed.presence.boundaries()] - elif isinstance(model.exposed.number, models.IntPiecewiseContant): - exposed_presence_intervals = [list(interval) for interval in model.exposed.number.interval().boundaries()] + exposed_presence_intervals = [list(interval) for interval in model.exposed.presence_interval().boundaries()] return { "model_repr": repr(model), diff --git a/caimira/apps/expert.py b/caimira/apps/expert.py index d32b1dca..c15234c0 100644 --- a/caimira/apps/expert.py +++ b/caimira/apps/expert.py @@ -148,10 +148,7 @@ def update(self, model: models.ExposureModel): def update_plot(self, model: models.ExposureModel): resolution = 600 - if isinstance(model.concentration_model.infected.number, int) and isinstance(model.concentration_model.infected.presence, models.Interval): - infected_presence = model.concentration_model.infected.presence - elif isinstance(model.concentration_model.infected.number, models.IntPiecewiseContant): - infected_presence = model.concentration_model.infected.number.interval() + infected_presence = model.concentration_model.infected.presence_interval() ts = np.linspace(sorted(infected_presence.transition_times())[0], sorted(infected_presence.transition_times())[-1], resolution) concentration = [model.concentration(t) for t in ts] @@ -168,10 +165,7 @@ def update_plot(self, model: models.ExposureModel): self.ax.ignore_existing_data_limits = False self.concentration_line.set_data(ts, concentration) - if isinstance(model.exposed.number, int) and isinstance(model.exposed.presence, models.Interval): - exposed_presence = model.exposed.presence - elif isinstance(model.exposed.number, models.IntPiecewiseContant): - exposed_presence = model.exposed.number.interval() + exposed_presence = model.exposed.presence_interval() if self.concentration_area is None: self.concentration_area = self.ax.fill_between(x = ts, y1=0, y2=concentration, color="#96cbff", @@ -1117,8 +1111,8 @@ def models_start_end(models: typing.Sequence[models.ExposureModel]) -> typing.Tu Returns the earliest start and latest end time of a collection of ConcentrationModel objects """ - infected_start = min(model.concentration_model.infected.presence.boundaries()[0][0] for model in models) # type: ignore - infected_finish = min(model.concentration_model.infected.presence.boundaries()[-1][1] for model in models) # type: ignore + infected_start = min(model.concentration_model.infected.presence_interval().boundaries()[0][0] for model in models) + infected_finish = min(model.concentration_model.infected.presence_interval().boundaries()[-1][1] for model in models) return infected_start, infected_finish diff --git a/caimira/apps/expert_co2.py b/caimira/apps/expert_co2.py index 505870d9..ba9d7f63 100644 --- a/caimira/apps/expert_co2.py +++ b/caimira/apps/expert_co2.py @@ -86,8 +86,8 @@ def update(self, model: models.CO2ConcentrationModel): def update_plot(self, model: models.CO2ConcentrationModel): resolution = 600 - ts = np.linspace(sorted(model.CO2_emitters.presence.transition_times())[0], - sorted(model.CO2_emitters.presence.transition_times())[-1], resolution) + ts = np.linspace(sorted(model.CO2_emitters.presence_interval().transition_times())[0], + sorted(model.CO2_emitters.presence_interval().transition_times())[-1], resolution) concentration = [model.concentration(t) for t in ts] if self.concentration_line is None: @@ -99,19 +99,19 @@ def update_plot(self, model: models.CO2ConcentrationModel): if self.concentration_area is None: self.concentration_area = self.ax.fill_between(x = ts, y1=0, y2=concentration, color="#96cbff", - where = ((model.CO2_emitters.presence.boundaries()[0][0] < ts) & (ts < model.CO2_emitters.presence.boundaries()[0][1]) | - (model.CO2_emitters.presence.boundaries()[1][0] < ts) & (ts < model.CO2_emitters.presence.boundaries()[1][1]))) + where = ((model.CO2_emitters.presence_interval().boundaries()[0][0] < ts) & (ts < model.CO2_emitters.presence_interval().boundaries()[0][1]) | + (model.CO2_emitters.presence_interval().boundaries()[1][0] < ts) & (ts < model.CO2_emitters.presence_interval().boundaries()[1][1]))) else: self.concentration_area.remove() self.concentration_area = self.ax.fill_between(x = ts, y1=0, y2=concentration, color="#96cbff", - where = ((model.CO2_emitters.presence.boundaries()[0][0] < ts) & (ts < model.CO2_emitters.presence.boundaries()[0][1]) | - (model.CO2_emitters.presence.boundaries()[1][0] < ts) & (ts < model.CO2_emitters.presence.boundaries()[1][1]))) + where = ((model.CO2_emitters.presence_interval().boundaries()[0][0] < ts) & (ts < model.CO2_emitters.presence_interval().boundaries()[0][1]) | + (model.CO2_emitters.presence_interval().boundaries()[1][0] < ts) & (ts < model.CO2_emitters.presence_interval().boundaries()[1][1]))) concentration_top = max(np.array(concentration)) self.ax.set_ylim(bottom=model.CO2_atmosphere_concentration * 0.9, top=concentration_top*1.1) - self.ax.set_xlim(left = min(model.CO2_emitters.presence.boundaries()[0])*0.95, - right = max(model.CO2_emitters.presence.boundaries()[1])*1.05) + self.ax.set_xlim(left = min(model.CO2_emitters.presence_interval().boundaries()[0])*0.95, + right = max(model.CO2_emitters.presence_interval().boundaries()[1])*1.05) figure_legends = [mlines.Line2D([], [], color='#3530fe', markersize=15, label='CO₂ concentration'), mlines.Line2D([], [], color='salmon', markersize=15, label='Insufficient level', linestyle='--'), @@ -122,7 +122,10 @@ def update_plot(self, model: models.CO2ConcentrationModel): self.ax.set_ylim(top=concentration_top*1.1) else: self.ax.set_ylim(top=1550) - self.ax.hlines([800, 1500], xmin=min(model.CO2_emitters.presence.boundaries()[0])*0.95, xmax=max(model.CO2_emitters.presence.boundaries()[1])*1.05, colors=['limegreen', 'salmon'], linestyles='dashed') + self.ax.hlines([800, 1500], xmin=min(model.CO2_emitters.presence_interval().boundaries()[0])*0.95, + xmax=max(model.CO2_emitters.presence_interval().boundaries()[1])*1.05, + colors=['limegreen', 'salmon'], + linestyles='dashed') self.figure.canvas.draw() @@ -788,6 +791,6 @@ def models_start_end(models: typing.Sequence[models.CO2ConcentrationModel]) -> t Returns the earliest start and latest end time of a collection of v objects """ - emitters_start = min(model.CO2_emitters.presence.boundaries()[0][0] for model in models) - emitters_finish = min(model.CO2_emitters.presence.boundaries()[-1][1] for model in models) + emitters_start = min(model.CO2_emitters.presence_interval().boundaries()[0][0] for model in models) + emitters_finish = min(model.CO2_emitters.presence_interval().boundaries()[-1][1] for model in models) return emitters_start, emitters_finish diff --git a/caimira/models.py b/caimira/models.py index 98e3e9a5..9db3fcad 100644 --- a/caimira/models.py +++ b/caimira/models.py @@ -187,7 +187,7 @@ def refine(self, refine_factor=10) -> "PiecewiseConstant": @dataclass(frozen=True) -class IntPiecewiseContant(PiecewiseConstant): +class IntPiecewiseConstant(PiecewiseConstant): #: values of the function between transitions values: typing.Tuple[int, ...] @@ -796,10 +796,10 @@ class Population: """ #: How many in the population. - number: typing.Union[int, IntPiecewiseContant] + number: typing.Union[int, IntPiecewiseConstant] #: The times in which the people are in the room. - presence: Interval + presence: typing.Union[None, Interval] #: The kind of mask being worn by the people. mask: Mask @@ -819,19 +819,24 @@ def __post_init__(self): else: if self.presence is not None: raise TypeError(f'The presence argument must be None for a IntPiecewiseConstant number') + + def presence_interval(self): + if isinstance(self.presence, Interval): + return self.presence + elif isinstance(self.number, IntPiecewiseConstant): + return self.number.interval() def person_present(self, time: float): # Allow back-compatibility if isinstance(self.number, int) and isinstance(self.presence, Interval): return self.presence.triggered(time) - elif isinstance(self.number, IntPiecewiseContant): + elif isinstance(self.number, IntPiecewiseConstant): return self.number.value(time) != 0 def people_present(self, time: float): # Allow back-compatibility if isinstance(self.number, int): return self.number * self.person_present(time) - else: return int(self.number.value(time)) @@ -1040,9 +1045,6 @@ def _normed_concentration_limit(self, time: float) -> _VectorisedFloat: can be put back in front of the concentration after the time dependence has been solved for. """ - #if not self.population.person_present(time): - # return self.min_background_concentration()/self.normalization_factor() - V = self.room.volume RR = self.removal_rate(time) @@ -1056,11 +1058,9 @@ def state_change_times(self) -> typing.List[float]: the times at which their state changes. """ state_change_times = {0.} - if isinstance(self.population.number, int) and isinstance(self.population.presence, Interval): - state_change_times.update(self.population.presence.transition_times()) - elif isinstance(self.population.number, IntPiecewiseContant): - state_change_times.update(self.population.number.interval().transition_times()) + state_change_times.update(self.population.presence_interval().transition_times()) state_change_times.update(self.ventilation.transition_times(self.room)) + return sorted(state_change_times) @method_cache @@ -1068,12 +1068,8 @@ def _first_presence_time(self) -> float: """ First presence time. Before that, the concentration is zero. """ - if isinstance(self.population.number, int) and isinstance(self.population.presence, Interval): - first_presence = self.population.presence.boundaries()[0][0] - elif isinstance(self.population.number, IntPiecewiseContant): - first_presence = self.population.number.interval().boundaries()[0][0] - return first_presence - + return self.population.presence_interval().boundaries()[0][0] + def last_state_change(self, time: float) -> float: """ Find the most recent/previous state change. @@ -1517,7 +1513,7 @@ def _long_range_normed_exposure_between_bounds(self, time1: float, time2: float) by the emission rate of the infected population """ exposure = 0. - for start, stop in self.exposed.presence.boundaries(): + for start, stop in self.exposed.presence_interval().boundaries(): if stop < time1: continue elif start > time2: @@ -1639,7 +1635,7 @@ def deposited_exposure(self) -> _VectorisedFloat: The number of virus per m^3 deposited on the respiratory tract. """ deposited_exposure: _VectorisedFloat = 0.0 - for start, stop in self.exposed.presence.boundaries(): + for start, stop in self.exposed.presence_interval().boundaries(): deposited_exposure += self.deposited_exposure_between_bounds(start, stop) return deposited_exposure * self.repeats @@ -1656,8 +1652,8 @@ def infection_probability(self) -> _VectorisedFloat: self.concentration_model.virus.transmissibility_factor)))) * 100 def total_probability_rule(self) -> _VectorisedFloat: - if (isinstance(self.concentration_model.infected.number, IntPiecewiseContant) or - isinstance(self.exposed.number, IntPiecewiseContant)): + if (isinstance(self.concentration_model.infected.number, IntPiecewiseConstant) or + isinstance(self.exposed.number, IntPiecewiseConstant)): raise NotImplementedError("Cannot compute total probability " "(including incidence rate) with dynamic occupancy") diff --git a/caimira/monte_carlo/models.py b/caimira/monte_carlo/models.py index ccaafc59..7215db16 100644 --- a/caimira/monte_carlo/models.py +++ b/caimira/monte_carlo/models.py @@ -75,12 +75,12 @@ def _build_mc_model(model: dataclass_instance) -> typing.Type[MCModelBase[_Model SI = getattr(sys.modules[__name__], "SpecificInterval") field_type = typing.Tuple[typing.Union[caimira.models.SpecificInterval, SI], ...] - elif new_field.type == typing.Union[int, caimira.models.IntPiecewiseContant]: - IPC = getattr(sys.modules[__name__], "IntPiecewiseContant") - field_type = typing.Union[caimira.models.IntPiecewiseContant, IPC] + elif new_field.type == typing.Union[int, caimira.models.IntPiecewiseConstant]: + IPC = getattr(sys.modules[__name__], "IntPiecewiseConstant") + field_type = typing.Union[int, caimira.models.IntPiecewiseConstant, IPC] elif new_field.type == typing.Union[caimira.models.Interval, None]: I = getattr(sys.modules[__name__], "Interval") - field_type = typing.Union[caimira.models.Interval, I] + field_type = typing.Union[None, caimira.models.Interval, I] else: # Check that we don't need to do anything with this type. diff --git a/caimira/tests/models/test_dynamic_population.py b/caimira/tests/models/test_dynamic_population.py index 41137c34..e5bf9805 100644 --- a/caimira/tests/models/test_dynamic_population.py +++ b/caimira/tests/models/test_dynamic_population.py @@ -39,7 +39,7 @@ def full_exposure_model(): @pytest.fixture def baseline_infected_population_number(): return models.InfectedPopulation( - number=models.IntPiecewiseContant( + number=models.IntPiecewiseConstant( (8, 12, 13, 17), (1, 0, 1)), presence=None, mask=models.Mask.types['No mask'], @@ -50,6 +50,12 @@ def baseline_infected_population_number(): ) +@pytest.fixture +def dynamic_single_exposure_model(full_exposure_model, baseline_infected_population_number): + return dc_utils.nested_replace(full_exposure_model, + {'concentration_model.infected': baseline_infected_population_number, }) + + @pytest.mark.parametrize( "time", [4., 8., 10., 12., 13., 14., 16., 20., 24.], @@ -58,13 +64,7 @@ def test_population_number(full_exposure_model: models.ExposureModel, baseline_infected_population_number: models.InfectedPopulation, time: float): int_population_number: models.InfectedPopulation = full_exposure_model.concentration_model.infected - assert isinstance(int_population_number.number, int) - assert isinstance(int_population_number.presence, models.Interval) - piecewise_population_number: models.InfectedPopulation = baseline_infected_population_number - assert isinstance(piecewise_population_number.number, - models.IntPiecewiseContant) - assert piecewise_population_number.presence is None with pytest.raises( TypeError, @@ -91,29 +91,16 @@ def test_population_number(full_exposure_model: models.ExposureModel, [4., 8., 10., 12., 13., 14., 16., 20., 24.], ) def test_concentration_model_dynamic_population(full_exposure_model: models.ExposureModel, - baseline_infected_population_number: models.InfectedPopulation, + dynamic_single_exposure_model: models.ExposureModel, time: float): - dynamic_model: models.ExposureModel = dc_utils.nested_replace( - full_exposure_model, - { - 'concentration_model.infected': baseline_infected_population_number, - } - ) - assert full_exposure_model.concentration(time) == dynamic_model.concentration(time) + assert full_exposure_model.concentration(time) == dynamic_single_exposure_model.concentration(time) -@pytest.mark.parametrize( - "time, number_of_infected", - [ - [8., 1], - [10., 2], - [12., 3], - [13., 4], - [15., 5], - ]) +@pytest.mark.parametrize("number_of_infected",[1, 2, 3, 4, 5]) +@pytest.mark.parametrize("time",[9., 12.5, 16.]) def test_linearity_with_number_of_infected(full_exposure_model: models.ExposureModel, - baseline_infected_population_number: models.InfectedPopulation, + dynamic_single_exposure_model: models.ExposureModel, time: float, number_of_infected: int): @@ -125,13 +112,6 @@ def test_linearity_with_number_of_infected(full_exposure_model: models.ExposureM } ) - dynamic_single_exposure_model: models.ExposureModel = dc_utils.nested_replace( - full_exposure_model, - { - 'concentration_model.infected': baseline_infected_population_number, - } - ) - npt.assert_almost_equal(static_multiple_exposure_model.concentration(time), dynamic_single_exposure_model.concentration(time) * number_of_infected) npt.assert_almost_equal(static_multiple_exposure_model.deposited_exposure(), dynamic_single_exposure_model.deposited_exposure() * number_of_infected) @@ -145,7 +125,7 @@ def test_dynamic_dose(full_exposure_model, time): full_exposure_model, { 'concentration_model.infected': models.InfectedPopulation( - number=models.IntPiecewiseContant( + number=models.IntPiecewiseConstant( (8, 10, 12, 13, 17), (1, 2, 0, 3)), presence=None, mask=models.Mask.types['No mask'], @@ -184,23 +164,17 @@ def test_dynamic_dose(full_exposure_model, time): dynamic_concentration = dynamic_infected.concentration(time) dynamic_exposure = dynamic_infected.deposited_exposure() - static_concentration, static_exposure = [], [] - for model in (single_infected, two_infected, three_infected): - static_concentration.append(model.concentration(time)) - static_exposure.append(model.deposited_exposure()) + static_concentration, static_exposure = zip(*[(model.concentration(time), model.deposited_exposure()) + for model in (single_infected, two_infected, three_infected)]) - npt.assert_almost_equal(dynamic_concentration, np.sum(np.array(static_concentration))) - npt.assert_almost_equal(dynamic_exposure, np.sum(np.array(static_exposure))) + npt.assert_almost_equal(dynamic_concentration, np.sum(static_concentration)) + npt.assert_almost_equal(dynamic_exposure, np.sum(static_exposure)) -def test_dynamic_total_probability_rule(full_exposure_model: models.ExposureModel, - baseline_infected_population_number: models.InfectedPopulation): - - model = dc_utils.nested_replace(full_exposure_model, - {'concentration_model.infected': baseline_infected_population_number }) +def test_dynamic_total_probability_rule(dynamic_single_exposure_model: models.ExposureModel): with pytest.raises( NotImplementedError, match=re.escape("Cannot compute total probability " "(including incidence rate) with dynamic occupancy") ): - model.total_probability_rule() + dynamic_single_exposure_model.total_probability_rule() From dea166bdddbc0026ef8caf7e7e0ccc87b90cf74e Mon Sep 17 00:00:00 2001 From: Luis Aleixo Date: Wed, 3 May 2023 16:35:01 +0200 Subject: [PATCH 8/8] updated version --- caimira/apps/calculator/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/caimira/apps/calculator/__init__.py b/caimira/apps/calculator/__init__.py index f8be0a45..fef9ceaf 100644 --- a/caimira/apps/calculator/__init__.py +++ b/caimira/apps/calculator/__init__.py @@ -35,7 +35,7 @@ # calculator version. If the calculator needs to make breaking changes (e.g. change # form attributes) then it can also increase its MAJOR version without needing to # increase the overall CAiMIRA version (found at ``caimira.__version__``). -__version__ = "4.8" +__version__ = "4.9" class BaseRequestHandler(RequestHandler):