"""
Sequential learning over a panel.
"""
import numbers
import warnings
from abc import ABC
from functools import partial
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
from joblib import Parallel, delayed
from sklearn.base import ClassifierMixin, RegressorMixin
from sklearn.model_selection import BaseCrossValidator, GridSearchCV, RandomizedSearchCV
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from tqdm.auto import tqdm
from macrosynergy.compat import JOBLIB_RETURN_AS
from macrosynergy.learning.splitters import (
WalkForwardPanelSplit,
)
from macrosynergy.management import categories_df
from macrosynergy.management.types.qdf import QuantamentalDataFrame
class BasePanelLearner(ABC):
def __init__(
self,
df,
xcats,
cids=None,
start=None,
end=None,
blacklist=None,
freq="M",
lag=1,
xcat_aggs=["last", "sum"],
generate_labels=None,
skip_checks=False,
):
"""
Initialize a sequential learning process over a panel.
Parameters
----------
df : pd.DataFrame
Standardized daily quantamental dataframe with the four columns: "cid",
"xcat", "real_date" and "value".
xcats : list
List of xcats to be used in the learning process. The last category in the
list is the dependent variable, and all preceding categories are the
independent variables in a supervised learning framework.
cids : list, optional
Cross-sections to be included. Default is all in the dataframe.
        start : str, optional
            Start date of the data used in subsequent analysis, in ISO 8601 format.
            Default is None, i.e. the earliest date in the dataframe.
        end : str, optional
            End date of the data used in subsequent analysis, in ISO 8601 format.
            Default is None, i.e. the latest date in the dataframe.
        blacklist : dict, optional
            Blacklisting dictionary specifying date ranges for which cross-sectional
            information should be excluded. The keys are cross-sections and the values
            are tuples of start and end dates as pandas Timestamps. Default is None.
        freq : str, optional
            Frequency of the data. Options are "D", "W", "M", "Q" and "Y". Default is
            "M" for monthly.
lag : int, optional
Number of periods to lag the independent variables. Default is 1.
        xcat_aggs : list, optional
            List of exactly two aggregation methods for downsampling data to the
            frequency specified in the `freq` parameter. The first element applies to
            all independent variables, whilst the second applies to the target
            category. Default is ["last", "sum"].
generate_labels : callable, optional
Function to transform the dependent variable, usually into
classification labels. Default is None.
skip_checks : bool, optional
Whether to skip the initialization checks. Default is False.
Notes
-----
        `BasePanelLearner` is an abstract class that provides the basic structure to
        train statistical machine learning models sequentially over a panel. At each
        model retraining date, an optimal model is chosen out of a collection of
        candidate models through a hyperparameter optimization process. Following the
        model selection stage, the chosen model is trained on data preceding the
        retraining date, maintaining the point-in-time principle of JPMaQS. Analytics
        from the selected model are stored for interpretability of the process.
The hyperparameter optimization process revolves around cross-validation [1]_.
Cross-validation estimates out-of-sample performance metrics. Each candidate
model is cross-validated over a collection of possible hyperparameters and the
model with the best score is chosen. When all possible models are directly
specified and cross-validated, we call this process a 'grid search'. When models
are sampled from a given collection, we call this process a 'random search'.
These are the two methods currently supported by `BasePanelLearner`. In a future
release, we will add support for Bayesian hyperparameter searches [2]_.
References
----------
.. [1] Brownlee, J. (2023). A Gentle Introduction to k-fold Cross-Validation.
https://machinelearningmastery.com/k-fold-cross-validation/
.. [2] Frazier, P.I., 2018. A tutorial on Bayesian optimization.
https://arxiv.org/abs/1807.02811
"""
# Checks
if not skip_checks:
self._check_init(
df,
xcats,
cids,
start,
end,
blacklist,
freq,
lag,
xcat_aggs,
generate_labels,
)
# Attributes
self.df = QuantamentalDataFrame(df)
self.xcats = xcats
self.cids = cids
self.start = start
self.end = end
self.blacklist = blacklist
self.freq = freq
self.lag = lag
self.xcat_aggs = xcat_aggs
self.generate_labels = generate_labels
# Create long-format dataframe
df_long = (
categories_df(
df=self.df,
xcats=self.xcats,
cids=self.cids,
start=self.start,
end=self.end,
blacklist=self.blacklist,
freq=self.freq,
lag=self.lag,
xcat_aggs=self.xcat_aggs,
)
.dropna()
.sort_index()
)
# Create X and y
self.X = df_long.iloc[:, :-1]
self.y = df_long.iloc[:, -1]
if self.generate_labels is not None:
self.y = self.y.apply(self.generate_labels)
# Store necessary index information
self.index = self.X.index
self.date_levels = self.index.get_level_values(1)
self.xs_levels = self.index.get_level_values(0)
self.unique_date_levels = sorted(self.date_levels.unique())
self.unique_xs_levels = sorted(self.xs_levels.unique())
# Create initial dataframe to store model selection data from the learning process
self.chosen_models = pd.DataFrame(
columns=[
"real_date",
"name",
"model_type",
"score",
"hparams",
"n_splits_used",
]
).astype(
{
"real_date": "datetime64[ns]",
"name": "category",
"model_type": "category",
"score": "float32",
"hparams": "object",
"n_splits_used": "object",
}
)
self.timestamps = []
    def run(
self,
name,
models,
outer_splitter,
inner_splitters=None,
hyperparameters=None,
scorers=None,
search_type="grid",
normalize_fold_results=False,
cv_summary="mean",
include_train_folds=False,
n_iter=100,
split_functions=None,
n_jobs_outer=-1,
n_jobs_inner=1,
):
"""
Run a learning process over a panel.
Parameters
----------
name : str
Category name for the forecasted panel resulting from the learning process.
models : dict
Dictionary of model names and compatible `scikit-learn` model objects.
outer_splitter : WalkForwardPanelSplit
Outer splitter for the learning process.
inner_splitters : dict, optional
Inner splitters for the learning process.
hyperparameters : dict, optional
Dictionary of model names and hyperparameter grids.
scorers : dict, optional
Dictionary of `scikit-learn` compatible scoring functions.
        search_type : str
            Search type for hyperparameter optimization. Default is "grid".
            Options are "grid" and "prior"; "bayes" is reserved for a future release
            and currently raises a NotImplementedError. If no hyperparameter tuning
            is required, this parameter can be disregarded.
normalize_fold_results : bool
Whether to normalize the scores across folds before combining them. Default is
False. If no hyperparameter tuning is required, this parameter can be disregarded.
cv_summary : str or callable
Summary function to use to combine scores across cross-validation folds.
Default is "mean". Options are "mean", "median", "mean-std", "mean/std",
"mean-std-ge" or a callable function. If no hyperparameter tuning
is required, this parameter can be disregarded.
        include_train_folds : bool, optional
            Whether to calculate cross-validation statistics on the training folds in
            addition to the test folds. Default is False. If no hyperparameter tuning
            is required, this parameter can be disregarded.
        n_iter : int
            Number of iterations for random or bayesian hyperparameter optimization.
            Default is 100. If no hyperparameter tuning is required, this parameter
            can be disregarded.
split_functions : dict, optional
Dictionary of callables for determining the number of cross-validation
splits to add to the initial number, as a function of the number of iterations
passed in the sequential learning process. If no hyperparameter tuning
is required, this parameter can be disregarded.
n_jobs_outer : int, optional
Number of jobs to run in parallel for the outer loop. Default is -1.
n_jobs_inner : int, optional
Number of jobs to run in parallel for the inner loop. Default is 1. If no
hyperparameter tuning is required, this parameter can be disregarded.
Returns
-------
list
List of dictionaries containing the results of the learning process.
"""
# Checks
self._check_run(
name=name,
outer_splitter=outer_splitter,
inner_splitters=inner_splitters,
models=models,
hyperparameters=hyperparameters,
scorers=scorers,
search_type=search_type,
normalize_fold_results=normalize_fold_results,
cv_summary=cv_summary,
include_train_folds=include_train_folds,
n_iter=n_iter,
split_functions=split_functions,
n_jobs_outer=n_jobs_outer,
n_jobs_inner=n_jobs_inner,
)
# Determine all outer splits and run the learning process in parallel
train_test_splits = list(outer_splitter.split(self.X, self.y))
if inner_splitters is not None:
base_splits = self._get_base_splits(inner_splitters)
else:
# No CV is performed, so no base splits are needed
base_splits = None
# Return list of results
optim_results = tqdm(
Parallel(n_jobs=n_jobs_outer, **JOBLIB_RETURN_AS)(
delayed(self._worker)(
name=name,
train_idx=train_idx,
test_idx=test_idx,
inner_splitters=inner_splitters,
models=models,
hyperparameters=hyperparameters,
scorers=scorers,
cv_summary=cv_summary,
include_train_folds=include_train_folds,
search_type=search_type,
normalize_fold_results=normalize_fold_results,
n_iter=n_iter,
n_splits_add=self._get_n_splits_add(
iteration, outer_splitter, split_functions
),
n_jobs_inner=n_jobs_inner,
base_splits=base_splits,
)
for iteration, (train_idx, test_idx) in enumerate(train_test_splits)
),
total=len(train_test_splits),
)
return optim_results
def _worker(
self,
name,
train_idx,
test_idx,
inner_splitters,
models,
hyperparameters,
scorers,
cv_summary,
include_train_folds,
search_type,
normalize_fold_results,
n_iter,
n_splits_add,
n_jobs_inner,
base_splits,
timestamp=None,
):
"""
Worker function for parallel processing of the learning process.
Parameters
----------
name : str
Category name for the forecasted panel resulting from the learning process.
train_idx : np.ndarray
Training indices for the current outer split.
test_idx : np.ndarray
Test indices for the current outer split.
inner_splitters : dict, optional
Inner splitters for the learning process.
models : dict
Compatible `scikit-learn` model objects.
hyperparameters : dict, optional
Hyperparameter grids.
scorers : dict, optional
Compatible `scikit-learn` scoring functions.
cv_summary : str or callable, optional
Summary function to condense cross-validation scores in each fold to a single
value, against which different hyperparameter choices can be compared.
If no hyperparameter tuning is required, this parameter can be disregarded.
        include_train_folds : bool, optional
            Whether to calculate cross-validation statistics on the training folds in
            addition to the test folds.
            If no hyperparameter tuning is required, this parameter can be disregarded.
        search_type : str, optional
            Search type for hyperparameter optimization. Default is "grid".
            Options are "grid" and "prior" ("bayes" is not yet implemented).
            If no hyperparameter tuning is required, this parameter can be disregarded.
normalize_fold_results : bool, optional
Whether to normalize the scores across folds before combining them.
If no hyperparameter tuning is required, this parameter can be disregarded.
n_iter : int, optional
Number of iterations for random or bayesian hyperparameter optimization.
If no hyperparameter tuning is required, this parameter can be disregarded.
        n_splits_add : dict, optional
            Dictionary of additional splits to add to the initial number of splits for
            each inner splitter, keyed by splitter name. Default is None.
n_jobs_inner : int, optional
Number of jobs to run in parallel for the inner loop. Default is 1.
If no hyperparameter tuning is required, this parameter can be disregarded.
base_splits : dict, optional
Dictionary of initial number of splits for each inner splitter.
If no hyperparameter tuning is required, this parameter is None.
timestamp : pd.Timestamp, optional
Date to record predictions and model diagnostics. Default is None. If None,
the earliest date in the test set is used (which is then adjusted for lag).
Returns
-------
dict
Dictionary comprising model selection data and predictive analytics.
"""
# Train-test split
X_train, X_test = self.X.iloc[train_idx, :], self.X.iloc[test_idx, :]
y_train, y_test = self.y.iloc[train_idx], self.y.iloc[test_idx]
# Determine correct timestamps of test set forecasts
# First get test index information
test_index = self.index[test_idx]
test_xs_levels = self.xs_levels[test_idx]
test_date_levels = self.date_levels[test_idx]
sorted_test_date_levels = sorted(test_date_levels.unique())
# Since the features lag behind the targets, the dates need to be adjusted
# by the lag applied.
if self.lag != 0:
locs: np.ndarray = (
np.searchsorted(
self.unique_date_levels, sorted_test_date_levels, side="left"
)
- 1
)
adj_test_date_levels: pd.DatetimeIndex = pd.DatetimeIndex(
[self.unique_date_levels[i] if i >= 0 else pd.NaT for i in locs]
)
# Now formulate correct index
date_map = dict(zip(sorted_test_date_levels, adj_test_date_levels))
mapped_dates = test_date_levels.map(date_map)
test_index = pd.MultiIndex.from_arrays(
[test_xs_levels, mapped_dates], names=["cid", "real_date"]
)
else:
adj_test_date_levels = test_date_levels
if inner_splitters is not None:
if n_splits_add is not None:
inner_splitters_adj = inner_splitters.copy()
for splitter_name, _ in inner_splitters_adj.items():
if hasattr(inner_splitters_adj[splitter_name], "n_splits"):
inner_splitters_adj[splitter_name].n_splits = (
base_splits[splitter_name] + n_splits_add[splitter_name]
)
else:
inner_splitters_adj = inner_splitters
optim_name, optim_model, optim_score, optim_params = self._model_search(
X_train=X_train,
y_train=y_train,
inner_splitters=inner_splitters_adj,
models=models,
hyperparameters=hyperparameters,
scorers=scorers,
search_type=search_type,
normalize_fold_results=normalize_fold_results,
n_iter=n_iter,
cv_summary=cv_summary,
include_train_folds=include_train_folds,
n_jobs_inner=n_jobs_inner,
)
else:
optim_name = list(models.keys())[0]
optim_model = models[optim_name].fit(X_train, y_train)
optim_score = np.ubyte(0) # For memory efficiency
optim_params = {}
inner_splitters_adj = None
split_results = self._get_split_results(
pipeline_name=name,
optimal_model=optim_model,
optimal_model_name=optim_name,
optimal_model_score=optim_score,
optimal_model_params=optim_params,
inner_splitters_adj=inner_splitters_adj,
X_train=X_train,
y_train=y_train,
X_test=X_test,
y_test=y_test,
timestamp=adj_test_date_levels.min() if timestamp is None else timestamp,
adjusted_test_index=test_index,
)
return split_results
def _model_search(
self,
X_train,
y_train,
inner_splitters,
models,
hyperparameters,
scorers,
search_type,
normalize_fold_results,
n_iter,
cv_summary,
include_train_folds,
n_jobs_inner,
):
"""
Determine optimal model based on cross-validation from a given training set.
Parameters
----------
X_train : pd.DataFrame
Training data.
y_train : pd.Series
Training target.
inner_splitters : dict
Inner splitters for the learning process.
models : dict
Compatible `scikit-learn` model objects.
hyperparameters : dict
Hyperparameter grids.
scorers : dict
Compatible `scikit-learn` scoring functions.
search_type : str
Search type for hyperparameter optimization. Default is "grid".
normalize_fold_results : bool
Whether to normalize the scores across folds before combining them.
n_iter : int
Number of iterations for random or bayesian hyperparameter optimization.
cv_summary : str or callable
Summary function to condense cross-validation scores in each fold to a single
value, against which different hyperparameter choices can be compared.
include_train_folds : bool, optional
Whether to calculate cross-validation statistics on the training folds in
the cross-validation process.
n_jobs_inner : int
Number of jobs to run in parallel for the inner loop.
Returns
-------
tuple
Optimal model name, optimal model, optimal model score and optimal
model hyperparameters.
"""
optim_name = None
optim_model = None
optim_score = np.float32("-inf")
optim_params = {}
cv_splits = []
for splitter in inner_splitters.values():
cv_splits.extend(list(splitter.split(X=X_train, y=y_train)))
# TODO: instead of picking one model, the best hyperparameters could be selected
# for each model and then "final" prediction would be the average of the individual
# predictions. This would be a simple ensemble method.
for model_name, model in models.items():
# For each model, find the optimal hyperparameters
if search_type == "grid":
search_object = GridSearchCV(
estimator=model,
param_grid=hyperparameters[model_name],
scoring=scorers,
n_jobs=n_jobs_inner,
refit=partial(
self._model_selection,
cv_summary=cv_summary,
scorers=scorers,
normalize_fold_results=normalize_fold_results,
),
cv=cv_splits,
                    return_train_score=include_train_folds,
)
elif search_type == "prior":
search_object = RandomizedSearchCV(
estimator=model,
param_distributions=hyperparameters[model_name],
n_iter=n_iter,
scoring=scorers,
n_jobs=n_jobs_inner,
refit=partial(
self._model_selection,
cv_summary=cv_summary,
scorers=scorers,
normalize_fold_results=normalize_fold_results,
),
cv=cv_splits,
                    return_train_score=include_train_folds,
)
try:
search_object.fit(X_train, y_train)
except Exception as e:
warnings.warn(
f"Error running a hyperparameter search for {model_name}: {e}",
RuntimeWarning,
)
continue
score = self._model_selection(
search_object.cv_results_,
cv_summary,
scorers,
normalize_fold_results,
return_index=False,
)
if score > optim_score:
optim_name = model_name
optim_model = search_object.best_estimator_
optim_score = score
optim_params = search_object.best_params_
return optim_name, optim_model, optim_score, optim_params
def _model_selection(
self, cv_results, cv_summary, scorers, normalize_fold_results, return_index=True
):
"""
Select the optimal hyperparameters based on a `scikit-learn` cv_results dataframe.
Parameters
----------
cv_results : dict
Cross-validation results dictionary.
cv_summary : str or callable
Summary function to condense cross-validation scores in each fold to a single
value, against which different hyperparameter choices can be compared.
scorers : dict
Compatible `scikit-learn` scoring functions.
normalize_fold_results : bool
Whether to normalize the scores across folds before combining them.
return_index : bool
Whether to return the index of the best estimator or the maximal score itself.
Default is True.
Returns
-------
int or float
Either the index of the best estimator or the maximal score itself.
"""
cv_results = pd.DataFrame(cv_results)
metric_columns = [
col
for col in cv_results.columns
if col.startswith("split") and ("test" in col or "train" in col)
]
if normalize_fold_results:
cv_results[metric_columns] = StandardScaler().fit_transform(
cv_results[metric_columns]
)
# For each metric, summarise the scores across folds for each hyperparameter choice
# using cv_summary
for scorer in scorers.keys():
# Extract the test scores for each fold for that scorer
scorer_columns = [col for col in metric_columns if scorer in col]
if cv_summary == "mean":
cv_results[f"{scorer}_summary"] = cv_results[scorer_columns].mean(
axis=1,
)
elif cv_summary == "median":
cv_results[f"{scorer}_summary"] = cv_results[scorer_columns].median(
axis=1
)
elif cv_summary == "mean-std":
cv_results[f"{scorer}_summary"] = cv_results[scorer_columns].mean(
axis=1
) - cv_results[scorer_columns].std(axis=1)
elif cv_summary == "mean/std":
cv_results[f"{scorer}_summary"] = cv_results[scorer_columns].mean(
axis=1
) / cv_results[scorer_columns].std(axis=1)
elif cv_summary == "mean-std-ge":
                # Separate training and test columns
