Skip to content

Commit

Permalink
Merge branch 'feature/CO2_fit' into 'master'
Browse files Browse the repository at this point in the history
Fit ventilation and exhalation rates on CO2 sensor data

See merge request caimira/caimira!444
  • Loading branch information
lrdossan committed Nov 22, 2023
2 parents 8fddfc3 + f6abdc9 commit d70029d
Show file tree
Hide file tree
Showing 19 changed files with 1,549 additions and 575 deletions.
57 changes: 52 additions & 5 deletions caimira/apps/calculator/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import tornado.log

from . import markdown_tools
from . import model_generator
from . import model_generator, co2_model_generator
from .report_generator import ReportGenerator, calculate_report_data
from .user import AuthenticatedUser, AnonymousUser

Expand All @@ -37,7 +37,7 @@
# calculator version. If the calculator needs to make breaking changes (e.g. change
# form attributes) then it can also increase its MAJOR version without needing to
# increase the overall CAiMIRA version (found at ``caimira.__version__``).
__version__ = "4.13.0"
__version__ = "4.14.0"

LOG = logging.getLogger(__name__)

Expand Down Expand Up @@ -106,7 +106,7 @@ async def post(self) -> None:
start = datetime.datetime.now()

try:
form = model_generator.FormData.from_dict(requested_model_config)
form = model_generator.VirusFormData.from_dict(requested_model_config)
except Exception as err:
if self.settings.get("debug", False):
import traceback
Expand Down Expand Up @@ -157,7 +157,7 @@ async def post(self) -> None:
pprint(requested_model_config)

try:
form = model_generator.FormData.from_dict(requested_model_config)
form = model_generator.VirusFormData.from_dict(requested_model_config)
except Exception as err:
if self.settings.get("debug", False):
import traceback
Expand All @@ -178,7 +178,7 @@ async def post(self) -> None:

class StaticModel(BaseRequestHandler):
async def get(self) -> None:
form = model_generator.FormData.from_dict(model_generator.baseline_raw_form_data())
form = model_generator.VirusFormData.from_dict(model_generator.baseline_raw_form_data())
base_url = self.request.protocol + "://" + self.request.host
report_generator: ReportGenerator = self.settings['report_generator']
executor = loky.get_reusable_executor(max_workers=self.settings['handler_worker_pool_size'])
Expand Down Expand Up @@ -340,7 +340,53 @@ def get(self):
active_page=self.active_page,
text_blocks=template_environment.globals["common_text"]
))


class CO2ModelResponse(BaseRequestHandler):
def check_xsrf_cookie(self):
"""
This request handler implements a stateless API that returns report data in JSON format.
Thus, XSRF cookies are disabled by overriding base class implementation of this method with a pass statement.
"""
pass

async def post(self, endpoint: str) -> None:
requested_model_config = tornado.escape.json_decode(self.request.body)
try:
form = co2_model_generator.CO2FormData.from_dict(requested_model_config)
except Exception as err:
if self.settings.get("debug", False):
import traceback
print(traceback.format_exc())
response_json = {'code': 400, 'error': f'Your request was invalid {html.escape(str(err))}'}
self.set_status(400)
self.finish(json.dumps(response_json))
return

if endpoint.rstrip('/') == 'plot':
transition_times = co2_model_generator.CO2FormData.find_change_points_with_pelt(form.CO2_data)
self.finish({'CO2_plot': co2_model_generator.CO2FormData.generate_ventilation_plot(form.CO2_data, transition_times),
'transition_times': [round(el, 2) for el in transition_times]})
else:
executor = loky.get_reusable_executor(
max_workers=self.settings['handler_worker_pool_size'],
timeout=300,
)
report_task = executor.submit(
co2_model_generator.CO2FormData.build_model, form,
)
report = await asyncio.wrap_future(report_task)

result = dict(report.CO2_fit_params())
ventilation_transition_times = report.ventilation_transition_times

result['fitting_ventilation_type'] = form.fitting_ventilation_type
result['transition_times'] = ventilation_transition_times
result['CO2_plot'] = co2_model_generator.CO2FormData.generate_ventilation_plot(CO2_data=form.CO2_data,
transition_times=ventilation_transition_times[:-1],
predictive_CO2=result['predictive_CO2'])
self.finish(result)


