"""Contains utility functions for calibration
This module defines various utility functions
for the calibration of simulations.
"""
import os.path as osp
import uuid
from datetime import datetime
from pathlib import Path
import numpy as np
import pandas as pd
from ..data_model import ParameterDataType
[docs]
def get_datetime_now() -> str:
"""Get the current datetime for now.
Returns:
str: The current datetime.
"""
return datetime.today().strftime("%Y-%m-%d-%H-%M-%S")
[docs]
def get_simulation_uuid() -> str:
"""Get a new simulation uuid.
Returns:
str: The simulation uuid.
"""
simulation_uuid = str(uuid.uuid4())
return simulation_uuid
[docs]
def get_examples_outdir() -> str: # pragma: no cover
"""Get the output directory for calibration examples.
Returns:
str: The output directory.
"""
return osp.join("examples", "outdir")
[docs]
def create_file_path(file_path: str) -> str:
"""Create file path if it does not exist.
Args:
file_path (str): The file path to create.
Returns:
str: The created file path.
"""
path = Path(file_path)
if not path.is_dir():
path.mkdir(parents=True, exist_ok=True)
return file_path
[docs]
def calibration_func_wrapper(
X: np.ndarray,
workflow: "CalibrationWorkflowBase", # type: ignore[name-defined] # noqa: F821
observed_data: pd.DataFrame | np.ndarray,
parameter_names: list[str],
data_types: list[ParameterDataType],
calibration_kwargs: dict,
wrap_values: bool = False,
) -> np.ndarray:
"""Wrapper function for the calibration function.
Args:
X (np.ndarray): The parameter set matrix.
workflow (CalibrationWorkflowBase): The calibration workflow.
observed_data (pd.DataFrame | np.ndarray): The observed data.
parameter_names (list[str]): The list of simulation parameter names.
data_types (list[ParameterDataType]): The data types for each parameter.
calibration_kwargs (dict): Arguments to supply to the calibration function.
wrap_values (bool): Whether to wrap scalar values with a list.
Defaults to False.
Returns:
np.ndarray: The simulation output data.
"""
import numpy as np
constants = workflow.get_constants()
parameters = []
for theta in X:
parameter_set = {}
for i, parameter_value in enumerate(theta):
parameter_name = parameter_names[i]
data_type = data_types[i]
if data_type == ParameterDataType.CONTINUOUS:
parameter_set[parameter_name] = parameter_value
else:
parameter_set[parameter_name] = int(parameter_value)
parameter_set = workflow.assign_categorical_parameter_values(parameter_set)
for k, v in constants.items():
parameter_set[k] = v
parameters.append(parameter_set)
simulation_ids = [get_simulation_uuid() for _ in range(len(parameters))]
workflow.simulation_ids = simulation_ids
if workflow.specification.batched:
results = workflow.call_calibration_func(
parameters, simulation_ids, observed_data, **calibration_kwargs
)
else:
results = []
for i, parameter in enumerate(parameters):
simulation_id = simulation_ids[i]
result = workflow.call_calibration_func(
parameter,
simulation_id,
observed_data,
**calibration_kwargs,
)
if wrap_values and not isinstance(result, list):
result = [result]
results.append(result)
results = np.array(results)
return results
[docs]
class EarlyStopper:
"""Early stopping for training."""
def __init__(self, patience: int = 1, min_delta: float = 0):
"""EarlyStopper constructor.
Args:
patience (int, optional): The number of iterations
before performing early stopping. Defaults to 1.
min_delta (float, optional): The minimum difference
for the validation loss. Defaults to 0.
"""
self.patience = patience
self.min_delta = min_delta
self.counter: int = 0
self.min_validation_loss = float("inf")
[docs]
def early_stop(self, validation_loss: float) -> bool:
"""Perform early stopping.
Args:
validation_loss (float): The training validation loss.
Returns:
bool: Whether to perform early stopping.
"""
if validation_loss < self.min_validation_loss:
self.min_validation_loss = validation_loss
self.counter = 0
elif validation_loss >= (self.min_validation_loss + self.min_delta):
self.counter += 1
if self.counter >= self.patience:
return True
return False
[docs]
def extend_X(X: np.ndarray, Y_rows: int) -> np.ndarray:
"""Extend the number of rows for X with a dummy index column.
Args:
X (np.ndarray): The input matrix.
Y_rows (int) The number of rows for the simulation outputs.
Returns:
np.ndarray: The extended input matrix with a dummy column.
"""
design_list = []
for i in range(X.shape[0]):
for j in range(Y_rows):
row = np.append(X[i], j)
design_list.append(row)
X = np.array(design_list)
return X