"""Contains the implementations for evolutionary algorithms using EvoTorch
Implements the supported evolutionary algorithms using the EvoTorch library.
"""
from typing import Any
import evotorch.algorithms as algos
import evotorch.operators as operators
import numpy as np
import pandas as pd
import plotly.graph_objects as go
import torch
from evotorch import Problem
from evotorch.logging import PandasLogger
from matplotlib import pyplot as plt
from plotly.subplots import make_subplots
from ..base import CalibrationWorkflowBase
from ..data_model import ParameterDataType, ParameterEstimateModel
[docs]
class EvoTorchEvolutionary(CalibrationWorkflowBase):
"""The EvoTorch evolutionary algorithm method class."""
[docs]
def specify(self) -> None:
"""Specify the parameters of the model calibration procedure."""
self.names = []
self.data_types = []
self.bounds = []
self.lower_bounds = []
self.upper_bounds = []
parameter_spec = self.specification.parameter_spec.parameters
for spec in parameter_spec:
parameter_name = spec.name
data_type = spec.data_type
if data_type == ParameterDataType.CONSTANT:
parameter_value = spec.parameter_value
self.constants[parameter_name] = parameter_value
continue
elif data_type == ParameterDataType.CATEGORICAL:
bounds = self.set_categorical_parameter(spec)
else:
bounds = self.get_parameter_bounds(spec) # type: ignore[assignment]
lower_bound, upper_bound = bounds
if (
data_type == ParameterDataType.DISCRETE
or data_type == ParameterDataType.CATEGORICAL
):
lower_bound = np.floor(lower_bound).astype("int")
upper_bound = np.floor(upper_bound).astype("int")
self.names.append(parameter_name)
self.data_types.append(data_type)
self.bounds.append(bounds)
self.lower_bounds.append(lower_bound)
self.upper_bounds.append(upper_bound)
[docs]
def execute(self) -> None:
"""Execute the simulation calibration procedure."""
directions = []
for direction in self.specification.directions:
if direction == "minimize":
direction = "min"
elif direction == "maximize":
direction = "max"
directions.append(direction)
self.trials: list[dict[str, Any]] = []
evolutionary_kwargs = self.get_calibration_func_kwargs()
def target_function(X: torch.Tensor) -> torch.Tensor:
trial = {}
if X.ndim == 1:
for i, name in enumerate(self.names):
trial[name] = X[i].item()
self.trials.append(trial)
X = [X]
Y = self.calibration_func_wrapper(
X,
self,
self.specification.observed_data,
self.names,
self.data_types,
evolutionary_kwargs,
False,
)
self.set_output_labels_from_Y(Y)
output_labels = self.specification.output_labels
for i, label in enumerate(output_labels): # type: ignore[arg-type]
trial[label] = Y[i]
return Y
vectorized = self.specification.batched
seed = self.specification.random_seed
self.problem = Problem(
directions,
target_function,
solution_length=len(self.names),
bounds=(self.lower_bounds, self.upper_bounds),
vectorized=vectorized,
seed=seed,
)
algorithm_name = self.specification.method
supported_algorithms = dict(
ga=algos.GeneticAlgorithm, cosyne=algos.Cosyne, cmaes=algos.PyCMAES
)
algorithm_class = supported_algorithms.get(algorithm_name, None)
if algorithm_class is None:
raise ValueError(
f"Unsupported EvoTorch algorithm: {algorithm_name}.",
f"Supported EvoTorch algorithms are {', '.join(supported_algorithms)}",
)
method_kwargs = self.specification.method_kwargs
if method_kwargs is None:
method_kwargs = {}
if algorithm_name == "cmaes":
method_kwargs["obj_index"] = 0
elif algorithm_name == "ga":
operator_dict = self.specification.operators
operator_list = []
if operator_dict is not None:
for operator_name, operator_kwargs in operator_dict.items():
operator_class = getattr(operators, operator_name)
operator = operator_class(self.problem, **operator_kwargs)
operator_list.append(operator)
method_kwargs["operators"] = operator_list
self.searcher = algorithm_class(self.problem, **method_kwargs)
interval = self.specification.n_samples
self.logger = PandasLogger(self.searcher, interval=interval)
num_generations = self.specification.n_iterations
self.searcher.run(num_generations=num_generations)
[docs]
def analyze(self) -> None:
"""Analyze the results of the simulation calibration procedure."""
task, time_now, experiment_name, outdir = self.prepare_analyze()
trials_df = pd.DataFrame(self.trials)
objective_df = self.logger.to_dataframe()
output_labels = self.specification.output_labels
trials_df_best = trials_df.sort_values(output_labels).head(1)
for name in self.names:
estimate = trials_df_best[name].item()
parameter_estimate = ParameterEstimateModel(name=name, estimate=estimate)
self.add_parameter_estimate(parameter_estimate)
fig, axes = plt.subplots(
nrows=len(output_labels), # type: ignore[arg-type]
figsize=self.specification.figsize,
)
if not isinstance(axes, np.ndarray):
axes = [axes]
trials_df["index"] = trials_df.index
for i, output_label in enumerate(output_labels): # type: ignore[arg-type]
trials_df.plot.scatter(
"index", output_label, ax=axes[i], title=f"Simulated {output_label}"
)
trials_df = trials_df.drop(columns="index")
self.present_fig(fig, outdir, time_now, task, experiment_name, "trial-history")
parameter_names = self.names
for output_label in output_labels:
fig = make_subplots(
rows=1, cols=len(parameter_names), subplot_titles=parameter_names
)
for i, parameter_name in enumerate(parameter_names):
fig.add_trace(
go.Scatter(
x=trials_df[parameter_name],
y=trials_df[output_label],
mode="markers",
),
row=1,
col=i + 1,
)
fig.update_layout(yaxis_title=output_label, showlegend=False)
if outdir is not None:
outfile = self.join(
outdir,
f"{time_now}-{task}-{experiment_name}-{output_label}-plot-slice.png",
)
self.append_artifact(outfile)
fig.write_image(outfile)
else:
fig.show()
if outdir is None:
return
self.to_csv(objective_df, "objective")
self.to_csv(trials_df, "trials")