def get_url(app_root: str, relative_path: str = '/'):
return app_root.rstrip('/') + relative_path.rstrip('/')
Expand All @@ -363,6 +409,7 @@ def make_app(
base_urls: typing.List = [
(get_root_url(r'/?'), LandingPage),
(get_root_calculator_url(r'/?'), CalculatorForm),
(get_root_calculator_url(r'/co2-fit/(.*)'), CO2ModelResponse),
(get_root_calculator_url(r'/report'), ConcentrationModel),
(get_root_url(r'/static/(.*)'), StaticFileHandler, {'path': static_dir}),
(get_root_calculator_url(r'/static/(.*)'), StaticFileHandler, {'path': calculator_static_dir}),
Expand Down
186 changes: 186 additions & 0 deletions caimira/apps/calculator/co2_model_generator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
import dataclasses
import logging
import typing
import numpy as np
import ruptures as rpt
import matplotlib.pyplot as plt
import re

from caimira import models
from .form_data import FormData, cast_class_fields
from .defaults import DEFAULT_MC_SAMPLE_SIZE, NO_DEFAULT
from .report_generator import img2base64, _figure2bytes

minutes_since_midnight = typing.NewType('minutes_since_midnight', int)

LOG = logging.getLogger(__name__)


@dataclasses.dataclass
class CO2FormData(FormData):
CO2_data: dict
fitting_ventilation_states: list
fitting_ventilation_type: str

#: The default values for undefined fields. Note that the defaults here
#: and the defaults in the html form must not be contradictory.
_DEFAULTS: typing.ClassVar[typing.Dict[str, typing.Any]] = {
'CO2_data': '{}',
'exposed_coffee_break_option': 'coffee_break_0',
'exposed_coffee_duration': 5,
'exposed_finish': '17:30',
'exposed_lunch_finish': '13:30',
'exposed_lunch_option': True,
'exposed_lunch_start': '12:30',
'exposed_start': '08:30',
'fitting_ventilation_states': '[]',
'fitting_ventilation_type': 'fitting_natural_ventilation',
'infected_coffee_break_option': 'coffee_break_0',
'infected_coffee_duration': 5,
'infected_dont_have_breaks_with_exposed': False,
'infected_finish': '17:30',
'infected_lunch_finish': '13:30',
'infected_lunch_option': True,
'infected_lunch_start': '12:30',
'infected_people': 1,
'infected_start': '08:30',
'room_volume': NO_DEFAULT,
'specific_breaks': '{}',
'total_people': NO_DEFAULT,
}

def __init__(self, **kwargs):
# Set default values defined in CO2FormData
for key, value in self._DEFAULTS.items():
setattr(self, key, kwargs.get(key, value))

def validate(self):
# Validate population parameters
self.validate_population_parameters()

# Validate specific inputs - breaks (exposed and infected)
if self.specific_breaks != {}:
if type(self.specific_breaks) is not dict:
raise TypeError('The specific breaks should be in a dictionary.')

dict_keys = list(self.specific_breaks.keys())
if "exposed_breaks" not in dict_keys:
raise TypeError(f'Unable to fetch "exposed_breaks" key. Got "{dict_keys[0]}".')
if "infected_breaks" not in dict_keys:
raise TypeError(f'Unable to fetch "infected_breaks" key. Got "{dict_keys[1]}".')

for population_breaks in ['exposed_breaks', 'infected_breaks']:
if self.specific_breaks[population_breaks] != []:
if type(self.specific_breaks[population_breaks]) is not list:
raise TypeError(f'All breaks should be in a list. Got {type(self.specific_breaks[population_breaks])}.')
for input_break in self.specific_breaks[population_breaks]:
# Input validations.
if type(input_break) is not dict:
raise TypeError(f'Each break should be a dictionary. Got {type(input_break)}.')
dict_keys = list(input_break.keys())
if "start_time" not in input_break:
raise TypeError(f'Unable to fetch "start_time" key. Got "{dict_keys[0]}".')
if "finish_time" not in input_break:
raise TypeError(f'Unable to fetch "finish_time" key. Got "{dict_keys[1]}".')
for time in input_break.values():
if not re.compile("^(2[0-3]|[01]?[0-9]):([0-5]?[0-9])$").match(time):
raise TypeError(f'Wrong time format - "HH:MM". Got "{time}".')

@classmethod
def find_change_points_with_pelt(self, CO2_data: dict):
"""
Perform change point detection using Pelt algorithm from ruptures library with pen=15.
Returns a list of tuples containing (index, X-axis value) for the detected significant changes.
"""

times: list = CO2_data['times']
CO2_values: list = CO2_data['CO2']

if len(times) != len(CO2_values):
raise ValueError("times and CO2 values must have the same length.")

# Convert the input list to a numpy array for use with the ruptures library
CO2_np = np.array(CO2_values)

# Define the model for change point detection (Radial Basis Function kernel)
model = "rbf"

# Fit the Pelt algorithm to the data with the specified model
algo = rpt.Pelt(model=model).fit(CO2_np)

# Predict change points using the Pelt algorithm with a penalty value of 15
result = algo.predict(pen=15)

# Find local minima and maxima
segments = np.split(np.arange(len(CO2_values)), result)
merged_segments = [np.hstack((segments[i], segments[i + 1])) for i in range(len(segments) - 1)]
result_set = set()
for segment in merged_segments[:-2]:
result_set.add(times[CO2_values.index(min(CO2_np[segment]))])
result_set.add(times[CO2_values.index(max(CO2_np[segment]))])
return list(result_set)

@classmethod
def generate_ventilation_plot(self, CO2_data: dict,
transition_times: typing.Optional[list] = None,
predictive_CO2: typing.Optional[list] = None):
times_values = CO2_data['times']
CO2_values = CO2_data['CO2']

fig = plt.figure(figsize=(7, 4), dpi=110)
plt.plot(times_values, CO2_values, label='Input CO₂')

if (transition_times):
for time in transition_times:
plt.axvline(x = time, color = 'grey', linewidth=0.5, linestyle='--')
if (predictive_CO2):
plt.plot(times_values, predictive_CO2, label='Predictive CO₂')
plt.xlabel('Time of day')
plt.ylabel('Concentration (ppm)')
plt.legend()
return img2base64(_figure2bytes(fig))

def population_present_changes(self, infected_presence: models.Interval, exposed_presence: models.Interval) -> typing.List[float]:
state_change_times = set(infected_presence.transition_times())
state_change_times.update(exposed_presence.transition_times())
return sorted(state_change_times)

def ventilation_transition_times(self) -> typing.Tuple[float, ...]:
# Check what type of ventilation is considered for the fitting
if self.fitting_ventilation_type == 'fitting_natural_ventilation':
vent_states = self.fitting_ventilation_states
vent_states.append(self.CO2_data['times'][-1])
return tuple(vent_states)
else:
return tuple((self.CO2_data['times'][0], self.CO2_data['times'][-1]))

def build_model(self, size=DEFAULT_MC_SAMPLE_SIZE) -> models.CO2DataModel: # type: ignore
# Build a simple infected and exposed population for the case when presence
# intervals and number of people are dynamic. Activity type is not needed.
infected_presence = self.infected_present_interval()
infected_population = models.SimplePopulation(
number=self.infected_people,
presence=infected_presence,
activity=None, # type: ignore
)
exposed_presence = self.exposed_present_interval()
exposed_population=models.SimplePopulation(
number=self.total_people - self.infected_people,
presence=exposed_presence,
activity=None, # type: ignore
)

all_state_changes=self.population_present_changes(infected_presence, exposed_presence)
total_people = [infected_population.people_present(stop) + exposed_population.people_present(stop)
for _, stop in zip(all_state_changes[:-1], all_state_changes[1:])]

return models.CO2DataModel(
room_volume=self.room_volume,
number=models.IntPiecewiseConstant(transition_times=tuple(all_state_changes), values=tuple(total_people)),
presence=None,
ventilation_transition_times=self.ventilation_transition_times(),
times=self.CO2_data['times'],
CO2_concentrations=self.CO2_data['CO2'],
)

cast_class_fields(CO2FormData)
5 changes: 3 additions & 2 deletions caimira/apps/calculator/defaults.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
'ceiling_height': 0.,
'conditional_probability_plot': False,
'conditional_probability_viral_loads': False,
'CO2_fitting_result': '{}',
'exposed_coffee_break_option': 'coffee_break_0',
'exposed_coffee_duration': 5,
'exposed_finish': '17:30',
Expand Down Expand Up @@ -103,8 +104,8 @@
VACCINE_TYPE = ['Ad26.COV2.S_(Janssen)', 'Any_mRNA_-_heterologous', 'AZD1222_(AstraZeneca)', 'AZD1222_(AstraZeneca)_and_any_mRNA_-_heterologous', 'AZD1222_(AstraZeneca)_and_BNT162b2_(Pfizer)',
'BBIBP-CorV_(Beijing_CNBG)', 'BNT162b2_(Pfizer)', 'BNT162b2_(Pfizer)_and_mRNA-1273_(Moderna)', 'CoronaVac_(Sinovac)', 'CoronaVac_(Sinovac)_and_AZD1222_(AstraZeneca)', 'Covishield',
'mRNA-1273_(Moderna)', 'Sputnik_V_(Gamaleya)', 'CoronaVac_(Sinovac)_and_BNT162b2_(Pfizer)']
VENTILATION_TYPES = {'natural_ventilation',
'mechanical_ventilation', 'no_ventilation'}
VENTILATION_TYPES = {'natural_ventilation', 'mechanical_ventilation',
'no_ventilation', 'from_fitting'}
VIRUS_TYPES: typing.List[str] = list(config.virus_distributions)
VOLUME_TYPES = {'room_volume_explicit', 'room_volume_from_dimensions'}
WINDOWS_OPENING_REGIMES = {'windows_open_permanently',
Expand Down
Loading

0 comments on commit d70029d

Please sign in to comment.