train_columns = [col for col in scorer_columns if "train" in col]
test_columns = [col for col in scorer_columns if "test" in col]
# obtain mean and std of test metrics
mean_test = np.nanmean(cv_results[test_columns], axis=1)
std_test = np.nanstd(cv_results[test_columns], axis=1)
# Determine generalization gap.
# We define this to be the average absolute deviation of the test metrics
# from their corresponding training metrics.
generalization_gap = np.nanmean(
np.abs(
cv_results[train_columns].values - cv_results[test_columns].values
),
axis=1
)
# Store mean test metric - std test metric - generalization gap
cv_results[f"{scorer}_summary"] = mean_test - std_test - generalization_gap
else:
# TODO: handle NAs?
cv_results[f"{scorer}_summary"] = cv_results[scorer_columns].apply(
cv_summary, axis=1
)
        # Now scale the summary scores for each scorer
        summary_cols = [f"{scorer}_summary" for scorer in scorers.keys()]
        if len(scorers) > 1:
            scaler = StandardScaler()
            cv_results[summary_cols] = scaler.fit_transform(cv_results[summary_cols])
# Now average the summary scores for each scorer
cv_results["final_score"] = cv_results[summary_cols].mean(axis=1)
# Return index of best estimator
# TODO: handle case where multiple hyperparameter choices have the same score
# We currently return the first one
if return_index:
return cv_results["final_score"].idxmax()
else:
return cv_results["final_score"].max()
def _get_split_results(
self,
pipeline_name,
optimal_model,
optimal_model_name,
optimal_model_score,
optimal_model_params,
inner_splitters_adj,
X_train,
y_train,
X_test,
y_test,
timestamp,
adjusted_test_index,
):
"""
        Store model selection information and predictive analytics for the current
        train-test split.
Parameters
----------
pipeline_name : str
Name of the sequential optimization pipeline.
optimal_model : RegressorMixin or ClassifierMixin or Pipeline
Optimal model selected for the training set.
optimal_model_name : str
Name of the optimal model.
optimal_model_score : float
Score of the optimal model.
optimal_model_params : dict
Hyperparameters of the optimal model.
inner_splitters_adj : dict
Inner splitters for the learning process.
        X_train : pd.DataFrame
            Training feature matrix.
        y_train : pd.Series
            Training target variable.
        X_test : pd.DataFrame
            Test feature matrix.
        y_test : pd.Series
            Test target variable.
timestamp : pd.Timestamp
Model retraining date.
adjusted_test_index : pd.MultiIndex
Adjusted test index to account for lagged features.
Returns
-------
dict
Dictionary containing model selection data and predictive analytics.
"""
split_result = dict()
model_result = self._store_model_choice_data(
pipeline_name,
optimal_model,
optimal_model_name,
optimal_model_score,
optimal_model_params,
inner_splitters_adj,
X_train,
y_train,
X_test,
y_test,
timestamp,
)
split_result.update(model_result)
split_data = self.store_split_data(
pipeline_name,
optimal_model,
optimal_model_name,
optimal_model_score,
optimal_model_params,
inner_splitters_adj,
X_train,
y_train,
X_test,
y_test,
timestamp,
adjusted_test_index,
)
split_result.update(split_data)
return split_result
def _store_model_choice_data(
self,
pipeline_name,
optimal_model,
optimal_model_name,
optimal_model_score,
optimal_model_params,
inner_splitters_adj,
X_train,
y_train,
X_test,
y_test,
timestamp,
):
"""
Store model selection information for training set (X_train, y_train).
Parameters
----------
pipeline_name : str
Name of the sequential optimization pipeline.
optimal_model : RegressorMixin or ClassifierMixin or Pipeline
Optimal model selected for the training set.
optimal_model_name : str
Name of the optimal model.
optimal_model_score : float
Score of the optimal model.
optimal_model_params : dict
Hyperparameters of the optimal model.
inner_splitters_adj : dict
Inner splitters for the learning process.
        X_train : pd.DataFrame
            Training feature matrix.
        y_train : pd.Series
            Training target variable.
        X_test : pd.DataFrame
            Test feature matrix.
        y_test : pd.Series
            Test target variable.
timestamp : pd.Timestamp
Model retraining date.
Returns
-------
dict
Dictionary containing model selection data.
"""
if optimal_model is None:
warnings.warn(
f"No model was selected at time {timestamp}",
RuntimeWarning,
)
optimal_model_name = None
optimal_model_score = np.float32("-inf")
optimal_model_params = {}
data = [
timestamp,
optimal_model_name,
optimal_model_score,
optimal_model_params,
]
if inner_splitters_adj is not None:
n_splits = {
splitter_name: splitter.n_splits
for splitter_name, splitter in inner_splitters_adj.items()
}
data.append(n_splits)
else:
data.append(0) # No splits were used because CV wasn't used
return {"model_choice": data}
    def store_split_data(
self,
pipeline_name,
optimal_model,
optimal_model_name,
optimal_model_score,
optimal_model_params,
inner_splitters_adj,
X_train,
y_train,
X_test,
y_test,
timestamp,
adjusted_test_index,
):
"""
        Store predictive analytics for the current train-test split. The base
        implementation returns an empty dictionary; subclasses override this method to
        record forecasts and other diagnostics.
Parameters
----------
pipeline_name : str
Name of the sequential optimization pipeline.
optimal_model : RegressorMixin or ClassifierMixin or Pipeline
Optimal model selected for the training set.
optimal_model_name : str
Name of the optimal model.
optimal_model_score : float
Score of the optimal model.
optimal_model_params : dict
Hyperparameters of the optimal model.
inner_splitters_adj : dict
Inner splitters for the learning process.
        X_train : pd.DataFrame
            Training feature matrix.
        y_train : pd.Series
            Training target variable.
        X_test : pd.DataFrame
            Test feature matrix.
        y_test : pd.Series
            Test target variable.
timestamp : pd.Timestamp
Model retraining date.
adjusted_test_index : pd.MultiIndex
Adjusted test index to account for lagged features.
Returns
-------
dict
Dictionary containing predictive analytics.
"""
return dict()
    def get_optimal_models(self, name=None):
