diff --git a/caimira/apps/calculator/__init__.py b/caimira/apps/calculator/__init__.py
index 452df4ef..b5a11900 100644
--- a/caimira/apps/calculator/__init__.py
+++ b/caimira/apps/calculator/__init__.py
@@ -26,7 +26,7 @@
import tornado.log
from . import markdown_tools
-from . import model_generator
+from . import model_generator, co2_model_generator
from .report_generator import ReportGenerator, calculate_report_data
from .user import AuthenticatedUser, AnonymousUser
@@ -37,7 +37,7 @@
# calculator version. If the calculator needs to make breaking changes (e.g. change
# form attributes) then it can also increase its MAJOR version without needing to
# increase the overall CAiMIRA version (found at ``caimira.__version__``).
-__version__ = "4.13.0"
+__version__ = "4.14.0"
LOG = logging.getLogger(__name__)
@@ -106,7 +106,7 @@ async def post(self) -> None:
start = datetime.datetime.now()
try:
- form = model_generator.FormData.from_dict(requested_model_config)
+ form = model_generator.VirusFormData.from_dict(requested_model_config)
except Exception as err:
if self.settings.get("debug", False):
import traceback
@@ -157,7 +157,7 @@ async def post(self) -> None:
pprint(requested_model_config)
try:
- form = model_generator.FormData.from_dict(requested_model_config)
+ form = model_generator.VirusFormData.from_dict(requested_model_config)
except Exception as err:
if self.settings.get("debug", False):
import traceback
@@ -178,7 +178,7 @@ async def post(self) -> None:
class StaticModel(BaseRequestHandler):
async def get(self) -> None:
- form = model_generator.FormData.from_dict(model_generator.baseline_raw_form_data())
+ form = model_generator.VirusFormData.from_dict(model_generator.baseline_raw_form_data())
base_url = self.request.protocol + "://" + self.request.host
report_generator: ReportGenerator = self.settings['report_generator']
executor = loky.get_reusable_executor(max_workers=self.settings['handler_worker_pool_size'])
@@ -340,7 +340,53 @@ def get(self):
active_page=self.active_page,
text_blocks=template_environment.globals["common_text"]
))
+
+class CO2ModelResponse(BaseRequestHandler):
+ def check_xsrf_cookie(self):
+ """
+ This request handler implements a stateless API that returns report data in JSON format.
+ Thus, XSRF cookies are disabled by overriding base class implementation of this method with a pass statement.
+ """
+ pass
+
+ async def post(self, endpoint: str) -> None:
+ requested_model_config = tornado.escape.json_decode(self.request.body)
+ try:
+ form = co2_model_generator.CO2FormData.from_dict(requested_model_config)
+ except Exception as err:
+ if self.settings.get("debug", False):
+ import traceback
+ print(traceback.format_exc())
+ response_json = {'code': 400, 'error': f'Your request was invalid {html.escape(str(err))}'}
+ self.set_status(400)
+ self.finish(json.dumps(response_json))
+ return
+
+ if endpoint.rstrip('/') == 'plot':
+ transition_times = co2_model_generator.CO2FormData.find_change_points_with_pelt(form.CO2_data)
+ self.finish({'CO2_plot': co2_model_generator.CO2FormData.generate_ventilation_plot(form.CO2_data, transition_times),
+ 'transition_times': [round(el, 2) for el in transition_times]})
+ else:
+ executor = loky.get_reusable_executor(
+ max_workers=self.settings['handler_worker_pool_size'],
+ timeout=300,
+ )
+ report_task = executor.submit(
+ co2_model_generator.CO2FormData.build_model, form,
+ )
+ report = await asyncio.wrap_future(report_task)
+
+ result = dict(report.CO2_fit_params())
+ ventilation_transition_times = report.ventilation_transition_times
+
+ result['fitting_ventilation_type'] = form.fitting_ventilation_type
+ result['transition_times'] = ventilation_transition_times
+ result['CO2_plot'] = co2_model_generator.CO2FormData.generate_ventilation_plot(CO2_data=form.CO2_data,
+ transition_times=ventilation_transition_times[:-1],
+ predictive_CO2=result['predictive_CO2'])
+ self.finish(result)
+
def get_url(app_root: str, relative_path: str = '/'):
return app_root.rstrip('/') + relative_path.rstrip('/')
@@ -363,6 +409,7 @@ def make_app(
base_urls: typing.List = [
(get_root_url(r'/?'), LandingPage),
(get_root_calculator_url(r'/?'), CalculatorForm),
+ (get_root_calculator_url(r'/co2-fit/(.*)'), CO2ModelResponse),
(get_root_calculator_url(r'/report'), ConcentrationModel),
(get_root_url(r'/static/(.*)'), StaticFileHandler, {'path': static_dir}),
(get_root_calculator_url(r'/static/(.*)'), StaticFileHandler, {'path': calculator_static_dir}),
diff --git a/caimira/apps/calculator/co2_model_generator.py b/caimira/apps/calculator/co2_model_generator.py
new file mode 100644
index 00000000..3d90fb99
--- /dev/null
+++ b/caimira/apps/calculator/co2_model_generator.py
@@ -0,0 +1,186 @@
+import dataclasses
+import logging
+import typing
+import numpy as np
+import ruptures as rpt
+import matplotlib.pyplot as plt
+import re
+
+from caimira import models
+from .form_data import FormData, cast_class_fields
+from .defaults import DEFAULT_MC_SAMPLE_SIZE, NO_DEFAULT
+from .report_generator import img2base64, _figure2bytes
+
+minutes_since_midnight = typing.NewType('minutes_since_midnight', int)
+
+LOG = logging.getLogger(__name__)
+
+
+@dataclasses.dataclass
+class CO2FormData(FormData):
+ CO2_data: dict
+ fitting_ventilation_states: list
+ fitting_ventilation_type: str
+
+ #: The default values for undefined fields. Note that the defaults here
+ #: and the defaults in the html form must not be contradictory.
+ _DEFAULTS: typing.ClassVar[typing.Dict[str, typing.Any]] = {
+ 'CO2_data': '{}',
+ 'exposed_coffee_break_option': 'coffee_break_0',
+ 'exposed_coffee_duration': 5,
+ 'exposed_finish': '17:30',
+ 'exposed_lunch_finish': '13:30',
+ 'exposed_lunch_option': True,
+ 'exposed_lunch_start': '12:30',
+ 'exposed_start': '08:30',
+ 'fitting_ventilation_states': '[]',
+ 'fitting_ventilation_type': 'fitting_natural_ventilation',
+ 'infected_coffee_break_option': 'coffee_break_0',
+ 'infected_coffee_duration': 5,
+ 'infected_dont_have_breaks_with_exposed': False,
+ 'infected_finish': '17:30',
+ 'infected_lunch_finish': '13:30',
+ 'infected_lunch_option': True,
+ 'infected_lunch_start': '12:30',
+ 'infected_people': 1,
+ 'infected_start': '08:30',
+ 'room_volume': NO_DEFAULT,
+ 'specific_breaks': '{}',
+ 'total_people': NO_DEFAULT,
+ }
+
+ def __init__(self, **kwargs):
+ # Set default values defined in CO2FormData
+ for key, value in self._DEFAULTS.items():
+ setattr(self, key, kwargs.get(key, value))
+
+ def validate(self):
+ # Validate population parameters
+ self.validate_population_parameters()
+
+ # Validate specific inputs - breaks (exposed and infected)
+ if self.specific_breaks != {}:
+ if type(self.specific_breaks) is not dict:
+ raise TypeError('The specific breaks should be in a dictionary.')
+
+ dict_keys = list(self.specific_breaks.keys())
+ if "exposed_breaks" not in dict_keys:
+ raise TypeError(f'Unable to fetch "exposed_breaks" key. Got "{dict_keys[0]}".')
+ if "infected_breaks" not in dict_keys:
+ raise TypeError(f'Unable to fetch "infected_breaks" key. Got "{dict_keys[1]}".')
+
+ for population_breaks in ['exposed_breaks', 'infected_breaks']:
+ if self.specific_breaks[population_breaks] != []:
+ if type(self.specific_breaks[population_breaks]) is not list:
+ raise TypeError(f'All breaks should be in a list. Got {type(self.specific_breaks[population_breaks])}.')
+ for input_break in self.specific_breaks[population_breaks]:
+ # Input validations.
+ if type(input_break) is not dict:
+ raise TypeError(f'Each break should be a dictionary. Got {type(input_break)}.')
+ dict_keys = list(input_break.keys())
+ if "start_time" not in input_break:
+ raise TypeError(f'Unable to fetch "start_time" key. Got "{dict_keys[0]}".')
+ if "finish_time" not in input_break:
+ raise TypeError(f'Unable to fetch "finish_time" key. Got "{dict_keys[1]}".')
+ for time in input_break.values():
+ if not re.compile("^(2[0-3]|[01]?[0-9]):([0-5]?[0-9])$").match(time):
+ raise TypeError(f'Wrong time format - "HH:MM". Got "{time}".')
+
+ @classmethod
+ def find_change_points_with_pelt(self, CO2_data: dict):
+ """
+ Perform change point detection using Pelt algorithm from ruptures library with pen=15.
+ Returns a list of tuples containing (index, X-axis value) for the detected significant changes.
+ """
+
+ times: list = CO2_data['times']
+ CO2_values: list = CO2_data['CO2']
+
+ if len(times) != len(CO2_values):
+ raise ValueError("times and CO2 values must have the same length.")
+
+ # Convert the input list to a numpy array for use with the ruptures library
+ CO2_np = np.array(CO2_values)
+
+ # Define the model for change point detection (Radial Basis Function kernel)
+ model = "rbf"
+
+ # Fit the Pelt algorithm to the data with the specified model
+ algo = rpt.Pelt(model=model).fit(CO2_np)
+
+ # Predict change points using the Pelt algorithm with a penalty value of 15
+ result = algo.predict(pen=15)
+
+ # Find local minima and maxima
+ segments = np.split(np.arange(len(CO2_values)), result)
+ merged_segments = [np.hstack((segments[i], segments[i + 1])) for i in range(len(segments) - 1)]
+ result_set = set()
+ for segment in merged_segments[:-2]:
+ result_set.add(times[CO2_values.index(min(CO2_np[segment]))])
+ result_set.add(times[CO2_values.index(max(CO2_np[segment]))])
+ return list(result_set)
+
+ @classmethod
+ def generate_ventilation_plot(self, CO2_data: dict,
+ transition_times: typing.Optional[list] = None,
+ predictive_CO2: typing.Optional[list] = None):
+ times_values = CO2_data['times']
+ CO2_values = CO2_data['CO2']
+
+ fig = plt.figure(figsize=(7, 4), dpi=110)
+ plt.plot(times_values, CO2_values, label='Input CO₂')
+
+ if (transition_times):
+ for time in transition_times:
+ plt.axvline(x = time, color = 'grey', linewidth=0.5, linestyle='--')
+ if (predictive_CO2):
+ plt.plot(times_values, predictive_CO2, label='Predictive CO₂')
+ plt.xlabel('Time of day')
+ plt.ylabel('Concentration (ppm)')
+ plt.legend()
+ return img2base64(_figure2bytes(fig))
+
+ def population_present_changes(self, infected_presence: models.Interval, exposed_presence: models.Interval) -> typing.List[float]:
+ state_change_times = set(infected_presence.transition_times())
+ state_change_times.update(exposed_presence.transition_times())
+ return sorted(state_change_times)
+
+ def ventilation_transition_times(self) -> typing.Tuple[float, ...]:
+ # Check what type of ventilation is considered for the fitting
+ if self.fitting_ventilation_type == 'fitting_natural_ventilation':
+ vent_states = self.fitting_ventilation_states
+ vent_states.append(self.CO2_data['times'][-1])
+ return tuple(vent_states)
+ else:
+ return tuple((self.CO2_data['times'][0], self.CO2_data['times'][-1]))
+
+ def build_model(self, size=DEFAULT_MC_SAMPLE_SIZE) -> models.CO2DataModel: # type: ignore
+ # Build a simple infected and exposed population for the case when presence
+ # intervals and number of people are dynamic. Activity type is not needed.
+ infected_presence = self.infected_present_interval()
+ infected_population = models.SimplePopulation(
+ number=self.infected_people,
+ presence=infected_presence,
+ activity=None, # type: ignore
+ )
+ exposed_presence = self.exposed_present_interval()
+ exposed_population=models.SimplePopulation(
+ number=self.total_people - self.infected_people,
+ presence=exposed_presence,
+ activity=None, # type: ignore
+ )
+
+ all_state_changes=self.population_present_changes(infected_presence, exposed_presence)
+ total_people = [infected_population.people_present(stop) + exposed_population.people_present(stop)
+ for _, stop in zip(all_state_changes[:-1], all_state_changes[1:])]
+
+ return models.CO2DataModel(
+ room_volume=self.room_volume,
+ number=models.IntPiecewiseConstant(transition_times=tuple(all_state_changes), values=tuple(total_people)),
+ presence=None,
+ ventilation_transition_times=self.ventilation_transition_times(),
+ times=self.CO2_data['times'],
+ CO2_concentrations=self.CO2_data['CO2'],
+ )
+
+cast_class_fields(CO2FormData)
diff --git a/caimira/apps/calculator/defaults.py b/caimira/apps/calculator/defaults.py
index 8bae7e23..82f1cf4a 100644
--- a/caimira/apps/calculator/defaults.py
+++ b/caimira/apps/calculator/defaults.py
@@ -22,6 +22,7 @@
'ceiling_height': 0.,
'conditional_probability_plot': False,
'conditional_probability_viral_loads': False,
+ 'CO2_fitting_result': '{}',
'exposed_coffee_break_option': 'coffee_break_0',
'exposed_coffee_duration': 5,
'exposed_finish': '17:30',
@@ -103,8 +104,8 @@
VACCINE_TYPE = ['Ad26.COV2.S_(Janssen)', 'Any_mRNA_-_heterologous', 'AZD1222_(AstraZeneca)', 'AZD1222_(AstraZeneca)_and_any_mRNA_-_heterologous', 'AZD1222_(AstraZeneca)_and_BNT162b2_(Pfizer)',
'BBIBP-CorV_(Beijing_CNBG)', 'BNT162b2_(Pfizer)', 'BNT162b2_(Pfizer)_and_mRNA-1273_(Moderna)', 'CoronaVac_(Sinovac)', 'CoronaVac_(Sinovac)_and_AZD1222_(AstraZeneca)', 'Covishield',
'mRNA-1273_(Moderna)', 'Sputnik_V_(Gamaleya)', 'CoronaVac_(Sinovac)_and_BNT162b2_(Pfizer)']
-VENTILATION_TYPES = {'natural_ventilation',
- 'mechanical_ventilation', 'no_ventilation'}
+VENTILATION_TYPES = {'natural_ventilation', 'mechanical_ventilation',
+ 'no_ventilation', 'from_fitting'}
VIRUS_TYPES: typing.List[str] = list(config.virus_distributions)
VOLUME_TYPES = {'room_volume_explicit', 'room_volume_from_dimensions'}
WINDOWS_OPENING_REGIMES = {'windows_open_permanently',
diff --git a/caimira/apps/calculator/form_data.py b/caimira/apps/calculator/form_data.py
new file mode 100644
index 00000000..9d82af94
--- /dev/null
+++ b/caimira/apps/calculator/form_data.py
@@ -0,0 +1,440 @@
+import dataclasses
+import datetime
+import html
+import logging
+import typing
+import ast
+import json
+
+import numpy as np
+
+from caimira import models
+from .defaults import DEFAULTS, NO_DEFAULT, COFFEE_OPTIONS_INT, DEFAULT_MC_SAMPLE_SIZE
+
+LOG = logging.getLogger(__name__)
+
+minutes_since_midnight = typing.NewType('minutes_since_midnight', int)
+
+
+@dataclasses.dataclass
+class FormData:
+ specific_breaks: dict
+ exposed_coffee_break_option: str
+ exposed_coffee_duration: int
+ exposed_finish: minutes_since_midnight
+ exposed_lunch_finish: minutes_since_midnight
+ exposed_lunch_option: bool
+ exposed_lunch_start: minutes_since_midnight
+ exposed_start: minutes_since_midnight
+ infected_coffee_break_option: str #Used if infected_dont_have_breaks_with_exposed
+ infected_coffee_duration: int #Used if infected_dont_have_breaks_with_exposed
+ infected_dont_have_breaks_with_exposed: bool
+ infected_finish: minutes_since_midnight
+ infected_lunch_finish: minutes_since_midnight #Used if infected_dont_have_breaks_with_exposed
+ infected_lunch_option: bool #Used if infected_dont_have_breaks_with_exposed
+ infected_lunch_start: minutes_since_midnight #Used if infected_dont_have_breaks_with_exposed
+ infected_people: int
+ infected_start: minutes_since_midnight
+ room_volume: float
+ total_people: int
+
+ _DEFAULTS: typing.ClassVar[typing.Dict[str, typing.Any]] = DEFAULTS
+
+ @classmethod
+ def from_dict(cls, form_data: typing.Dict):
+ # Take a copy of the form data so that we can mutate it.
+ form_data = form_data.copy()
+ form_data.pop('_xsrf', None)
+
+ # Don't let arbitrary unescaped HTML through the net.
+ for key, value in form_data.items():
+ if isinstance(value, str):
+ form_data[key] = html.escape(value)
+
+ for key, default_value in cls._DEFAULTS.items():
+ if form_data.get(key, '') == '':
+ if default_value is NO_DEFAULT:
+ raise ValueError(f"{key} must be specified")
+ form_data[key] = default_value
+
+ for key, value in form_data.items():
+ if key in _CAST_RULES_FORM_ARG_TO_NATIVE:
+ form_data[key] = _CAST_RULES_FORM_ARG_TO_NATIVE[key](value)
+
+ if key not in cls._DEFAULTS:
+ raise ValueError(f'Invalid argument "{html.escape(key)}" given')
+
+ instance = cls(**form_data)
+ instance.validate()
+ return instance
+
+ @classmethod
+ def to_dict(cls, form: "FormData", strip_defaults: bool = False) -> dict:
+ form_dict = {
+ field.name: getattr(form, field.name)
+ for field in dataclasses.fields(form)
+ }
+
+ for attr, value in form_dict.items():
+ if attr in _CAST_RULES_NATIVE_TO_FORM_ARG:
+ form_dict[attr] = _CAST_RULES_NATIVE_TO_FORM_ARG[attr](value)
+
+ if strip_defaults:
+ del form_dict['calculator_version']
+
+ for attr, value in list(form_dict.items()):
+ default = cls._DEFAULTS.get(attr, NO_DEFAULT)
+ if default is not NO_DEFAULT and value in [default, 'not-applicable']:
+ form_dict.pop(attr)
+ return form_dict
+
+ def validate_population_parameters(self):
+ # Validate number of infected <= number of total people
+ if self.infected_people >= self.total_people:
+ raise ValueError('Number of infected people cannot be greater or equal to the number of total people.')
+
+ # Validate time intervals selected by user
+ time_intervals = [
+ ['exposed_start', 'exposed_finish'],
+ ['infected_start', 'infected_finish'],
+ ]
+ if self.exposed_lunch_option:
+ time_intervals.append(['exposed_lunch_start', 'exposed_lunch_finish'])
+ if self.infected_dont_have_breaks_with_exposed and self.infected_lunch_option:
+ time_intervals.append(['infected_lunch_start', 'infected_lunch_finish'])
+
+ for start_name, end_name in time_intervals:
+ start = getattr(self, start_name)
+ end = getattr(self, end_name)
+ if start > end:
+ raise ValueError(
+ f"{start_name} must be less than {end_name}. Got {start} and {end}.")
+
+ def validate_lunch(start, finish):
+ lunch_start = getattr(self, f'{population}_lunch_start')
+ lunch_finish = getattr(self, f'{population}_lunch_finish')
+ return (start <= lunch_start <= finish and
+ start <= lunch_finish <= finish)
+
+ def get_lunch_mins(population):
+ lunch_mins = 0
+ if getattr(self, f'{population}_lunch_option'):
+ lunch_mins = getattr(self, f'{population}_lunch_finish') - getattr(self, f'{population}_lunch_start')
+ return lunch_mins
+
+ def get_coffee_mins(population):
+ coffee_mins = 0
+ if getattr(self, f'{population}_coffee_break_option') != 'coffee_break_0':
+ coffee_mins = COFFEE_OPTIONS_INT[getattr(self, f'{population}_coffee_break_option')] * getattr(self, f'{population}_coffee_duration')
+ return coffee_mins
+
+ def get_activity_mins(population):
+ return getattr(self, f'{population}_finish') - getattr(self, f'{population}_start')
+
+ populations = ['exposed', 'infected'] if self.infected_dont_have_breaks_with_exposed else ['exposed']
+ for population in populations:
+ # Validate lunch time within the activity times.
+ if (getattr(self, f'{population}_lunch_option') and
+ not validate_lunch(getattr(self, f'{population}_start'), getattr(self, f'{population}_finish'))
+ ):
+ raise ValueError(
+ f"{population} lunch break must be within presence times."
+ )
+
+ # Length of breaks < length of activity
+ if (get_lunch_mins(population) + get_coffee_mins(population)) >= get_activity_mins(population):
+ raise ValueError(
+ f"Length of breaks >= Length of {population} presence."
+ )
+
+ for attr_name, valid_set in [('exposed_coffee_break_option', COFFEE_OPTIONS_INT),
+ ('infected_coffee_break_option', COFFEE_OPTIONS_INT)]:
+ if getattr(self, attr_name) not in valid_set:
+ raise ValueError(f"{getattr(self, attr_name)} is not a valid value for {attr_name}")
+
+ def validate(self):
+ raise NotImplementedError("Subclass must implement")
+
+ def build_model(self, sample_size=DEFAULT_MC_SAMPLE_SIZE):
+ raise NotImplementedError("Subclass must implement")
+
+ def _compute_breaks_in_interval(self, start, finish, n_breaks, duration) -> models.BoundarySequence_t:
+ break_delay = ((finish - start) - (n_breaks * duration)) // (n_breaks+1)
+ break_times = []
+ end = start
+ for n in range(n_breaks):
+ begin = end + break_delay
+ end = begin + duration
+ break_times.append((begin, end))
+ return tuple(break_times)
+
+ def exposed_lunch_break_times(self) -> models.BoundarySequence_t:
+ result = []
+ if self.exposed_lunch_option:
+ result.append((self.exposed_lunch_start, self.exposed_lunch_finish))
+ return tuple(result)
+
+ def infected_lunch_break_times(self) -> models.BoundarySequence_t:
+ if self.infected_dont_have_breaks_with_exposed:
+ result = []
+ if self.infected_lunch_option:
+ result.append((self.infected_lunch_start, self.infected_lunch_finish))
+ return tuple(result)
+ else:
+ return self.exposed_lunch_break_times()
+
+ def exposed_number_of_coffee_breaks(self) -> int:
+ return COFFEE_OPTIONS_INT[self.exposed_coffee_break_option]
+
+ def infected_number_of_coffee_breaks(self) -> int:
+ return COFFEE_OPTIONS_INT[self.infected_coffee_break_option]
+
+ def _coffee_break_times(self, activity_start, activity_finish, coffee_breaks, coffee_duration, lunch_start, lunch_finish) -> models.BoundarySequence_t:
+ time_before_lunch = lunch_start - activity_start
+ time_after_lunch = activity_finish - lunch_finish
+ before_lunch_frac = time_before_lunch / (time_before_lunch + time_after_lunch)
+ n_morning_breaks = round(coffee_breaks * before_lunch_frac)
+ breaks = (
+ self._compute_breaks_in_interval(
+ activity_start, lunch_start, n_morning_breaks, coffee_duration
+ )
+ + self._compute_breaks_in_interval(
+ lunch_finish, activity_finish, coffee_breaks - n_morning_breaks, coffee_duration
+ )
+ )
+ return breaks
+
+ def exposed_coffee_break_times(self) -> models.BoundarySequence_t:
+ exposed_coffee_breaks = self.exposed_number_of_coffee_breaks()
+ if exposed_coffee_breaks == 0:
+ return ()
+ if self.exposed_lunch_option:
+ breaks = self._coffee_break_times(self.exposed_start, self.exposed_finish, exposed_coffee_breaks, self.exposed_coffee_duration, self.exposed_lunch_start, self.exposed_lunch_finish)
+ else:
+ breaks = self._compute_breaks_in_interval(self.exposed_start, self.exposed_finish, exposed_coffee_breaks, self.exposed_coffee_duration)
+ return breaks
+
+ def infected_coffee_break_times(self) -> models.BoundarySequence_t:
+ if self.infected_dont_have_breaks_with_exposed:
+ infected_coffee_breaks = self.infected_number_of_coffee_breaks()
+ if infected_coffee_breaks == 0:
+ return ()
+ if self.infected_lunch_option:
+ breaks = self._coffee_break_times(self.infected_start, self.infected_finish, infected_coffee_breaks, self.infected_coffee_duration, self.infected_lunch_start, self.infected_lunch_finish)
+ else:
+ breaks = self._compute_breaks_in_interval(self.infected_start, self.infected_finish, infected_coffee_breaks, self.infected_coffee_duration)
+ return breaks
+ else:
+ return self.exposed_coffee_break_times()
+
+ def generate_specific_break_times(self, population_breaks) -> models.BoundarySequence_t:
+ break_times = []
+ for n in population_breaks:
+ # Parse break times.
+ begin = time_string_to_minutes(n["start_time"])
+ end = time_string_to_minutes(n["finish_time"])
+ for time in [begin, end]:
+ # For a specific break, the infected and exposed presence is the same.
+ if not getattr(self, 'infected_start') < time < getattr(self, 'infected_finish'):
+ raise ValueError(f'All breaks should be within the simulation time. Got {time_minutes_to_string(time)}.')
+
+ break_times.append((begin, end))
+ return tuple(break_times)
+
+ def present_interval(
+ self,
+ start: int,
+ finish: int,
+ breaks: typing.Optional[models.BoundarySequence_t] = None,
+ ) -> models.Interval:
+ """
+ Calculate the presence interval given the start and end times (in minutes), and
+ a number of monotonic, non-overlapping, but potentially unsorted, breaks (also in minutes).
+
+ """
+ if not breaks:
+ # If there are no breaks, the interval is the start and end.
+ return models.SpecificInterval(((start/60, finish/60),))
+
+ # Order the breaks by their start-time, and ensure that they are monotonic
+ # and that the start of one break happens after the end of another.
+ break_boundaries: models.BoundarySequence_t = tuple(sorted(breaks, key=lambda break_pair: break_pair[0]))
+
+ for break_start, break_end in break_boundaries:
+ if break_start >= break_end:
+ raise ValueError("Break ends before it begins.")
+
+ prev_break_end = break_boundaries[0][1]
+ for break_start, break_end in break_boundaries[1:]:
+ if prev_break_end >= break_start:
+ raise ValueError(f"A break starts before another ends ({break_start}, {break_end}, {prev_break_end}).")
+ prev_break_end = break_end
+
+ present_intervals = []
+
+ current_time = start
+ LOG.debug(f"starting time march at {_hours2timestring(current_time/60)} to {_hours2timestring(finish/60)}")
+
+ # As we step through the breaks. For each break there are 6 important cases
+ # we must cover. Let S=start; E=end; Bs=Break start; Be=Break end:
+ # 1. The interval is entirely before the break. S < E <= Bs < Be
+ # 2. The interval straddles the start of the break. S < Bs < E <= Be
+ # 3. The break is entirely inside the interval. S < Bs < Be <= E
+ # 4. The interval is entirely inside the break. Bs <= S < E <= Be
+ # 5. The interval straddles the end of the break. Bs <= S < Be <= E
+ # 6. The interval is entirely after the break. Bs < Be <= S < E
+
+ for current_break in break_boundaries:
+ if current_time >= finish:
+ break
+
+ LOG.debug(f"handling break {_hours2timestring(current_break[0]/60)}-{_hours2timestring(current_break[1]/60)} "
+ f" (current time: {_hours2timestring(current_time/60)})")
+
+ break_s, break_e = current_break
+ case1 = finish <= break_s
+ case2 = current_time < break_s < finish < break_e
+ case3 = current_time < break_s < break_e <= finish
+ case4 = break_s <= current_time < finish <= break_e
+ case5 = break_s <= current_time < break_e < finish
+ case6 = break_e <= current_time
+
+ if case1:
+ LOG.debug(f"case 1: interval entirely before break")
+ present_intervals.append((current_time / 60, finish / 60))
+ LOG.debug(f" + added interval {_hours2timestring(present_intervals[-1][0])} "
+ f"- {_hours2timestring(present_intervals[-1][1])}")
+ current_time = finish
+ elif case2:
+ LOG.debug(f"case 2: interval straddles start of break")
+ present_intervals.append((current_time / 60, break_s / 60))
+ LOG.debug(f" + added interval {_hours2timestring(present_intervals[-1][0])} "
+ f"- {_hours2timestring(present_intervals[-1][1])}")
+ current_time = break_e
+ elif case3:
+ LOG.debug(f"case 3: break entirely inside interval")
+ # We add the bit before the break, but not the bit afterwards,
+ # as it may hit another break.
+ present_intervals.append((current_time / 60, break_s / 60))
+ LOG.debug(f" + added interval {_hours2timestring(present_intervals[-1][0])} "
+ f"- {_hours2timestring(present_intervals[-1][1])}")
+ current_time = break_e
+ elif case4:
+ LOG.debug(f"case 4: interval entirely inside break")
+ current_time = finish
+ elif case5:
+ LOG.debug(f"case 5: interval straddles end of break")
+ current_time = break_e
+ elif case6:
+ LOG.debug(f"case 6: interval entirely after the break")
+
+ if current_time < finish:
+ LOG.debug("trailing interval")
+ present_intervals.append((current_time / 60, finish / 60))
+ return models.SpecificInterval(tuple(present_intervals))
+
+ def infected_present_interval(self) -> models.Interval:
+ if self.specific_breaks != {}: # It means the breaks are specific and not predefined
+ breaks = self.generate_specific_break_times(self.specific_breaks['infected_breaks'])
+ else:
+ breaks = self.infected_lunch_break_times() + self.infected_coffee_break_times()
+ return self.present_interval(
+ self.infected_start, self.infected_finish,
+ breaks=breaks,
+ )
+
+ def population_present_interval(self) -> models.Interval:
+ state_change_times = set(self.infected_present_interval().transition_times())
+ state_change_times.update(self.exposed_present_interval().transition_times())
+ all_state_changes = sorted(state_change_times)
+ return models.SpecificInterval(tuple(zip(all_state_changes[:-1], all_state_changes[1:])))
+
+ def exposed_present_interval(self) -> models.Interval:
+ if self.specific_breaks != {}: # It means the breaks are specific and not predefined
+ breaks = self.generate_specific_break_times(self.specific_breaks['exposed_breaks'])
+ else:
+ breaks = self.exposed_lunch_break_times() + self.exposed_coffee_break_times()
+ return self.present_interval(
+ self.exposed_start, self.exposed_finish,
+ breaks=breaks,
+ )
+
+
+def _hours2timestring(hours: float):
+ # Convert times like 14.5 to strings, like "14:30"
+ return f"{int(np.floor(hours)):02d}:{int(np.round((hours % 1) * 60)):02d}"
+
+
+def time_string_to_minutes(time: str) -> minutes_since_midnight:
+ """
+ Converts time from string-format to an integer number of minutes after 00:00
+ :param time: A string of the form "HH:MM" representing a time of day
+ :return: The number of minutes between 'time' and 00:00
+ """
+ return minutes_since_midnight(60 * int(time[:2]) + int(time[3:]))
+
+
+def time_minutes_to_string(time: int) -> str:
+ """
+ Converts time from an integer number of minutes after 00:00 to string-format
+ :param time: The number of minutes between 'time' and 00:00
+ :return: A string of the form "HH:MM" representing a time of day
+ """
+ return "{0:0=2d}".format(int(time/60)) + ":" + "{0:0=2d}".format(time%60)
+
+
+def string_to_list(s: str) -> list:
+ return list(ast.literal_eval(s.replace(""", "\"")))
+
+
+def list_to_string(l: list) -> str:
+ return json.dumps(l)
+
+
+def string_to_dict(s: str) -> dict:
+ return dict(ast.literal_eval(s.replace(""", "\"")))
+
+
+def dict_to_string(d: dict) -> str:
+ return json.dumps(d)
+
+
+def _safe_int_cast(value) -> int:
+ if isinstance(value, int):
+ return value
+ elif isinstance(value, float) and int(value) == value:
+ return int(value)
+ elif isinstance(value, str) and value.isdecimal():
+ return int(value)
+ else:
+ raise TypeError(f"Unable to safely cast {value} ({type(value)} type) to int")
+
+
+#: Mapping of field name to a callable which can convert values from form
+#: input (URL encoded arguments / string) into the correct type.
+_CAST_RULES_FORM_ARG_TO_NATIVE: typing.Dict[str, typing.Callable] = {}
+
+#: Mapping of field name to callable which can convert native type to values
+#: that can be encoded to URL arguments.
+_CAST_RULES_NATIVE_TO_FORM_ARG: typing.Dict[str, typing.Callable] = {}
+
+def cast_class_fields(cls):
+ for _field in dataclasses.fields(cls):
+ if _field.type is minutes_since_midnight:
+ _CAST_RULES_FORM_ARG_TO_NATIVE[_field.name] = time_string_to_minutes
+ _CAST_RULES_NATIVE_TO_FORM_ARG[_field.name] = time_minutes_to_string
+ elif _field.type is int:
+ _CAST_RULES_FORM_ARG_TO_NATIVE[_field.name] = _safe_int_cast
+ elif _field.type is float:
+ _CAST_RULES_FORM_ARG_TO_NATIVE[_field.name] = float
+ elif _field.type is bool:
+ _CAST_RULES_FORM_ARG_TO_NATIVE[_field.name] = lambda v: v == '1'
+ _CAST_RULES_NATIVE_TO_FORM_ARG[_field.name] = int
+ elif _field.type is list:
+ _CAST_RULES_FORM_ARG_TO_NATIVE[_field.name] = string_to_list
+ _CAST_RULES_NATIVE_TO_FORM_ARG[_field.name] = list_to_string
+ elif _field.type is dict:
+ _CAST_RULES_FORM_ARG_TO_NATIVE[_field.name] = string_to_dict
+ _CAST_RULES_NATIVE_TO_FORM_ARG[_field.name] = dict_to_string
+
+cast_class_fields(FormData)
diff --git a/caimira/apps/calculator/model_generator.py b/caimira/apps/calculator/model_generator.py
index b844aa68..19419b96 100644
--- a/caimira/apps/calculator/model_generator.py
+++ b/caimira/apps/calculator/model_generator.py
@@ -1,10 +1,7 @@
import dataclasses
import datetime
-import html
import logging
import typing
-import ast
-import json
import re
import numpy as np
@@ -14,9 +11,10 @@
import caimira.data.weather
import caimira.monte_carlo as mc
from .. import calculator
+from .form_data import FormData, cast_class_fields, time_string_to_minutes
from caimira.monte_carlo.data import activity_distributions, virus_distributions, mask_distributions, short_range_distances
from caimira.monte_carlo.data import expiration_distribution, expiration_BLO_factors, expiration_distributions, short_range_expiration_distributions
-from .defaults import (NO_DEFAULT, DEFAULT_MC_SAMPLE_SIZE, DEFAULTS, ACTIVITIES, ACTIVITY_TYPES, COFFEE_OPTIONS_INT, CONFIDENCE_LEVEL_OPTIONS,
+from .defaults import (DEFAULT_MC_SAMPLE_SIZE, DEFAULTS, ACTIVITIES, ACTIVITY_TYPES, CONFIDENCE_LEVEL_OPTIONS,
MECHANICAL_VENTILATION_TYPES, MASK_TYPES, MASK_WEARING_OPTIONS, MONTH_NAMES, VACCINE_BOOSTER_TYPE, VACCINE_TYPE,
VENTILATION_TYPES, VIRUS_TYPES, VOLUME_TYPES, WINDOWS_OPENING_REGIMES, WINDOWS_TYPES)
from caimira.store.configuration import config
@@ -27,36 +25,20 @@
@dataclasses.dataclass
-class FormData:
+class VirusFormData(FormData):
activity_type: str
air_changes: float
air_supply: float
arve_sensors_option: bool
- specific_breaks: dict
precise_activity: dict
ceiling_height: float
conditional_probability_plot: bool
conditional_probability_viral_loads: bool
- exposed_coffee_break_option: str
- exposed_coffee_duration: int
- exposed_finish: minutes_since_midnight
- exposed_lunch_finish: minutes_since_midnight
- exposed_lunch_option: bool
- exposed_lunch_start: minutes_since_midnight
- exposed_start: minutes_since_midnight
+ CO2_fitting_result: dict
floor_area: float
hepa_amount: float
hepa_option: bool
humidity: str
- infected_coffee_break_option: str #Used if infected_dont_have_breaks_with_exposed
- infected_coffee_duration: int #Used if infected_dont_have_breaks_with_exposed
- infected_dont_have_breaks_with_exposed: bool
- infected_finish: minutes_since_midnight
- infected_lunch_finish: minutes_since_midnight #Used if infected_dont_have_breaks_with_exposed
- infected_lunch_option: bool #Used if infected_dont_have_breaks_with_exposed
- infected_lunch_start: minutes_since_midnight #Used if infected_dont_have_breaks_with_exposed
- infected_people: int
- infected_start: minutes_since_midnight
inside_temp: float
location_name: str
location_latitude: float
@@ -73,9 +55,7 @@ class FormData:
event_month: str
room_heating_option: bool
room_number: str
- room_volume: float
simulation_name: str
- total_people: int
vaccine_option: bool
vaccine_booster_option: bool
vaccine_type: str
@@ -95,120 +75,12 @@ class FormData:
short_range_interactions: list
_DEFAULTS: typing.ClassVar[typing.Dict[str, typing.Any]] = DEFAULTS
-
- @classmethod
- def from_dict(cls, form_data: typing.Dict) -> "FormData":
- # Take a copy of the form data so that we can mutate it.
- form_data = form_data.copy()
- form_data.pop('_xsrf', None)
-
- # Don't let arbitrary unescaped HTML through the net.
- for key, value in form_data.items():
- if isinstance(value, str):
- form_data[key] = html.escape(value)
-
- for key, default_value in cls._DEFAULTS.items():
- if form_data.get(key, '') == '':
- if default_value is NO_DEFAULT:
- raise ValueError(f"{key} must be specified")
- form_data[key] = default_value
-
- for key, value in form_data.items():
- if key in _CAST_RULES_FORM_ARG_TO_NATIVE:
- form_data[key] = _CAST_RULES_FORM_ARG_TO_NATIVE[key](value)
-
- if key not in cls._DEFAULTS:
- raise ValueError(f'Invalid argument "{html.escape(key)}" given')
-
- instance = cls(**form_data)
- instance.validate()
- return instance
-
- @classmethod
- def to_dict(cls, form: "FormData", strip_defaults: bool = False) -> dict:
- form_dict = {
- field.name: getattr(form, field.name)
- for field in dataclasses.fields(form)
- }
-
- for attr, value in form_dict.items():
- if attr in _CAST_RULES_NATIVE_TO_FORM_ARG:
- form_dict[attr] = _CAST_RULES_NATIVE_TO_FORM_ARG[attr](value)
-
- if strip_defaults:
- del form_dict['calculator_version']
-
- for attr, value in list(form_dict.items()):
- default = cls._DEFAULTS.get(attr, NO_DEFAULT)
- if default is not NO_DEFAULT and value in [default, 'not-applicable']:
- form_dict.pop(attr)
- return form_dict
-
+
def validate(self):
- # Validate number of infected people == 1 when activity is Conference/Training.
- if self.activity_type == 'training' and self.infected_people > 1:
- raise ValueError('Conference/Training activities are limited to 1 infected.')
- # Validate number of infected <= number of total people
- elif self.infected_people >= self.total_people:
- raise ValueError('Number of infected people cannot be more or equal than number of total people.')
-
- # Validate time intervals selected by user
- time_intervals = [
- ['exposed_start', 'exposed_finish'],
- ['infected_start', 'infected_finish'],
- ]
- if self.exposed_lunch_option:
- time_intervals.append(['exposed_lunch_start', 'exposed_lunch_finish'])
- if self.infected_dont_have_breaks_with_exposed and self.infected_lunch_option:
- time_intervals.append(['infected_lunch_start', 'infected_lunch_finish'])
-
- for start_name, end_name in time_intervals:
- start = getattr(self, start_name)
- end = getattr(self, end_name)
- if start > end:
- raise ValueError(
- f"{start_name} must be less than {end_name}. Got {start} and {end}.")
-
- def validate_lunch(start, finish):
- lunch_start = getattr(self, f'{population}_lunch_start')
- lunch_finish = getattr(self, f'{population}_lunch_finish')
- return (start <= lunch_start <= finish and
- start <= lunch_finish <= finish)
-
- def get_lunch_mins(population):
- lunch_mins = 0
- if getattr(self, f'{population}_lunch_option'):
- lunch_mins = getattr(self, f'{population}_lunch_finish') - getattr(self, f'{population}_lunch_start')
- return lunch_mins
-
- def get_coffee_mins(population):
- coffee_mins = 0
- if getattr(self, f'{population}_coffee_break_option') != 'coffee_break_0':
- coffee_mins = COFFEE_OPTIONS_INT[getattr(self, f'{population}_coffee_break_option')] * getattr(self, f'{population}_coffee_duration')
- return coffee_mins
-
- def get_activity_mins(population):
- return getattr(self, f'{population}_finish') - getattr(self, f'{population}_start')
-
- populations = ['exposed', 'infected'] if self.infected_dont_have_breaks_with_exposed else ['exposed']
- for population in populations:
- # Validate lunch time within the activity times.
- if (getattr(self, f'{population}_lunch_option') and
- not validate_lunch(getattr(self, f'{population}_start'), getattr(self, f'{population}_finish'))
- ):
- raise ValueError(
- f"{population} lunch break must be within presence times."
- )
-
- # Length of breaks < length of activity
- if (get_lunch_mins(population) + get_coffee_mins(population)) >= get_activity_mins(population):
- raise ValueError(
- f"Length of breaks >= Length of {population} presence."
- )
+ # Validate population parameters
+ self.validate_population_parameters()
- validation_tuples = [('activity_type', ACTIVITY_TYPES),
- ('exposed_coffee_break_option', COFFEE_OPTIONS_INT),
- ('infected_coffee_break_option', COFFEE_OPTIONS_INT),
+ validation_tuples = [('activity_type', ACTIVITY_TYPES),
('mechanical_ventilation_type', MECHANICAL_VENTILATION_TYPES),
('mask_type', MASK_TYPES),
('mask_wearing_option', MASK_WEARING_OPTIONS),
@@ -221,10 +93,16 @@ def get_activity_mins(population):
('ascertainment_bias', CONFIDENCE_LEVEL_OPTIONS),
('vaccine_type', VACCINE_TYPE),
('vaccine_booster_type', VACCINE_BOOSTER_TYPE),]
+
for attr_name, valid_set in validation_tuples:
if getattr(self, attr_name) not in valid_set:
raise ValueError(f"{getattr(self, attr_name)} is not a valid value for {attr_name}")
+
+ # Validate number of infected people == 1 when activity is Conference/Training.
+ if self.activity_type == 'training' and self.infected_people > 1:
+ raise ValueError('Conference/Training activities are limited to 1 infected.')
+ # Validate ventilation parameters
if self.ventilation_type == 'natural_ventilation':
if self.window_type == 'not-applicable':
raise ValueError(
@@ -327,7 +205,7 @@ def initialize_room(self) -> models.Room:
def build_mc_model(self) -> mc.ExposureModel:
room = self.initialize_room()
-
+ ventilation: models._VentilationBase = self.ventilation()
infected_population = self.infected_population()
short_range = []
@@ -340,11 +218,10 @@ def build_mc_model(self) -> mc.ExposureModel:
distance=short_range_distances,
))
- # Initializes and returns a model with the attributes defined above
return mc.ExposureModel(
concentration_model=mc.ConcentrationModel(
room=room,
- ventilation=self.ventilation(),
+ ventilation=ventilation,
infected=infected_population,
evaporation_factor=0.3,
),
@@ -354,7 +231,7 @@ def build_mc_model(self) -> mc.ExposureModel:
geographic_population=self.geographic_population,
geographic_cases=self.geographic_cases,
ascertainment_bias=CONFIDENCE_LEVEL_OPTIONS[self.ascertainment_bias],
- ),
+ ),
)
def build_model(self, sample_size=DEFAULT_MC_SAMPLE_SIZE) -> models.ExposureModel:
@@ -437,10 +314,23 @@ def outside_temp(self) -> models.PiecewiseConstant:
def ventilation(self) -> models._VentilationBase:
always_on = models.PeriodicInterval(period=120, duration=120)
+ periodic_interval = models.PeriodicInterval(self.windows_frequency, self.windows_duration,
+ min(self.infected_start, self.exposed_start)/60)
+ if self.ventilation_type == 'from_fitting':
+ ventilations = []
+ if self.CO2_fitting_result['fitting_ventilation_type'] == 'fitting_natural_ventilation':
+ transition_times = self.CO2_fitting_result['transition_times']
+ for index, (start, stop) in enumerate(zip(transition_times[:-1], transition_times[1:])):
+ ventilations.append(models.AirChange(active=models.SpecificInterval(present_times=((start, stop), )),
+ air_exch=self.CO2_fitting_result['ventilation_values'][index]))
+ else:
+ ventilations.append(models.AirChange(active=always_on, air_exch=self.CO2_fitting_result['ventilation_values'][0]))
+ return models.MultipleVentilation(tuple(ventilations))
+
# Initializes a ventilation instance as a window if 'natural_ventilation' is selected, or as a HEPA-filter otherwise
if self.ventilation_type == 'natural_ventilation':
if self.window_opening_regime == 'windows_open_periodically':
- window_interval = models.PeriodicInterval(self.windows_frequency, self.windows_duration, min(self.infected_start, self.exposed_start)/60)
+ window_interval = periodic_interval
else:
window_interval = always_on
@@ -568,206 +458,11 @@ def exposed_population(self) -> mc.Population:
)
return exposed
- def _compute_breaks_in_interval(self, start, finish, n_breaks, duration) -> models.BoundarySequence_t:
- break_delay = ((finish - start) - (n_breaks * duration)) // (n_breaks+1)
- break_times = []
- end = start
- for n in range(n_breaks):
- begin = end + break_delay
- end = begin + duration
- break_times.append((begin, end))
- return tuple(break_times)
-
- def exposed_lunch_break_times(self) -> models.BoundarySequence_t:
- result = []
- if self.exposed_lunch_option:
- result.append((self.exposed_lunch_start, self.exposed_lunch_finish))
- return tuple(result)
-
- def infected_lunch_break_times(self) -> models.BoundarySequence_t:
- if self.infected_dont_have_breaks_with_exposed:
- result = []
- if self.infected_lunch_option:
- result.append((self.infected_lunch_start, self.infected_lunch_finish))
- return tuple(result)
- else:
- return self.exposed_lunch_break_times()
-
- def exposed_number_of_coffee_breaks(self) -> int:
- return COFFEE_OPTIONS_INT[self.exposed_coffee_break_option]
-
- def infected_number_of_coffee_breaks(self) -> int:
- return COFFEE_OPTIONS_INT[self.infected_coffee_break_option]
-
- def _coffee_break_times(self, activity_start, activity_finish, coffee_breaks, coffee_duration, lunch_start, lunch_finish) -> models.BoundarySequence_t:
- time_before_lunch = lunch_start - activity_start
- time_after_lunch = activity_finish - lunch_finish
- before_lunch_frac = time_before_lunch / (time_before_lunch + time_after_lunch)
- n_morning_breaks = round(coffee_breaks * before_lunch_frac)
- breaks = (
- self._compute_breaks_in_interval(
- activity_start, lunch_start, n_morning_breaks, coffee_duration
- )
- + self._compute_breaks_in_interval(
- lunch_finish, activity_finish, coffee_breaks - n_morning_breaks, coffee_duration
- )
- )
- return breaks
-
- def exposed_coffee_break_times(self) -> models.BoundarySequence_t:
- exposed_coffee_breaks = self.exposed_number_of_coffee_breaks()
- if exposed_coffee_breaks == 0:
- return ()
- if self.exposed_lunch_option:
- breaks = self._coffee_break_times(self.exposed_start, self.exposed_finish, exposed_coffee_breaks, self.exposed_coffee_duration, self.exposed_lunch_start, self.exposed_lunch_finish)
- else:
- breaks = self._compute_breaks_in_interval(self.exposed_start, self.exposed_finish, exposed_coffee_breaks, self.exposed_coffee_duration)
- return breaks
-
- def infected_coffee_break_times(self) -> models.BoundarySequence_t:
- if self.infected_dont_have_breaks_with_exposed:
- infected_coffee_breaks = self.infected_number_of_coffee_breaks()
- if infected_coffee_breaks == 0:
- return ()
- if self.infected_lunch_option:
- breaks = self._coffee_break_times(self.infected_start, self.infected_finish, infected_coffee_breaks, self.infected_coffee_duration, self.infected_lunch_start, self.infected_lunch_finish)
- else:
- breaks = self._compute_breaks_in_interval(self.infected_start, self.infected_finish, infected_coffee_breaks, self.infected_coffee_duration)
- return breaks
- else:
- return self.exposed_coffee_break_times()
-
- def generate_specific_break_times(self, population_breaks) -> models.BoundarySequence_t:
- break_times = []
- for n in population_breaks:
- # Parse break times.
- begin = time_string_to_minutes(n["start_time"])
- end = time_string_to_minutes(n["finish_time"])
- for time in [begin, end]:
- # For a specific break, the infected and exposed presence is the same.
- if not getattr(self, 'infected_start') < time < getattr(self, 'infected_finish'):
- raise ValueError(f'All breaks should be within the simulation time. Got {time_minutes_to_string(time)}.')
-
- break_times.append((begin, end))
- return tuple(break_times)
-
- def present_interval(
- self,
- start: int,
- finish: int,
- breaks: typing.Optional[models.BoundarySequence_t] = None,
- ) -> models.Interval:
- """
- Calculate the presence interval given the start and end times (in minutes), and
- a number of monotonic, non-overlapping, but potentially unsorted, breaks (also in minutes).
-
- """
- if not breaks:
- # If there are no breaks, the interval is the start and end.
- return models.SpecificInterval(((start/60, finish/60),))
-
- # Order the breaks by their start-time, and ensure that they are monotonic
- # and that the start of one break happens after the end of another.
- break_boundaries: models.BoundarySequence_t = tuple(sorted(breaks, key=lambda break_pair: break_pair[0]))
-
- for break_start, break_end in break_boundaries:
- if break_start >= break_end:
- raise ValueError("Break ends before it begins.")
-
- prev_break_end = break_boundaries[0][1]
- for break_start, break_end in break_boundaries[1:]:
- if prev_break_end >= break_start:
- raise ValueError(f"A break starts before another ends ({break_start}, {break_end}, {prev_break_end}).")
- prev_break_end = break_end
-
- present_intervals = []
-
- current_time = start
- LOG.debug(f"starting time march at {_hours2timestring(current_time/60)} to {_hours2timestring(finish/60)}")
-
- # As we step through the breaks. For each break there are 6 important cases
- # we must cover. Let S=start; E=end; Bs=Break start; Be=Break end:
- # 1. The interval is entirely before the break. S < E <= Bs < Be
- # 2. The interval straddles the start of the break. S < Bs < E <= Be
- # 3. The break is entirely inside the interval. S < Bs < Be <= E
- # 4. The interval is entirely inside the break. Bs <= S < E <= Be
- # 5. The interval straddles the end of the break. Bs <= S < Be <= E
- # 6. The interval is entirely after the break. Bs < Be <= S < E
-
- for current_break in break_boundaries:
- if current_time >= finish:
- break
-
- LOG.debug(f"handling break {_hours2timestring(current_break[0]/60)}-{_hours2timestring(current_break[1]/60)} "
- f" (current time: {_hours2timestring(current_time/60)})")
-
- break_s, break_e = current_break
- case1 = finish <= break_s
- case2 = current_time < break_s < finish < break_e
- case3 = current_time < break_s < break_e <= finish
- case4 = break_s <= current_time < finish <= break_e
- case5 = break_s <= current_time < break_e < finish
- case6 = break_e <= current_time
-
- if case1:
- LOG.debug(f"case 1: interval entirely before break")
- present_intervals.append((current_time / 60, finish / 60))
- LOG.debug(f" + added interval {_hours2timestring(present_intervals[-1][0])} "
- f"- {_hours2timestring(present_intervals[-1][1])}")
- current_time = finish
- elif case2:
- LOG.debug(f"case 2: interval straddles start of break")
- present_intervals.append((current_time / 60, break_s / 60))
- LOG.debug(f" + added interval {_hours2timestring(present_intervals[-1][0])} "
- f"- {_hours2timestring(present_intervals[-1][1])}")
- current_time = break_e
- elif case3:
- LOG.debug(f"case 3: break entirely inside interval")
- # We add the bit before the break, but not the bit afterwards,
- # as it may hit another break.
- present_intervals.append((current_time / 60, break_s / 60))
- LOG.debug(f" + added interval {_hours2timestring(present_intervals[-1][0])} "
- f"- {_hours2timestring(present_intervals[-1][1])}")
- current_time = break_e
- elif case4:
- LOG.debug(f"case 4: interval entirely inside break")
- current_time = finish
- elif case5:
- LOG.debug(f"case 5: interval straddles end of break")
- current_time = break_e
- elif case6:
- LOG.debug(f"case 6: interval entirely after the break")
-
- if current_time < finish:
- LOG.debug("trailing interval")
- present_intervals.append((current_time / 60, finish / 60))
- return models.SpecificInterval(tuple(present_intervals))
-
- def infected_present_interval(self) -> models.Interval:
- if self.specific_breaks != {}: # It means the breaks are specific and not predefined
- breaks = self.generate_specific_break_times(self.specific_breaks['infected_breaks'])
- else:
- breaks = self.infected_lunch_break_times() + self.infected_coffee_break_times()
- return self.present_interval(
- self.infected_start, self.infected_finish,
- breaks=breaks,
- )
-
def short_range_interval(self, interaction) -> models.SpecificInterval:
start_time = time_string_to_minutes(interaction['start_time'])
duration = float(interaction['duration'])
return models.SpecificInterval(present_times=((start_time/60, (start_time + duration)/60),))
- def exposed_present_interval(self) -> models.Interval:
- if self.specific_breaks != {}: # It means the breaks are specific and not predefined
- breaks = self.generate_specific_break_times(self.specific_breaks['exposed_breaks'])
- else:
- breaks = self.exposed_lunch_break_times() + self.exposed_coffee_break_times()
- return self.present_interval(
- self.exposed_start, self.exposed_finish,
- breaks=breaks,
- )
-
def build_expiration(expiration_definition) -> mc._ExpirationBase:
if isinstance(expiration_definition, str):
@@ -846,80 +541,4 @@ def baseline_raw_form_data() -> typing.Dict[str, typing.Union[str, float]]:
'short_range_interactions': '[]',
}
-
-def _hours2timestring(hours: float):
- # Convert times like 14.5 to strings, like "14:30"
- return f"{int(np.floor(hours)):02d}:{int(np.round((hours % 1) * 60)):02d}"
-
-
-def time_string_to_minutes(time: str) -> minutes_since_midnight:
- """
- Converts time from string-format to an integer number of minutes after 00:00
- :param time: A string of the form "HH:MM" representing a time of day
- :return: The number of minutes between 'time' and 00:00
- """
- return minutes_since_midnight(60 * int(time[:2]) + int(time[3:]))
-
-
-def time_minutes_to_string(time: int) -> str:
- """
- Converts time from an integer number of minutes after 00:00 to string-format
- :param time: The number of minutes between 'time' and 00:00
- :return: A string of the form "HH:MM" representing a time of day
- """
- return "{0:0=2d}".format(int(time/60)) + ":" + "{0:0=2d}".format(time%60)
-
-
-def string_to_list(s: str) -> list:
- return list(ast.literal_eval(s.replace(""", "\"")))
-
-
-def list_to_string(l: list) -> str:
- return json.dumps(l)
-
-
-def string_to_dict(s: str) -> dict:
- return dict(ast.literal_eval(s.replace(""", "\"")))
-
-
-def dict_to_string(d: dict) -> str:
- return json.dumps(d)
-
-
-def _safe_int_cast(value) -> int:
- if isinstance(value, int):
- return value
- elif isinstance(value, float) and int(value) == value:
- return int(value)
- elif isinstance(value, str) and value.isdecimal():
- return int(value)
- else:
- raise TypeError(f"Unable to safely cast {value} ({type(value)} type) to int")
-
-
-#: Mapping of field name to a callable which can convert values from form
-#: input (URL encoded arguments / string) into the correct type.
-_CAST_RULES_FORM_ARG_TO_NATIVE: typing.Dict[str, typing.Callable] = {}
-
-#: Mapping of field name to callable which can convert native type to values
-#: that can be encoded to URL arguments.
-_CAST_RULES_NATIVE_TO_FORM_ARG: typing.Dict[str, typing.Callable] = {}
-
-
-for _field in dataclasses.fields(FormData):
- if _field.type is minutes_since_midnight:
- _CAST_RULES_FORM_ARG_TO_NATIVE[_field.name] = time_string_to_minutes
- _CAST_RULES_NATIVE_TO_FORM_ARG[_field.name] = time_minutes_to_string
- elif _field.type is int:
- _CAST_RULES_FORM_ARG_TO_NATIVE[_field.name] = _safe_int_cast
- elif _field.type is float:
- _CAST_RULES_FORM_ARG_TO_NATIVE[_field.name] = float
- elif _field.type is bool:
- _CAST_RULES_FORM_ARG_TO_NATIVE[_field.name] = lambda v: v == '1'
- _CAST_RULES_NATIVE_TO_FORM_ARG[_field.name] = int
- elif _field.type is list:
- _CAST_RULES_FORM_ARG_TO_NATIVE[_field.name] = string_to_list
- _CAST_RULES_NATIVE_TO_FORM_ARG[_field.name] = list_to_string
- elif _field.type is dict:
- _CAST_RULES_FORM_ARG_TO_NATIVE[_field.name] = string_to_dict
- _CAST_RULES_NATIVE_TO_FORM_ARG[_field.name] = dict_to_string
+cast_class_fields(VirusFormData)
diff --git a/caimira/apps/calculator/report_generator.py b/caimira/apps/calculator/report_generator.py
index 12c2eddc..e3757229 100644
--- a/caimira/apps/calculator/report_generator.py
+++ b/caimira/apps/calculator/report_generator.py
@@ -15,7 +15,7 @@
from caimira import models
from caimira.apps.calculator import markdown_tools
from ... import monte_carlo as mc
-from .model_generator import FormData, DEFAULT_MC_SAMPLE_SIZE
+from .model_generator import VirusFormData, DEFAULT_MC_SAMPLE_SIZE
from ... import dataclass_utils
from caimira.store.configuration import config
@@ -102,7 +102,7 @@ def interesting_times(model: models.ExposureModel, approx_n_pts: typing.Optional
return nice_times
-def concentrations_with_sr_breathing(form: FormData, model: models.ExposureModel, times: typing.List[float], short_range_intervals: typing.List) -> typing.List[float]:
+def concentrations_with_sr_breathing(form: VirusFormData, model: models.ExposureModel, times: typing.List[float], short_range_intervals: typing.List) -> typing.List[float]:
lower_concentrations = []
for time in times:
for index, (start, stop) in enumerate(short_range_intervals):
@@ -114,7 +114,7 @@ def concentrations_with_sr_breathing(form: FormData, model: models.ExposureModel
return lower_concentrations
-def calculate_report_data(form: FormData, model: models.ExposureModel) -> typing.Dict[str, typing.Any]:
+def calculate_report_data(form: VirusFormData, model: models.ExposureModel) -> typing.Dict[str, typing.Any]:
times = interesting_times(model)
short_range_intervals = [interaction.presence.boundaries()[0] for interaction in model.short_range]
short_range_expirations = [interaction['expiration'] for interaction in form.short_range_interactions] if form.short_range_option == "short_range_yes" else []
@@ -150,6 +150,7 @@ def calculate_report_data(form: FormData, model: models.ExposureModel) -> typing
zip(('viral_loads', 'pi_means', 'lower_percentiles', 'upper_percentiles'),
manufacture_conditional_probability_data(model, prob))}
+
return {
"model_repr": repr(model),
"times": list(times),
@@ -174,8 +175,8 @@ def calculate_report_data(form: FormData, model: models.ExposureModel) -> typing
}
-def generate_permalink(base_url, get_root_url, get_root_calculator_url, form: FormData):
- form_dict = FormData.to_dict(form, strip_defaults=True)
+def generate_permalink(base_url, get_root_url, get_root_calculator_url, form: VirusFormData):
+ form_dict = VirusFormData.to_dict(form, strip_defaults=True)
# Generate the calculator URL arguments that would be needed to re-create this
# form.
@@ -318,6 +319,13 @@ def readable_minutes(minutes: int) -> str:
return time_str + unit
+def hour_format(hour: float) -> str:
+ # Convert float hour to HH:MM format
+ hours = int(hour)
+ minutes = int(hour % 1 * 60)
+ return f"{hours}:{minutes if minutes != 0 else '00'}"
+
+
def percentage(absolute: float) -> float:
return absolute * 100
@@ -345,7 +353,7 @@ def manufacture_viral_load_scenarios_percentiles(model: mc.ExposureModel) -> typ
return scenarios
-def manufacture_alternative_scenarios(form: FormData) -> typing.Dict[str, mc.ExposureModel]:
+def manufacture_alternative_scenarios(form: VirusFormData) -> typing.Dict[str, mc.ExposureModel]:
scenarios = {}
if (form.short_range_option == "short_range_no"):
# Two special option cases - HEPA and/or FFP2 masks.
@@ -416,7 +424,7 @@ def scenario_statistics(mc_model: mc.ExposureModel, sample_times: typing.List[fl
def comparison_report(
- form: FormData,
+ form: VirusFormData,
report_data: typing.Dict[str, typing.Any],
scenarios: typing.Dict[str, mc.ExposureModel],
sample_times: typing.List[float],
@@ -464,7 +472,7 @@ class ReportGenerator:
def build_report(
self,
base_url: str,
- form: FormData,
+ form: VirusFormData,
executor_factory: typing.Callable[[], concurrent.futures.Executor],
) -> str:
model = form.build_model()
@@ -475,7 +483,7 @@ def prepare_context(
self,
base_url: str,
model: models.ExposureModel,
- form: FormData,
+ form: VirusFormData,
executor_factory: typing.Callable[[], concurrent.futures.Executor],
) -> dict:
now = datetime.utcnow().astimezone()
@@ -513,6 +521,7 @@ def _template_environment(self) -> jinja2.Environment:
env.filters['non_zero_percentage'] = non_zero_percentage
env.filters['readable_minutes'] = readable_minutes
env.filters['minutes_to_time'] = minutes_to_time
+ env.filters['hour_format'] = hour_format
env.filters['float_format'] = "{0:.2f}".format
env.filters['int_format'] = "{:0.0f}".format
env.filters['percentage'] = percentage
diff --git a/caimira/apps/calculator/static/js/co2_form.js b/caimira/apps/calculator/static/js/co2_form.js
new file mode 100644
index 00000000..ef21f010
--- /dev/null
+++ b/caimira/apps/calculator/static/js/co2_form.js
@@ -0,0 +1,388 @@
+// Input data for CO2 fitting algorithm
+const CO2_data_form = [
+ "CO2_data",
+ "exposed_coffee_break_option",
+ "exposed_coffee_duration",
+ "exposed_finish",
+ "exposed_lunch_finish",
+ "exposed_lunch_option",
+ "exposed_lunch_start",
+ "exposed_start",
+ "fitting_ventilation_states",
+ "fitting_ventilation_type",
+ "infected_coffee_break_option",
+ "infected_coffee_duration",
+ "infected_dont_have_breaks_with_exposed",
+ "infected_finish",
+ "infected_lunch_finish",
+ "infected_lunch_option",
+ "infected_lunch_start",
+ "infected_people",
+ "infected_start",
+ "room_volume",
+ "specific_breaks",
+ "total_people",
+];
+
+// Method to upload a valid excel file
+function uploadFile(endpoint) {
+ clearFittingResultComponent();
+ const files = $("#file_upload")[0].files;
+ if (files.length === 0) {
+ $("#upload-error")
+ .text('Please choose a file.')
+ .show();
+ return;
+ }
+ const file = files[0];
+ const extension = file.name
+ .substring(file.name.lastIndexOf("."))
+ .toUpperCase();
+ if (extension !== ".XLS" && extension !== ".XLSX") {
+ $("#upload-error")
+ .text("Please select a valid excel file (.XLS or .XLSX).")
+ .show();
+ return;
+ }
+
+ // FileReader API to read the Excel file
+ const reader = new FileReader();
+ reader.onload = function (event) {
+ const fileContent = event.target.result;
+ const workbook = XLSX.read(fileContent, { type: "binary" });
+
+ // Assuming the first sheet is the one we want to validate
+ const firstSheetName = workbook.SheetNames[0];
+ const worksheet = workbook.Sheets[firstSheetName];
+
+ // Check if the headers match the expected format
+ const headerCoordinates = {
+ Times: "A1",
+ CO2: "B1",
+ };
+ for (const header in headerCoordinates) {
+ const cellValue = worksheet[headerCoordinates[header]]?.v;
+ if (
+ !cellValue ||
+ $.type(cellValue) !== "string" ||
+ cellValue.trim().toLowerCase() !== header.toLowerCase()
+ ) {
+ $("#upload-error")
+ .text(`The file does not have the expected header "${header}".`)
+ .show();
+ return;
+ }
+ }
+
+ const data = XLSX.utils.sheet_to_json(worksheet, { header: 1, raw: false });
+ // Check if there is any data below the header row
+ if (data.length <= 1) {
+ $("#upload-error")
+ .text(
+ "The Excel file is empty. Please make sure it contains data below the header row."
+ )
+ .show();
+ return;
+ }
+
+ // Validate data in the columns
+ const timesColumnIndex = 0;
+ const CO2ColumnIndex = 1;
+ for (let i = 1; i < data.length; i++) {
+ try {
+ const timesCellValue = parseFloat(data[i][timesColumnIndex]);
+ const CO2CellValue = parseFloat(data[i][CO2ColumnIndex]);
+
+ if (isNaN(timesCellValue) || isNaN(CO2CellValue)) {
+ throw new Error("Invalid data in the Times or CO2 columns.");
+ }
+ } catch (error) {
+ $("#upload-error")
+ .text(
+ "Invalid data in the Times or CO2 columns. Please make sure they contain only float values."
+ )
+ .show();
+ return;
+ }
+ }
+
+ // Convert Excel file to JSON and further processing
+ try {
+ generateJSONStructure(endpoint, data);
+ // If all validations pass, process the file here or display a success message
+ $("#upload-file-extention-error").hide();
+ } catch (error) {
+ console.log(error);
+ }
+ };
+ reader.readAsBinaryString(file); // Read the file as a binary string
+}
+
+// Method to generate the JSON structure
+function generateJSONStructure(endpoint, jsonData) {
+ const inputToPopulate = $("#CO2_data");
+
+ // Initialize the final structure
+ const finalStructure = { times: [], CO2: [] };
+
+ if (jsonData.length > 0) {
+ // Loop through the input dataArray and extract the values starting from the second array (index 1)
+ for (let i = 1; i < jsonData.length; i++) {
+ const arr = jsonData[i];
+ // Assuming arr contains two float values
+ finalStructure.times.push(parseFloat(arr[0]));
+ finalStructure.CO2.push(parseFloat(arr[1]));
+ }
+ inputToPopulate.val(JSON.stringify(finalStructure));
+ $("#generate_fitting_data").prop("disabled", false);
+ $("#fitting_ventilation_states").prop("disabled", false);
+ $("[name=fitting_ventilation_type]").prop("disabled", false);
+ plotCO2Data(endpoint);
+ }
+}
+
+function insertErrorFor(referenceNode, text) {
+ $(`${text}`).insertAfter(referenceNode)
+}
+
+function validateFormInputs(obj) {
+ $("#ventilation_data").find("span.error_text").remove(); // Remove all error spans
+
+ let submit = true;
+ const $referenceNode = $("#DIVCO2_data_dialog");
+ for (let i = 0; i < CO2_data_form.length; i++) {
+ const $requiredElement = $(`[name=${CO2_data_form[i]}]`).first();
+ if ($requiredElement.attr('name') !== "fitting_ventilation_states" && $requiredElement.val() === "") {
+ insertErrorFor(
+ $referenceNode,
+ `'${$requiredElement.attr('name')}' must be defined.
`
+ );
+ submit = false;
+ }
+ }
+ if (submit) {
+ $($(obj).data("target")).modal("show");
+ $("#upload-error").hide();
+ $("#upload-file-extention-error").hide();
+ }
+ return submit;
+}
+
+function validateCO2Form() {
+ let submit = true;
+ if (validateFormInputs($("#button_fit_data"))) submit = true;
+
+ const $fittingToSubmit = $('#DIVCO2_fitting_to_submit');
+ // Check if natural ventilation is selected
+ if (
+ $fittingToSubmit.find('input[name="fitting_ventilation_type"]:checked').val() ==
+ "fitting_natural_ventilation"
+ ) {
+ // Validate ventilation scheme
+ const $ventilationStates = $fittingToSubmit.find("input[name=fitting_ventilation_states]");
+ const $referenceNode = $("#DIVCO2_fitting_result");
+ if ($ventilationStates.val() !== "") {
+ // validate input format
+ try {
+ const parsedValue = JSON.parse($ventilationStates.val());
+ if (Array.isArray(parsedValue)) {
+ if (parsedValue.length <= 1) {
+ insertErrorFor(
+ $referenceNode,
+ `'${$ventilationStates.attr('name')}' must have more than one $ventilationStates.
`
+ );
+ submit = false;
+ }
+ else {
+ const infected_finish = $(`[name=infected_finish]`).first().val();
+ const exposed_finish = $(`[name=exposed_finish]`).first().val();
+
+ const [hours_infected, minutes_infected] = infected_finish.split(":").map(Number);
+ const elapsed_time_infected = hours_infected * 60 + minutes_infected;
+
+ const [hours_exposed, minutes_exposed] = exposed_finish.split(":").map(Number);
+ const elapsed_time_exposed = hours_exposed * 60 + minutes_exposed;
+
+ const max_presence_time = Math.max(elapsed_time_infected, elapsed_time_exposed);
+ const max_transition_time = parsedValue[parsedValue.length - 1] * 60;
+
+ if (max_transition_time > max_presence_time) {
+ insertErrorFor(
+ $referenceNode,
+ `The last transition time (${parsedValue[parsedValue.length - 1]}) should be before the last presence time (${max_presence_time / 60}).
`
+ );
+ submit = false;
+ }
+ }
+ }
+ else {
+ insertErrorFor(
+ $referenceNode,
+ `'${$ventilationStates.attr('name')}' must be a list.`
+ );
+ submit = false;
+ }
+ } catch {
+ insertErrorFor(
+ $referenceNode,
+ `'${$ventilationStates.attr('name')}' must be a list of numbers.`
+ );
+ submit = false;
+ }
+ } else {
+ insertErrorFor(
+ $referenceNode,
+ `'${$ventilationStates.attr('name')}' must be defined.`
+ );
+ submit = false;
+ }
+ }
+
+ return submit;
+}
+
+function displayTransitionTimesHourFormat(start, stop) {
+ var minutes_start = ((start % 1) * 60).toPrecision(2);
+ var minutes_stop = ((stop % 1) * 60).toPrecision(2);
+ return (
+ Math.floor(start) +
+ ":" +
+ (minutes_start != "0.0" ? minutes_start : "00") +
+ " - " +
+ Math.floor(stop) +
+ ":" +
+ (minutes_stop != "0.0" ? minutes_stop : "00")
+ );
+}
+
+function displayFittingData(json_response) {
+ $("#DIVCO2_fitting_result").show();
+ $("#CO2_data_plot").attr("src", json_response["CO2_plot"]);
+ // Not needed for the form submission
+ delete json_response["CO2_plot"];
+ delete json_response["predictive_CO2"];
+ $("#CO2_fitting_result").val(JSON.stringify(json_response));
+ $("#exhalation_rate_fit").html(
+ "Exhalation rate: " +
+ String(json_response["exhalation_rate"].toFixed(2)) +
+ " m³/h"
+ );
+ let ventilation_table =
+ "