Skip to content

Commit

Permalink
added methods to read and parse dynamic occupancy. Probabilistic expo…
Browse files Browse the repository at this point in the history
…sure and expected new cases were removed from results
  • Loading branch information
lrdossan committed Jul 10, 2024
1 parent ee88603 commit 09ac69b
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 13 deletions.
85 changes: 74 additions & 11 deletions caimira/apps/calculator/model_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,56 @@ def generate_precise_activity_expiration(self) -> typing.Tuple[typing.Any, ...]:
respiratory_dict[respiratory_activity['type']] = respiratory_activity['percentage']

return (self.precise_activity['physical_activity'], respiratory_dict)

def generate_dynamic_occupancy(self, dynamic_occupancy: typing.List[typing.Dict[str, typing.Any]]):
### Data format validation ###
for occupancy in dynamic_occupancy:
# Check if each occupancy entry is a dictionary
if not isinstance(occupancy, typing.Dict):
raise TypeError(f'Each occupancy entry should be in a dictionary format. Got "{type(occupancy)}."')

# Check for required keys in each occupancy entry
dict_keys = list(occupancy.keys())
if "total_people" not in dict_keys:
raise TypeError(f'Unable to fetch "total_people" key. Got "{dict_keys[0]}".')
else:
value = occupancy["total_people"]
# Check if the total_people value is a non-negative integer
if not isinstance(value, int):
raise ValueError(f"Total number of people should be integer. Got {value}.")
elif not value >= 0:
raise ValueError(f"Total number of people should be non-negative. Got {value}.")

if "start_time" not in dict_keys:
raise TypeError(f'Unable to fetch "start_time" key. Got "{dict_keys[1]}".')
if "finish_time" not in dict_keys:
raise TypeError(f'Unable to fetch "finish_time" key. Got "{dict_keys[2]}".')

# Validate time format for start_time and finish_time
for time_key in ["start_time", "finish_time"]:
time = occupancy[time_key]
if not re.compile("^(2[0-3]|[01]?[0-9]):([0-5]?[0-9])$").match(time):
raise TypeError(f'Wrong time format - "HH:MM". Got "{time}".')

transition_times = []
values = []
for occupancy in dynamic_occupancy:
start_time = time_string_to_minutes(occupancy['start_time'])/60
finish_time = time_string_to_minutes(occupancy['finish_time'])/60
transition_times.extend([start_time, finish_time])
values.append(occupancy['total_people'])

unique_transition_times_sorted = np.array(sorted(set(transition_times)))

if len(values) != len(unique_transition_times_sorted) - 1:
raise ValueError("Cannot compute dynamic occupancy with the inputs provided.")

population_occupancy: models.IntPiecewiseConstant = models.IntPiecewiseConstant(
transition_times=tuple(unique_transition_times_sorted),
values=tuple(values)
)
population_presence: typing.Union[None, models.Interval] = None
return population_occupancy, population_presence

def infected_population(self) -> mc.InfectedPopulation:
# Initializes the virus
Expand All @@ -423,18 +473,26 @@ def infected_population(self) -> mc.InfectedPopulation:
# Conversation of N people is approximately 1/N% of the time speaking.
expiration_defn = {'Speaking': 1, 'Breathing': self.total_people - 1}
elif (self.activity_type == 'precise'):
activity_defn, expiration_defn = self.generate_precise_activity_expiration()
activity_defn, expiration_defn = self.generate_precise_activity_expiration() # TODO: what to do here?

activity = activity_distributions(self.data_registry)[activity_defn]
expiration = build_expiration(self.data_registry, expiration_defn)

infected_occupants = self.infected_people
if isinstance(self.dynamic_infected_occupancy, typing.List) and len(self.dynamic_infected_occupancy) > 0:
# If dynamic occupancy is defined, the generator will parse and validate the
# respective input to a format readable by the model - IntPiecewiseConstant.
infected_occupancy, infected_presence = self.generate_dynamic_occupancy(self.dynamic_infected_occupancy)
else:
# The number of exposed occupants is the total number of occupants
# minus the number of infected occupants.
infected_occupancy = self.infected_people
infected_presence = self.infected_present_interval()

infected = mc.InfectedPopulation(
data_registry=self.data_registry,
number=infected_occupants,
number=infected_occupancy,
presence=infected_presence,
virus=virus,
presence=self.infected_present_interval(),
mask=self.mask(),
activity=activity,
expiration=expiration,
Expand All @@ -447,11 +505,16 @@ def exposed_population(self) -> mc.Population:
if self.activity_type == 'precise'
else str(self.data_registry.population_scenario_activity[self.activity_type]['activity']))
activity = activity_distributions(self.data_registry)[activity_defn]

infected_occupants = self.infected_people
# The number of exposed occupants is the total number of occupants
# minus the number of infected occupants.
exposed_occupants = self.total_people - infected_occupants

if isinstance(self.dynamic_exposed_occupancy, typing.List) and len(self.dynamic_exposed_occupancy) > 0:
# If dynamic occupancy is defined, the generator will parse and validate the
# respective input to a format readable by the model - IntPiecewiseConstant.
exposed_occupancy, exposed_presence = self.generate_dynamic_occupancy(self.dynamic_exposed_occupancy)
else:
# The number of exposed occupants is the total number of occupants
# minus the number of infected occupants.
exposed_occupancy = self.total_people - self.infected_people
exposed_presence = self.exposed_present_interval()

if (self.vaccine_option):
if (self.vaccine_booster_option and self.vaccine_booster_type != 'Other'):
Expand All @@ -464,8 +527,8 @@ def exposed_population(self) -> mc.Population:
host_immunity = 0.

exposed = mc.Population(
number=exposed_occupants,
presence=self.exposed_present_interval(),
number=exposed_occupancy,
presence=exposed_presence,
activity=activity,
mask=self.mask(),
host_immunity=host_immunity,
Expand Down
13 changes: 11 additions & 2 deletions caimira/apps/calculator/report_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,17 @@ def calculate_report_data(form: VirusFormData, model: models.ExposureModel, exec

prob = np.array(model.infection_probability())
prob_dist_count, prob_dist_bins = np.histogram(prob/100, bins=100, density=True)
prob_probabilistic_exposure = np.array(model.total_probability_rule()).mean()
expected_new_cases = np.array(model.expected_new_cases()).mean()

if form.exposure_option == "p_probabilistic_exposure":
prob_probabilistic_exposure = np.array(model.total_probability_rule()).mean()
else: prob_probabilistic_exposure = None

if ((isinstance(form.dynamic_infected_occupancy, typing.List) and len(form.dynamic_infected_occupancy) > 0) or
(isinstance(form.dynamic_exposed_occupancy, typing.List) and len(form.dynamic_exposed_occupancy) > 0)):
expected_new_cases = None
else:
expected_new_cases = np.array(model.expected_new_cases()).mean()

exposed_presence_intervals = [list(interval) for interval in model.exposed.presence_interval().boundaries()]

if (model.data_registry.virological_data['virus_distributions'][form.virus_type]['viral_load_in_sputum'] == ViralLoads.COVID_OVERALL.value # type: ignore
Expand Down

0 comments on commit 09ac69b

Please sign in to comment.