"""
Returns the sequences of optimal models for one or more processes.
Parameters
----------
        name : str or list, optional
            Label(s) of the sequential optimization process(es). Default is None, in
            which case all processes stored in the class instance are returned.
Returns
-------
pd.DataFrame
Pandas dataframe of the optimal models and hyperparameters selected at each
retraining date.
"""
if name is None:
return self.chosen_models
else:
if isinstance(name, str):
name = [name]
elif not isinstance(name, list):
raise TypeError(
"The process name must be a string or a list of strings."
)
for n in name:
if n not in self.chosen_models.name.unique():
raise ValueError(
f"""The process name '{n}' is not in the list of already-run
pipelines. Please check the name carefully. If correct, please run
calculate_predictions() first.
"""
)
return self.chosen_models[self.chosen_models.name.isin(name)]
    def models_heatmap(
self,
name,
title=None,
cap=5,
figsize=(12, 8),
title_fontsize=None,
tick_fontsize=None,
):
"""
        Visualize the optimal models used for signal calculation.
Parameters
----------
name : str
Name of the sequential optimization pipeline.
title : str, optional
Title of the heatmap. Default is None. This creates a figure title of the form
"Model Selection Heatmap for {name}".
        cap : int, optional
            Maximum number of models to display. Default is 5, with a limit of 10. The
            chosen models are the 'cap' most frequently occurring in the pipeline.
figsize : tuple, optional
Tuple of floats or ints denoting the figure size. Default is (12, 8).
title_fontsize : int, optional
Font size for the title. Default is None.
tick_fontsize : int, optional
Font size for the ticks. Default is None.
Notes
-----
        This method displays the models selected at each retraining date over the span
        of the sequential learning process. A binary heatmap is used to visualise
        the model selection process.
"""
# Checks
self._checks_models_heatmap(
name=name,
title=title,
cap=cap,
figsize=figsize,
title_fontsize=title_fontsize,
tick_fontsize=tick_fontsize,
)
# Get the chosen models for the specified pipeline to visualise selection.
chosen_models = self.get_optimal_models(name=name).sort_values(by="real_date")
chosen_models["model_hparam_id"] = chosen_models.apply(
lambda row: (
row["model_type"]
if row["hparams"] == {}
else f"{row['model_type']}_"
+ "_".join([f"{key}={value}" for key, value in row["hparams"].items()])
),
axis=1,
)
chosen_models["real_date"] = chosen_models["real_date"].dt.date
model_counts = chosen_models.model_hparam_id.value_counts()
chosen_models = chosen_models[
chosen_models.model_hparam_id.isin(model_counts.index[:cap])
]
unique_models = chosen_models.model_hparam_id.unique()
unique_models = sorted(unique_models, key=lambda x: -model_counts[x])
unique_dates = chosen_models.real_date.unique()
# Fill in binary matrix denoting the selected model at each time
binary_matrix = pd.DataFrame(0, index=unique_models, columns=unique_dates)
for _, row in chosen_models.iterrows():
model_id = row["model_hparam_id"]
date = row["real_date"]
binary_matrix.at[model_id, date] = 1
        # Display the heatmap, applying the default title if none was provided.
        if title is None:
            title = f"Model Selection Heatmap for {name}"
        plt.figure(figsize=figsize)
