Source code for calisim.abc.pyabc_wrapper

"""Contains the implementations for Approximate Bayesian Computation methods using
PyABC

Implements the supported Approximate Bayesian Computation methods using
the PyABC library.

"""

from collections.abc import Callable
from datetime import timedelta

import numpy as np
import pandas as pd
import pyabc
from matplotlib import pyplot as plt

from ..base import CalibrationWorkflowBase
from ..data_model import ParameterDataType, ParameterEstimateModel


[docs] class PyABCApproximateBayesianComputation(CalibrationWorkflowBase): """The PyABC Approximate Bayesian Computation method class."""
[docs] def dist_name_processing(self, name: str) -> str: """Apply data preprocessing to the distribution name. Args: name (str): The unprocessed distribution name. Returns: str: The processed distribution name. """ name = name.replace(" ", "_").lower() if name == "normal": name = "norm" return name
[docs] def specify(self) -> None: """Specify the parameters of the model calibration procedure.""" distributions = {} transition_mapping = {} parameter_spec = self.specification.parameter_spec.parameters for spec in parameter_spec: parameter_name = spec.name data_type = spec.data_type if ( data_type == ParameterDataType.DISCRETE or data_type == ParameterDataType.CATEGORICAL ): if data_type == ParameterDataType.DISCRETE: lower_bound, upper_bound = self.get_parameter_bounds(spec) else: lower_bound, upper_bound = self.set_categorical_parameter(spec) lower_bound = np.floor(lower_bound).astype("int") upper_bound = np.floor(upper_bound).astype("int") discrete_domain = np.arange(lower_bound, upper_bound) distributions[parameter_name] = pyabc.RV( "rv_discrete", values=( discrete_domain, np.repeat(1 / len(discrete_domain), len(discrete_domain)), ), ) transition_mapping[parameter_name] = pyabc.DiscreteJumpTransition( domain=discrete_domain, p_stay=0.7 ) elif data_type == ParameterDataType.CONSTANT: parameter_value = spec.parameter_value self.constants[parameter_name] = parameter_value else: distribution_name = self.dist_name_processing(spec.distribution_name) distribution_args = spec.distribution_args if distribution_args is None: distribution_args = [] distribution_kwargs = spec.distribution_kwargs if distribution_kwargs is None: distribution_kwargs = {} distributions[parameter_name] = pyabc.RV( distribution_name, *distribution_args, **distribution_kwargs ) transition_mapping[parameter_name] = pyabc.MultivariateNormalTransition( scaling=1 ) self.parameters = pyabc.Distribution(**distributions) self.transitions = pyabc.AggregatedTransition(mapping=transition_mapping)
[docs] def execute(self) -> None: """Execute the simulation calibration procedure.""" adaptive_pop_size = pyabc.AdaptivePopulationSize( self.specification.n_init, self.specification.min_population_size, self.specification.n_bootstrap, ) output_labels = self.specification.output_labels if output_labels is None: output_labels = ["output"] abc_kwargs = self.get_calibration_func_kwargs() def simulator_func(parameters: dict) -> dict: observed_data = self.specification.observed_data simulation_id = self.get_simulation_uuid() parameters = parameters.copy() parameters = self.assign_categorical_parameter_values(parameters) for k, v in self.constants.items(): parameters[k] = v results = self.call_calibration_func( parameters, simulation_id, observed_data, **abc_kwargs ) summary_stats = {} if len(output_labels) == 1: summary_stats[output_labels[0]] = results else: for i, output_label in enumerate(output_labels): summary_stats[output_label] = results[i] # type: ignore[index] return summary_stats n_jobs = self.specification.n_jobs if n_jobs > 1: sampler = pyabc.MulticoreEvalParallelSampler(n_procs=n_jobs) else: sampler = pyabc.SingleCoreSampler(check_max_eval=True) def _make_distance( output_label: str, ) -> Callable[[dict[str, float], dict[str, float]], float]: return lambda simulated, _: simulated[output_label] distance_func = pyabc.AggregatedDistance( [_make_distance(label) for label in output_labels] ) self.abc = pyabc.ABCSMC( simulator_func, self.parameters, distance_func, population_size=adaptive_pop_size, transitions=self.transitions, eps=pyabc.MedianEpsilon(), sampler=sampler, ) storage = self.specification.storage if storage is None: storage = "sqlite://" self.abc.new(storage) method_kwargs = self.specification.method_kwargs if method_kwargs is None: method_kwargs = {} self.history = self.abc.run( minimum_epsilon=self.specification.epsilon, max_walltime=timedelta(minutes=self.specification.walltime), **method_kwargs, )
[docs] def analyze(self) -> None: """Analyze the results of the simulation calibration procedure.""" task, time_now, experiment_name, outdir = self.prepare_analyze() for plot_func in [ pyabc.visualization.plot_sample_numbers, pyabc.visualization.plot_total_sample_numbers, pyabc.visualization.plot_sample_numbers_trajectory, pyabc.visualization.plot_epsilons, pyabc.visualization.plot_effective_sample_sizes, pyabc.visualization.plot_walltime, pyabc.visualization.plot_total_walltime, pyabc.visualization.plot_contour_matrix, pyabc.visualization.plot_acceptance_rates_trajectory, pyabc.visualization.plot_kde_matrix_highlevel, ]: plot_func(self.history) plot_name = plot_func.__name__.replace("_", "-") if outdir is not None: outfile = self.join( outdir, f"{time_now}-{task}-{experiment_name}-{plot_name}.png", ) self.append_artifact(outfile) plt.tight_layout() plt.savefig(outfile) plt.close() else: plt.show() plt.close() distribution_dfs = [] for t in range(self.history.max_t + 1): df, w = self.history.get_distribution(m=0, t=t) df["t"] = t df["w"] = w distribution_dfs.append(df) distribution_dfs = pd.concat(distribution_dfs) max_t = distribution_dfs["t"].max() trace_df = distribution_dfs.query(f"t == {max_t}") trace_df = trace_df.drop(columns=["t", "w"]) for name in trace_df: estimate = trace_df[name].mean() uncertainty = trace_df[name].std() parameter_estimate = ParameterEstimateModel( name=name, estimate=estimate, uncertainty=uncertainty ) self.add_parameter_estimate(parameter_estimate) if outdir is None: return self.to_csv(distribution_dfs, "parameters") populations_df = self.history.get_all_populations() self.to_csv(populations_df, "populations") population_particles_df = self.history.get_nr_particles_per_population() self.to_csv(population_particles_df, "particles-per-population") distances_df = [] for t in range(self.history.max_t + 1): df = self.history.get_weighted_distances(t=t) df["t"] = t distances_df.append(df) distances_df = pd.concat(distances_df) self.to_csv(distances_df, "distances")