if binary_matrix.shape[0] == 1:
sns.heatmap(binary_matrix, cmap="binary_r", cbar=False)
else:
sns.heatmap(binary_matrix, cmap="binary", cbar=False)
plt.title(title, fontsize=title_fontsize)
plt.xticks(fontsize=tick_fontsize) # X-axis tick font size
plt.yticks(fontsize=tick_fontsize)
plt.show()
def _check_init(
self,
df,
xcats,
cids,
start,
end,
blacklist,
freq,
lag,
xcat_aggs,
generate_labels,
):
"""
Checks for the constructor.
Parameters
----------
df : pd.DataFrame
Long-format dataframe.
xcats : list
List of xcats to be used in the learning process.
cids : list, optional
List of cids to be used in the learning process. Default is None.
start : str, optional
Start date for the learning process. Default is None.
end : str, optional
End date for the learning process. Default is None.
blacklist : dict, optional
Dictionary of dates to exclude from the learning process. Default is None.
freq : str
Frequency of the data. Options are "D", "W", "M", "Q" and "Y".
        lag : int
            Lag to apply to the features. Default is 1.
xcat_aggs : list
List of aggregation functions to apply to the independent and
dependent variables respectively.
generate_labels : callable, optional
Function to generate labels for a supervised learning process.
Default is None.
"""
# Dataframe checks
if not isinstance(df, pd.DataFrame):
raise TypeError("df must be a pandas DataFrame.")
if not set(["cid", "xcat", "real_date", "value"]).issubset(df.columns):
raise ValueError(
"df must have columns 'cid', 'xcat', 'real_date' and 'value'."
)
if len(df) < 1:
raise ValueError("df must not be empty")
# categories checks
if not isinstance(xcats, list):
raise TypeError("xcats must be a list.")
if len(xcats) < 2:
raise ValueError("xcats must have at least two elements.")
if not all(isinstance(xcat, str) for xcat in xcats):
raise TypeError("All elements in xcats must be strings.")
difference_xcats = set(xcats) - set(df["xcat"].unique())
if difference_xcats != set():
raise ValueError(f"{str(difference_xcats)} not in the dataframe.")
# cids checks
if cids is not None:
if not isinstance(cids, list):
raise TypeError("cids must be a list.")
if not all(isinstance(cid, str) for cid in cids):
raise TypeError("All elements in cids must be strings.")
difference_cids = set(cids) - set(df["cid"].unique())
if difference_cids != set():
raise ValueError(f"{str(difference_cids)} not in the dataframe.")
# start checks
if start is not None:
if not isinstance(start, str):
raise TypeError("'start' must be a string.")
try:
pd.to_datetime(start)
except ValueError:
raise ValueError("'start' must be in ISO 8601 format.")
if pd.to_datetime(start) > pd.to_datetime(df["real_date"]).max():
raise ValueError("'start' must be before the last date in the panel.")
# end checks
if end is not None:
if not isinstance(end, str):
raise TypeError("'end' must be a string.")
try:
pd.to_datetime(end)
except ValueError:
raise ValueError("'end' must be in ISO 8601 format.")
if pd.to_datetime(end) < pd.to_datetime(df["real_date"]).min():
raise ValueError("'end' must be after the first date in the panel.")
if start is not None and end is not None:
if pd.to_datetime(start) > pd.to_datetime(end):
raise ValueError("'start' must be before 'end'.")
# blacklist checks
if blacklist is not None:
if not isinstance(blacklist, dict):
raise TypeError("The blacklist argument must be a dictionary.")
for key, value in blacklist.items():
# check keys are strings
if not isinstance(key, str):
raise TypeError(
"The keys of the blacklist argument must be strings."
)
# check values of tuples of length two
if not isinstance(value, tuple):
raise TypeError(
"The values of the blacklist argument must be tuples."
)
if len(value) != 2:
raise ValueError(
"The values of the blacklist argument must be tuples of length "
"two."
)
# ensure each of the dates in the dictionary are timestamps
for date in value:
if not isinstance(date, pd.Timestamp):
raise TypeError(
"The values of the blacklist argument must be tuples of "
"pandas Timestamps."
)
# freq checks
if not isinstance(freq, str):
raise TypeError("freq must be a string.")
if freq not in ["D", "W", "M", "Q", "Y"]:
raise ValueError("freq must be one of 'D', 'W', 'M', 'Q' or 'Y'.")
# lag checks
if not isinstance(lag, int):
raise TypeError("lag must be an integer.")
if lag < 0:
raise ValueError("lag must be non-negative.")
# xcat_aggs checks
if not isinstance(xcat_aggs, list):
raise TypeError("xcat_aggs must be a list.")
if not all(isinstance(xcat_agg, str) for xcat_agg in xcat_aggs):
raise ValueError("All elements in xcat_aggs must be strings.")
if len(xcat_aggs) != 2:
raise ValueError("xcat_aggs must have exactly two elements.")
# generate_labels checks
if generate_labels is not None:
if not callable(generate_labels):
raise TypeError("generate_labels must be a callable.")
def _check_run(
self,
name,
outer_splitter,
inner_splitters,
models,
hyperparameters,
scorers,
normalize_fold_results,
search_type,
cv_summary,
include_train_folds,
n_iter,
split_functions,
n_jobs_outer,
n_jobs_inner,
):
"""
Input parameter checks for the run method.
Parameters
----------
name : str
Name of the sequential optimization pipeline.
models : dict
Compatible `scikit-learn` model objects.
outer_splitter : BasePanelSplit
Outer splitter for the learning process.
inner_splitters : dict
Inner splitters for the learning process.
hyperparameters : dict
Hyperparameter grids.
scorers : dict
Compatible `scikit-learn` scoring functions.
normalize_fold_results : bool
Whether to normalize the scores across folds before combining them.
search_type : str
Search type for hyperparameter optimization.
cv_summary : str or callable
Summary function to condense cross-validation scores in each fold to a single
value, against which different hyperparameter choices can be compared.
        include_train_folds : bool
            Whether to calculate cross-validation statistics on the training folds in
            addition to the test folds. If True, the cross-validation estimator will be
            a function of both training data and test data.
n_iter : int
Number of iterations for random or bayesian hyperparameter optimization.
split_functions : dict
Dictionary of functions associated with each inner splitter describing how to
increase the number of splits as a function of the number of iterations passed.
n_jobs_outer : int
Number of jobs to run in parallel for the outer loop.
n_jobs_inner : int
Number of jobs to run in parallel for the inner loop.
"""
# name
if not isinstance(name, str):
raise TypeError("name must be a string.")
# models
if not isinstance(models, dict):
raise TypeError("The models argument must be a dictionary.")
if models == {}:
raise ValueError("The models dictionary cannot be empty.")
for key in models.keys():
if not isinstance(key, str):
raise ValueError("The keys of the models dictionary must be strings.")
if not isinstance(models[key], (RegressorMixin, ClassifierMixin, Pipeline)):
raise ValueError(
"The values of the models dictionary must be sklearn predictors or "
"pipelines."
)
# outer splitter
if outer_splitter:
            if not isinstance(outer_splitter, WalkForwardPanelSplit):
raise TypeError(
"outer_splitter must inherit from msl.splitters.WalkForwardPanelSplit"
)
# First check that both inner_splitters and scorers are None
# or none of them are None.
none_condition = all(
(
inner_splitters is None,
scorers is None,
),
)
not_none_condition = all(
(
inner_splitters is not None,
scorers is not None,
),
)
if not (none_condition or not_none_condition):
raise ValueError(
"Either both inner_splitters and scorers must be None "
"or none of them can be None."
)
# Now check that if they are None, then only one model can be specified
# and no hyperparameters can be specified.
if none_condition:
if hyperparameters is not None or len(models) > 1:
raise ValueError(
"If inner_splitters and scorers are None, then only one model can be "
"specified and no hyperparameters can be specified."
)
else:
if hyperparameters is None:
raise ValueError(
"If inner_splitters and scorers are not None, then hyperparameters "
"must be specified. This can be a dictionary that specifies "
"no hyperparameters if necessary."
)
# inner splitters
if inner_splitters is not None:
if not isinstance(inner_splitters, dict):
raise TypeError("inner splitters should be specified as a dictionary")
if inner_splitters == {}:
raise ValueError("The inner splitters dictionary cannot be empty.")
for names in inner_splitters.keys():
if not isinstance(names, str):
raise ValueError(
"The keys of the inner splitters dictionary must be strings."
)
if not isinstance(inner_splitters[names], BaseCrossValidator):
raise ValueError(
"The values of the inner splitters dictionary must be instances of BaseCrossValidator."
)
# hyperparameters
if hyperparameters is not None:
if not isinstance(hyperparameters, dict):
raise TypeError("hyperparameters must be a dictionary.")
for pipe_name, pipe_params in hyperparameters.items():
if not isinstance(pipe_name, str):
raise ValueError(
"The keys of the hyperparameters dictionary must be strings."
)
if isinstance(pipe_params, dict):
self._check_hyperparam_grid(search_type, pipe_params)
elif isinstance(pipe_params, list):
for param_set in pipe_params:
if not isinstance(param_set, dict):
raise ValueError(
"The values of the hyperparameters dictionary must be dictionaries or lists."
)
self._check_hyperparam_grid(search_type, param_set)
else:
raise ValueError(
"The values of the hyperparameters dictionary must be dictionaries or lists."
)
# Check that the keys of the hyperparameter grid match those in the models dict
if sorted(hyperparameters.keys()) != sorted(models.keys()):
raise ValueError(
"The keys in the hyperparameter grid must match those in the models "
"dictionary."
)
# scorers
if scorers is not None:
if not isinstance(scorers, dict):
raise TypeError("scorers must be a dictionary.")
if scorers == {}:
raise ValueError("The scorers dictionary cannot be empty.")
for key in scorers.keys():
if not isinstance(key, str):
raise ValueError("The keys of the scorers dictionary must be strings.")
if not callable(scorers[key]):
raise ValueError(
"The values of the scorers dictionary must be callable scoring functions."
)
# search_type
if not_none_condition:
if not isinstance(search_type, str):
raise TypeError("search_type must be a string.")
if search_type not in ["grid", "prior", "bayes"]:
raise ValueError("search_type must be one of 'grid', 'prior' or 'bayes'.")
if search_type == "bayes":
raise NotImplementedError(
"Bayesian hyperparameter search is not yet implemented."
)
# cv_summary
if not_none_condition:
if not isinstance(cv_summary, str) and not callable(cv_summary):
raise TypeError("cv_summary must be a string or a callable.")
if isinstance(cv_summary, str):
if cv_summary not in [
"mean",
"median",
"mean-std",
"mean/std",
"mean-std-ge",
]:
raise ValueError(
"cv_summary must be one of 'mean', 'median', 'mean-std', 'mean-std-ge' or ",
"'mean/std'"
)
else:
try:
test_summary = cv_summary([1, 2, 3])
except Exception as e:
                    raise TypeError(
                        "cv_summary must be a function that takes a list of scores and "
                        "returns a single value. Check the validity of cv_summary. "
                        f"Error raised when testing the function with [1, 2, 3]: {e}"
                    )
                if not isinstance(test_summary, numbers.Number) and not isinstance(
                    test_summary, bool
                ):
raise TypeError(
"cv_summary must be a function that takes a list of scores and returns "
"a single value. Check whether the output of cv_summary is a number."
)
# include_train_folds
if not_none_condition:
if not isinstance(include_train_folds, bool):
raise TypeError("include_train_folds must be a boolean.")
if include_train_folds:
if cv_summary in ["mean", "median", "mean-std", "mean/std"]:
warnings.warn(
"include_train_folds is True, which means that evaluation on training "
"folds is included in the cross-validation estimator. Check that "
"cv_summary is set appropriately.",
UserWarning,
)
if cv_summary == "mean-std-ge":
if not include_train_folds:
raise ValueError(
"include_train_folds must be True if cv_summary is 'mean-std-ge'."
)
# n_iter
if not_none_condition:
if search_type == "prior":
if not isinstance(n_iter, int):
raise TypeError("If search_type is 'prior', n_iter must be an integer.")
if n_iter < 1:
raise ValueError("The n_iter argument must be greater than zero.")
            elif n_iter is not None and not isinstance(n_iter, int):
                raise ValueError(
                    "n_iter must be an integer and is only used when search_type is 'prior'."
                )
# normalize_fold_results
if not_none_condition:
if not isinstance(normalize_fold_results, bool):
raise TypeError("normalize_fold_results must be a boolean.")
if normalize_fold_results:
if search_type == "grid":
for model in hyperparameters.keys():
                    # Count grid candidates: the product of list lengths per grid,
                    # summed over grids when a list of grids is supplied.
                    grids = hyperparameters[model]
                    if isinstance(grids, dict):
                        grids = [grids]
                    num_models = sum(
                        int(np.prod([len(values) for values in grid.values()]))
                        for grid in grids
                    )
if num_models < 2:
raise ValueError(
"normalize_fold_results cannot be True if there are less than 2 candidate models. "
f"This is the case for the model {model}."
)
if num_models == 2:
warnings.warn(
"normalize_fold_results is True but there are only two candidate models for "
f"the model {model}. It is recommended for at least three candidate models "
"to be available for normalization to be meaningful.",
UserWarning,
)
elif search_type == "prior":
if n_iter < 2:
raise ValueError(
"normalize_fold_results cannot be True if n_iter is less than 2."
)
if n_iter == 2:
warnings.warn(
"normalize_fold_results is True but n_iter is 2. It is recommended for n_iter to be "
"at least 3 for normalization to be meaningful.",
UserWarning,
)
# split_functions
if not_none_condition and split_functions is not None:
if not isinstance(split_functions, dict):
raise TypeError("split_functions must be a dictionary.")
if len(
set(split_functions.keys()).intersection(set(inner_splitters.keys()))
) != len(inner_splitters):
raise ValueError(
"The keys of the split_functions dictionary must match the keys of the inner_splitters dictionary."
)
for key in split_functions.keys():
if not isinstance(key, str):
raise ValueError(
"The keys of the split_functions dictionary must be strings."
)
if split_functions[key] is not None:
if not callable(split_functions[key]):
raise ValueError(
"The values of the split_functions dictionary must be callables or None."
)
# n_jobs_outer
if not isinstance(n_jobs_outer, int):
raise TypeError("n_jobs_outer must be an integer.")
if n_jobs_outer < 1:
if n_jobs_outer != -1:
raise ValueError(
"n_jobs_outer must be greater than zero or equal to -1."
)
# n_jobs_inner
if not_none_condition:
if not isinstance(n_jobs_inner, int):
raise TypeError("n_jobs_inner must be an integer.")
if n_jobs_inner < 1:
if n_jobs_inner != -1:
raise ValueError(
"n_jobs_inner must be greater than zero or equal to -1."
)
def _check_hyperparam_grid(self, search_type, pipe_params):
if pipe_params != {}:
for hparam_key, hparam_values in pipe_params.items():
if not isinstance(hparam_key, str):
raise ValueError(
"The keys of the inner hyperparameters dictionaries must be "
"strings."
)
if search_type == "grid":
if not isinstance(hparam_values, list):
raise ValueError(
"The values of the inner hyperparameters dictionaries must be "
"lists if hparam_type is 'grid'."
)
if len(hparam_values) == 0:
raise ValueError(
"The values of the inner hyperparameters dictionaries cannot be "
"empty lists."
)
elif search_type == "prior":
# hparam_values must either be a list or a scipy.stats distribution
# create typeerror
if isinstance(hparam_values, list):
if len(hparam_values) == 0:
raise ValueError(
"The values of the inner hyperparameters dictionaries cannot "
"be empty lists."
)
else:
if not hasattr(hparam_values, "rvs"):
raise ValueError(
"Invalid random hyperparameter search dictionary element "
f"for hyperparameter {hparam_key}. The dictionary values "
"must be scipy.stats distributions."
)
def _checks_models_heatmap(
self,
name,
title=None,
cap=5,
figsize=(12, 8),
title_fontsize=None,
tick_fontsize=None,
):
"""
Checks for the models_heatmap method.
Parameters
----------
name : str
Name of the sequential optimization pipeline.
title : str, optional
Title of the heatmap. Default has the form "Model Selection Heatmap for {name}".
cap : int, optional
Maximum number of models to display. Default is 5, with limit of 10.
The chosen models are the 'cap' most frequently occurring in the process.
figsize : tuple, optional
Tuple of floats or ints denoting the figure size. Default is (12, 8).
title_fontsize : int, optional
Font size for the title. Default is None.
tick_fontsize : int, optional
Font size for the ticks. Default is None.
"""
if not isinstance(name, str):
raise TypeError("The pipeline name must be a string.")
if name not in self.chosen_models.name.unique():
raise ValueError(
f"""The pipeline name {name} is not in the list of already-calculated
pipelines. Please check the pipeline name carefully. If correct, please
run calculate_predictions() first.
"""
)
if not isinstance(cap, int):
raise TypeError("The cap must be an integer.")
        if cap <= 0:
raise ValueError("The cap must be greater than zero.")
if cap > 10:
raise ValueError("The cap must be 10 or lower.")
if title is None:
title = f"Model Selection Heatmap for {name}"
if not isinstance(title, str):
raise TypeError("The figure title must be a string.")
if not isinstance(figsize, tuple):
raise TypeError("The figsize argument must be a tuple.")
if len(figsize) != 2:
raise ValueError("The figsize argument must be a tuple of length 2.")
for element in figsize:
if not isinstance(element, (int, float)):
raise TypeError(
"The elements of the figsize tuple must be floats or ints."
)
if title_fontsize is not None:
if not isinstance(title_fontsize, int):
raise TypeError("The title_fontsize argument must be an integer.")
if title_fontsize < 0:
raise ValueError("The title_fontsize argument must be non-negative.")
if tick_fontsize is not None:
if not isinstance(tick_fontsize, int):
raise TypeError("The tick_fontsize argument must be an integer.")
if tick_fontsize < 0:
raise ValueError("The tick_fontsize argument must be non-negative.")
def _get_base_splits(self, inner_splitters):
"""Get the initial number of splits for each splitter."""
base_splits = {}
for splitter_name, splitter in inner_splitters.items():
if hasattr(splitter, "n_splits"):
base_splits[splitter_name] = splitter.n_splits
return base_splits
    def _get_n_splits_add(self, iteration, outer_splitter, split_functions) -> dict:
"""Call split functions to determine the number of splits to add to each splitter."""
if split_functions is not None:
return {
splitter_name: (
                    int(np.ceil(split_function(iteration * outer_splitter.test_size)))
if split_function is not None
else 0
)
for splitter_name, split_function in split_functions.items()
}
else:
return None
def _remove_results(self, conditions):
"""
Remove rows from results DataFrames based on conditions.
Parameters
----------
conditions : list of tuples
List of tuples containing the attribute name, column name and value to filter on.
"""
for attr, column, value in conditions:
df = getattr(self, attr)
if value in df[column].unique():
setattr(self, attr, df[df[column] != value])