"""
Class to determine and store sequentially optimized panel forecasts based on
statistical machine learning.
"""

import numbers

import matplotlib.colors as mcolors
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
from sklearn.feature_selection import SelectorMixin
from sklearn.pipeline import Pipeline

from macrosynergy.learning import ExpandingIncrementPanelSplit
from macrosynergy.learning.sequential import BasePanelLearner
from macrosynergy.management.utils import concat_categorical, _insert_as_categorical
from macrosynergy.management.types import QuantamentalDataFrame


class SignalOptimizer(BasePanelLearner):
    """
    Class for sequential optimization of return forecasts based on panels of
    quantamental features.

    Parameters
    ----------
    df : pd.DataFrame
        Daily quantamental dataframe in JPMaQS format containing a panel of features,
        as well as a panel of returns.
    xcats : list
        List comprising feature names, with the last element being the response
        variable name. The features and the response variable must be categories in
        the dataframe.
    cids : list, optional
        List of cross-section identifiers for consideration in the panel. Default is
        None, in which case all cross-sections in `df` are considered.
    start : str, optional
        Start date for considered data in subsequent analysis in ISO 8601 format.
        Default is None i.e. the earliest date in the dataframe.
    end : str, optional
        End date for considered data in subsequent analysis in ISO 8601 format.
        Default is None i.e. the latest date in the dataframe.
    blacklist : dict, optional
        Blacklisting dictionary specifying date ranges for which cross-sectional
        information should be excluded. The keys are cross-sections and the values
        are tuples of start and end dates in ISO 8601 format. Default is None.
    freq : str, optional
        Frequency of the analysis. Default is "M" for monthly.
    lag : int, optional
        Number of periods to lag the response variable. Default is 1.
    xcat_aggs : list, optional
        List of aggregation functions to apply to the features, used when `freq` is
        not `D`. Default is ["last", "sum"].
    generate_labels : callable, optional
        Function to transform the response variable into either alternative regression
        targets or classification labels. Default is None.

    Notes
    -----
    The `SignalOptimizer` class is used to predict the response variable, usually a
    panel of asset class returns, based on a panel of features that are lagged by a
    specified number of periods. This is done in a sequential manner, by specifying
    the size of an initial training set, choosing an optimal model out of a provided
    collection (with associated hyperparameters), forecasting the return panel, and
    then expanding the training set to include the now-realized returns. The process
    continues until the end of the dataset is reached.

    In addition to storing forecasts, this class also stores useful information for
    analysis such as the models selected at each point in time, the feature
    coefficients and intercepts (where relevant) of selected models, as well as the
    features selected by any feature selection modules.

    Model and hyperparameter selection is performed by cross-validation. Given a
    collection of models and associated hyperparameters to choose from, a
    hyperparameter optimization is run - currently only grid search and random search
    are supported - to determine the optimal choice. This is done by providing a
    collection of `scikit-learn` compatible scoring functions, as well as a collection
    of `scikit-learn` compatible cross-validation splitters. At each point in time,
    the cross-validation folds are the union of the folds produced by each splitter
    provided. Each scorer is evaluated on each test fold and summarised across test
    folds by either a custom function provided by the user or a common string e.g.
    'mean'. Consequently, each model and hyperparameter combination has an associated
    collection of scores induced by the different metrics, in the units of those
    scorers. In order to form a composite score for each model and hyperparameter
    combination, the scores are first normalized across combinations, making scores
    from different scorers comparable. A composite score is then calculated by
    averaging the adjusted scores across all scorers, providing a meaningful estimate
    of each model's generalization ability. The optimal model is the one with the
    largest composite score.
    """

    def __init__(
        self,
        df,
        xcats,
        cids=None,
        start=None,
        end=None,
        blacklist=None,
        freq="M",
        lag=1,
        xcat_aggs=["last", "sum"],
        generate_labels=None,
    ):
        # Run checks and necessary dataframe massaging
        super().__init__(
            df=df,
            xcats=xcats,
            cids=cids,
            start=start,
            end=end,
            blacklist=blacklist,
            freq=freq,
            lag=lag,
            xcat_aggs=xcat_aggs,
            generate_labels=generate_labels,
        )

        # Create forecast dataframe index
        min_date = min(self.unique_date_levels)
        max_date = max(self.unique_date_levels)
        forecast_date_levels = pd.date_range(start=min_date, end=max_date, freq="B")
        self.forecast_idxs = pd.MultiIndex.from_product(
            [self.unique_xs_levels, forecast_date_levels], names=["cid", "real_date"]
        )

        # Create initial dataframes to store relevant quantities from the learning
        # process
        self.preds = pd.DataFrame(
            columns=["real_date", "cid", "xcat", "value"]
        ).astype(
            {
                "real_date": "datetime64[ns]",
                "cid": "category",
                "xcat": "category",
                "value": "float32",
            }
        )
        self.feature_importances = pd.DataFrame(
            columns=["real_date", "name"] + list(self.X.columns)
        ).astype(
            {
                **{col: "float32" for col in self.X.columns},
                "real_date": "datetime64[ns]",
                "name": "category",
            }
        )
        self.intercepts = pd.DataFrame(
            columns=["real_date", "name", "intercepts"]
        ).astype(
            {
                "real_date": "datetime64[ns]",
                "name": "category",
                "intercepts": "float32",
            }
        )
        self.selected_ftrs = pd.DataFrame(
            columns=["real_date", "name"] + list(self.X.columns)
        ).astype(
            {
                **{col: "int" for col in self.X.columns},
                "real_date": "datetime64[ns]",
                "name": "category",
            }
        )

        self.store_correlations = False
        # Create data structure to store the correlation matrix of the features
        # feeding into the final model and the input features themselves
        self.ftr_corr = pd.DataFrame(
            columns=[
                "real_date",
                "name",
                "predictor_input",
                "pipeline_input",
                "pearson",
            ]
        ).astype(
            {
                "real_date": "datetime64[ns]",
                "name": "category",
                "predictor_input": "category",
                "pipeline_input": "category",
                "pearson": "float",
            }
        )
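    # Illustrative sketch (not part of the public API): constructing the class on
    # simulated JPMaQS-format data, mirroring the demonstration in the __main__
    # block at the bottom of this module. The category and cross-section names are
    # assumptions for demonstration only.
    @staticmethod
    def _example_construction():  # pragma: no cover - illustrative only
        from macrosynergy.management.simulate import make_qdf

        cids = ["AUD", "CAD", "GBP", "USD"]
        df_cids = pd.DataFrame(
            index=cids, columns=["earliest", "latest", "mean_add", "sd_mult"]
        )
        for cid in cids:
            df_cids.loc[cid] = ["2012-01-01", "2020-12-31", 0, 1]
        df_xcats = pd.DataFrame(
            index=["XR", "CRY", "GROWTH", "INFL"],
            columns=["earliest", "latest", "mean_add", "sd_mult", "ar_coef", "back_coef"],
        )
        df_xcats.loc["XR"] = ["2012-01-01", "2020-12-31", 0.1, 1, 0, 0.3]
        df_xcats.loc["CRY"] = ["2012-01-01", "2020-12-31", 1, 2, 0.95, 1]
        df_xcats.loc["GROWTH"] = ["2012-01-01", "2020-12-31", 1, 2, 0.9, 1]
        df_xcats.loc["INFL"] = ["2012-01-01", "2020-12-31", -0.1, 2, 0.8, 0.3]
        dfd = make_qdf(df_cids, df_xcats, back_ar=0.75)

        # Features first, response variable ("XR") last
        return SignalOptimizer(df=dfd, xcats=["CRY", "GROWTH", "INFL", "XR"], cids=cids)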
    def calculate_predictions(
        self,
        name,
        models,
        hyperparameters=None,
        scorers=None,
        inner_splitters=None,
        search_type="grid",
        normalize_fold_results=False,
        cv_summary="mean",
        include_train_folds=False,
        min_cids=4,
        min_periods=12 * 3,
        test_size=1,
        max_periods=None,
        split_functions=None,
        n_iter=None,
        n_jobs_outer=-1,
        n_jobs_inner=1,
        store_correlations=False,
    ):
        """
        Determine forecasts and store relevant quantities over time.

        Parameters
        ----------
        name : str
            Name of the signal optimization process.
        models : dict
            Dictionary of models to choose from. The keys are model names and the
            values are scikit-learn compatible models.
        hyperparameters : dict, optional
            Dictionary of hyperparameters to choose from. The keys are model names and
            the values are hyperparameter dictionaries for the corresponding model.
            The keys must match with those provided in `models`. If no hyperparameters
            need to be tuned, this parameter can be None. Default is None.
        scorers : dict, optional
            Dictionary of scoring functions to use in cross-validation if
            hyperparameters or models need to be selected. The keys are scorer names
            and the values are scikit-learn compatible scoring functions. If no
            cross-validation is required, this parameter can be None. Default is None.
        inner_splitters : dict, optional
            Dictionary of inner splitters to use in cross-validation. The keys are
            splitter names and the values are scikit-learn compatible cross-validator
            objects. If no cross-validation is required, this parameter can be None.
            Default is None.
        search_type : str, optional
            Type of hyperparameter optimization to perform. Default is "grid". Options
            are "grid" and "prior". If no hyperparameter tuning is required, this
            parameter can be disregarded.
        normalize_fold_results : bool, optional
            Whether to normalize the scores across folds before combining them.
            Default is False. If no hyperparameter tuning is required, this parameter
            can be disregarded.
        cv_summary : str or callable, optional
            Summary function used to combine scores across cross-validation folds.
            Default is "mean". Options are "mean", "median", "mean-std", "mean/std",
            "mean-std-ge" or a callable function. If no hyperparameter tuning is
            required, this parameter can be disregarded.
        include_train_folds : bool, optional
            Whether to calculate cross-validation statistics on the training folds in
            addition to the test folds. If True, the cross-validation estimator will
            be a function of both training data and test data. It is recommended to
            set `cv_summary` appropriately. Default is False. If no hyperparameter
            tuning is required, this parameter can be disregarded.
        min_cids : int, optional
            Minimum number of cross-sections required for the initial training set.
            Default is 4.
        min_periods : int, optional
            Minimum number of periods required for the initial training set, in units
            of the frequency `freq` specified in the constructor. Default is 36.
        test_size : int, optional
            Number of periods to pass before retraining a selected model. Default is 1.
        max_periods : int, optional
            Maximum length of each training set in units of the frequency `freq`
            specified in the constructor. Default is None, in which case the
            sequential optimization uses expanding training sets, as opposed to
            rolling windows.
        split_functions : dict, optional
            Dict of callables for determining the number of cross-validation splits to
            add to the initial number, as a function of the number of iterations
            passed in the sequential learning process. The keys must correspond to the
            keys in `inner_splitters` and should be set to None for any splitters that
            do not require adjustment. Default is None. If no hyperparameter tuning is
            required, this parameter can be disregarded.
        n_iter : int, optional
            Number of iterations to run in a random hyperparameter search. Default is
            None. If no hyperparameter tuning is required, this parameter can be
            disregarded.
        n_jobs_outer : int, optional
            Number of jobs to run in parallel for the outer sequential loop. Default
            is -1. It is advised for n_jobs_inner * n_jobs_outer (replacing -1 with
            the number of available cores) to be less than or equal to the number of
            available cores on the machine.
        n_jobs_inner : int, optional
            Number of jobs to run in parallel for the inner loop. Default is 1. It is
            advised for n_jobs_inner * n_jobs_outer (replacing -1 with the number of
            available cores) to be less than or equal to the number of available cores
            on the machine. If no hyperparameter tuning is required, this parameter
            can be disregarded.
        store_correlations : bool, optional
            Whether to store the correlations between input pipeline features and
            input predictor features. Default is False.
        """
        if not isinstance(store_correlations, bool):
            raise TypeError("The store_correlations argument must be a boolean.")
        if store_correlations and not all(
            [isinstance(model, Pipeline) for model in models.values()]
        ):
            raise ValueError(
                "The store_correlations argument is only valid when all models are "
                "scikit-learn Pipelines."
            )
        self.store_correlations = store_correlations

        # Set up outer splitter
        outer_splitter = ExpandingIncrementPanelSplit(
            train_intervals=test_size,
            test_size=test_size,
            min_cids=min_cids,
            min_periods=min_periods,
            max_periods=max_periods,
        )

        results = self.run(
            name=name,
            outer_splitter=outer_splitter,
            inner_splitters=inner_splitters,
            models=models,
            hyperparameters=hyperparameters,
            scorers=scorers,
            search_type=search_type,
            normalize_fold_results=normalize_fold_results,
            cv_summary=cv_summary,
            include_train_folds=include_train_folds,
            split_functions=split_functions,
            n_iter=n_iter,
            n_jobs_outer=n_jobs_outer,
            n_jobs_inner=n_jobs_inner,
        )

        self._check_duplicate_results(name)

        # Collect results from the workers
        prediction_data = []
        model_choice_data = []
        ftr_coef_data = []
        intercept_data = []
        ftr_selection_data = []
        ftr_corr_data = []

        for split_result in results:
            prediction_data.append(split_result["predictions"])
            model_choice_data.append(split_result["model_choice"])
            ftr_coef_data.append(split_result["feature_importances"])
            intercept_data.append(split_result["intercepts"])
            ftr_selection_data.append(split_result["selected_ftrs"])
            ftr_corr_data.extend(split_result["ftr_corr"])

        # First create a pandas dataframe to store the forecasts
        forecasts_df = pd.DataFrame(
            index=self.forecast_idxs, columns=[name], data=np.nan, dtype="float32"
        )

        # Create a quantamental dataframe of forecasts
        for idx, forecasts in prediction_data:
            forecasts_df.loc[idx, name] = forecasts

        forecasts_df = forecasts_df.groupby(level=0).ffill().dropna()

        if self.blacklist is not None:
            for cross_section, periods in self.blacklist.items():
                cross_section_key = cross_section.split("_")[0]
                if cross_section_key in self.unique_xs_levels:
                    forecasts_df.loc[
                        (cross_section_key, slice(periods[0], periods[1])), :
                    ] = np.nan

        forecasts_df.columns = forecasts_df.columns.astype("category")
        forecasts_df_long = pd.melt(
            frame=forecasts_df.reset_index(),
            id_vars=["real_date", "cid"],
            var_name="xcat",
        )
        self.preds = concat_categorical(
            df1=self.preds,
            df2=forecasts_df_long,
        )

        # Store model selection data
        model_df_long = pd.DataFrame(
            columns=[col for col in self.chosen_models.columns if col != "name"],
            data=model_choice_data,
        ).astype({"model_type": "category"})
        model_df_long = _insert_as_categorical(model_df_long, "name", name, 1)
        self.chosen_models = concat_categorical(
            df1=self.chosen_models,
            df2=model_df_long,
        )

        # Store feature coefficients
        coef_df_long = pd.DataFrame(
            columns=[col for col in self.feature_importances.columns if col != "name"],
            data=ftr_coef_data,
        )
        coef_df_long = _insert_as_categorical(coef_df_long, "name", name, 1)
        self.feature_importances = concat_categorical(
            self.feature_importances,
            coef_df_long,
        )

        # Store intercepts
        intercept_df_long = pd.DataFrame(
            columns=[col for col in self.intercepts.columns if col != "name"],
            data=intercept_data,
        )
        intercept_df_long = _insert_as_categorical(intercept_df_long, "name", name, 1)
        self.intercepts = concat_categorical(
            self.intercepts,
            intercept_df_long,
        )

        # Store selected features
        ftr_select_df_long = pd.DataFrame(
            columns=[col for col in self.selected_ftrs.columns if col != "name"],
            data=ftr_selection_data,
        )
        ftr_select_df_long = _insert_as_categorical(ftr_select_df_long, "name", name, 1)
        self.selected_ftrs = concat_categorical(
            self.selected_ftrs,
            ftr_select_df_long,
        )

        # Store feature correlations
        ftr_corr_df_long = pd.DataFrame(
            columns=self.ftr_corr.columns, data=ftr_corr_data
        )
        self.ftr_corr = concat_categorical(
            self.ftr_corr,
            ftr_corr_df_long,
        )
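    # Illustrative sketch (not part of the public API): invoking the method above
    # with random search ("prior") instead of grid search. The model, grid, scorer
    # and pipeline label below are assumptions for demonstration only.
    def _example_random_search(self):  # pragma: no cover - illustrative only
        from sklearn.linear_model import Ridge
        from sklearn.metrics import make_scorer, r2_score

        from macrosynergy.learning import ExpandingKFoldPanelSplit

        self.calculate_predictions(
            name="RIDGE_RANDOM",
            models={"Ridge": Ridge()},
            hyperparameters={"Ridge": {"alpha": [0.1, 1, 10, 100]}},
            scorers={"r2": make_scorer(r2_score)},
            inner_splitters={"ExpandingKFold": ExpandingKFoldPanelSplit(n_splits=5)},
            search_type="prior",  # random search over the hyperparameter grid
            n_iter=3,  # number of sampled hyperparameter combinations
        )
        return self.get_optimized_signals("RIDGE_RANDOM")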
    def _check_duplicate_results(self, name):
        conditions = [
            ("preds", "xcat", name),
            ("feature_importances", "name", name),
            ("intercepts", "name", name),
            ("selected_ftrs", "name", name),
            ("ftr_corr", "name", name),
            ("chosen_models", "name", name),
        ]
        self._remove_results(conditions)
    def store_split_data(
        self,
        pipeline_name,
        optimal_model,
        optimal_model_name,
        optimal_model_score,
        optimal_model_params,
        inner_splitters_adj,
        X_train,
        y_train,
        X_test,
        y_test,
        timestamp,
        adjusted_test_index,
    ):
        """
        Stores characteristics of the optimal model at each retraining date.

        Parameters
        ----------
        pipeline_name : str
            Name of the signal optimization process.
        optimal_model : RegressorMixin, ClassifierMixin or Pipeline
            Optimal model selected at each retraining date.
        optimal_model_name : str
            Name of the optimal model.
        optimal_model_score : float
            Cross-validation score for the optimal model.
        optimal_model_params : dict
            Chosen hyperparameters for the optimal model.
        inner_splitters_adj : dict
            Dictionary of adjusted inner splitters.
        X_train : pd.DataFrame
            Training feature matrix.
        y_train : pd.Series
            Training response variable.
        X_test : pd.DataFrame
            Test feature matrix.
        y_test : pd.Series
            Test response variable.
        timestamp : pd.Timestamp
            Timestamp of the retraining date.
        adjusted_test_index : pd.MultiIndex
            Adjusted test index to account for lagged features.

        Returns
        -------
        dict
            Dictionary containing feature importance scores, intercepts, selected
            features and correlations between the inputs to a pipeline and those
            entered into its final model.
        """
        if optimal_model is not None:
            if hasattr(optimal_model, "create_signal"):
                if callable(getattr(optimal_model, "create_signal")):
                    preds = optimal_model.create_signal(X_test)
            else:
                preds = optimal_model.predict(X_test)
        else:
            preds = np.zeros(X_test.shape[0])

        prediction_data = [adjusted_test_index, preds]

        feature_names = np.array(X_train.columns)
        if isinstance(optimal_model, Pipeline):
            final_estimator = optimal_model[-1]
            for _, transformer in reversed(optimal_model.steps):
                if isinstance(transformer, SelectorMixin):
                    feature_names = transformer.get_feature_names_out()
                    break
        else:
            final_estimator = optimal_model

        coefs = np.full(X_train.shape[1], np.nan)
        if hasattr(final_estimator, "feature_importances_") or hasattr(
            final_estimator, "coef_"
        ):
            if hasattr(final_estimator, "feature_importances_"):
                coef = final_estimator.feature_importances_
            elif hasattr(final_estimator, "coef_"):
                coef = final_estimator.coef_
            # Reshape coefficients for storage compatibility
            if coef.ndim == 1:
                coefs = coef
            elif coef.ndim == 2:
                if coef.shape[0] == 1:
                    coefs = coef.flatten()

        coef_ftr_map = {ftr: coef for ftr, coef in zip(feature_names, coefs)}
        coefs = [
            coef_ftr_map[ftr] if ftr in coef_ftr_map else np.nan
            for ftr in X_train.columns
        ]

        if hasattr(final_estimator, "intercept_"):
            if isinstance(final_estimator.intercept_, np.ndarray):
                # Store the intercept if it has length one
                if len(final_estimator.intercept_) == 1:
                    intercepts = final_estimator.intercept_[0]
                else:
                    intercepts = np.nan
            else:
                # The intercept will be a float/integer
                intercepts = final_estimator.intercept_
        else:
            intercepts = np.nan

        # Get feature selection information
        if len(feature_names) == X_train.shape[1]:
            # Then all features were selected
            ftr_selection_data = [timestamp] + [1 for _ in feature_names]
        else:
            # Then some features were excluded
            ftr_selection_data = [timestamp] + [
                1 if name in feature_names else 0 for name in np.array(X_train.columns)
            ]

        ftr_corr_data = self._get_ftr_corr_data(
            pipeline_name, optimal_model, X_train, timestamp
        )

        # Store data
        split_result = {
            "feature_importances": [timestamp] + coefs,
            "intercepts": [timestamp, intercepts],
            "selected_ftrs": ftr_selection_data,
            "predictions": prediction_data,
            "ftr_corr": ftr_corr_data,
        }

        return split_result
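    # Illustrative sketch (not part of the public API) of the coefficient
    # bookkeeping performed above: when a Pipeline contains a feature selector, the
    # final estimator's coefficients are mapped back to the original training
    # columns, with excluded features recorded as NaN. The SelectKBest/
    # LinearRegression pipeline and column names are assumptions for demonstration.
    @staticmethod
    def _example_coef_alignment():  # pragma: no cover - illustrative only
        from sklearn.feature_selection import SelectKBest, f_regression
        from sklearn.linear_model import LinearRegression

        rng = np.random.default_rng(0)
        X = pd.DataFrame(rng.normal(size=(100, 3)), columns=["A", "B", "C"])
        y = X["A"] - X["B"] + rng.normal(scale=0.1, size=100)

        pipe = Pipeline(
            [("select", SelectKBest(f_regression, k=2)), ("ols", LinearRegression())]
        ).fit(X, y)

        # Names of the columns that survived selection, then map back to all columns
        selected = pipe[0].get_feature_names_out(X.columns)
        coef_map = dict(zip(selected, pipe[-1].coef_))
        return [coef_map.get(ftr, np.nan) for ftr in X.columns]  # e.g. [~1, ~-1, nan]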
    def _get_ftr_corr_data(self, pipeline_name, optimal_model, X_train, timestamp):
        """
        Returns a list of correlations between the input features to a pipeline and
        the features entered into the final model, at each retraining date.

        Parameters
        ----------
        pipeline_name : str
            Name of the signal optimization process.
        optimal_model : RegressorMixin, ClassifierMixin or Pipeline
            Optimal model selected at each retraining date.
        X_train : pd.DataFrame
            Input feature matrix.
        timestamp : pd.Timestamp
            Timestamp of the retraining date.

        Returns
        -------
        list
            List of correlations between the input features to a pipeline and the
            features entered into the final model, at each retraining date.
        """
        if self.store_correlations and optimal_model is not None:
            # Transform the training data to the final feature space
            transformers = Pipeline(steps=optimal_model.steps[:-1])
            X_train_transformed = transformers.transform(X_train)
            n_features = X_train_transformed.shape[1]
            feature_names = (
                X_train_transformed.columns
                if isinstance(X_train_transformed, pd.DataFrame)
                else [f"Feature {i+1}" for i in range(n_features)]
            )
            # Calculate the correlation between each original feature in X_train and
            # the transformed features in X_train_transformed
            if isinstance(X_train_transformed, pd.DataFrame):
                X_train_transformed = X_train_transformed.values
            ftr_corr_data = [
                [
                    timestamp,
                    pipeline_name,
                    final_feature_name,
                    input_feature_name,
                    np.corrcoef(
                        X_train_transformed[:, idx],
                        X_train[input_feature_name],
                    )[0, 1],
                ]
                for idx, final_feature_name in enumerate(feature_names)
                for input_feature_name in X_train.columns
            ]
        elif self.store_correlations and optimal_model is None:
            ftr_corr_data = [
                [timestamp, pipeline_name, feature_name, feature_name, 1]
                for feature_name in X_train.columns
            ]
        else:
            ftr_corr_data = []

        return ftr_corr_data
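    # Illustrative sketch (not part of the public API) of the correlation
    # bookkeeping above: after a dimensionality reduction step such as PCA, each
    # derived factor is compared with every original column via np.corrcoef, giving
    # one Pearson coefficient per (factor, input) pair. PCA and the column names
    # are assumptions for demonstration.
    @staticmethod
    def _example_pipeline_input_correlations():  # pragma: no cover - illustrative only
        from sklearn.decomposition import PCA

        rng = np.random.default_rng(0)
        X = pd.DataFrame(rng.normal(size=(200, 3)), columns=["CRY", "GROWTH", "INFL"])
        factors = PCA(n_components=2).fit_transform(X)

        rows = []
        for idx in range(factors.shape[1]):
            for col in X.columns:
                pearson = np.corrcoef(factors[:, idx], X[col])[0, 1]
                rows.append([f"Feature {idx+1}", col, pearson])
        return rows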
    def get_optimized_signals(self, name=None):
        """
        Returns optimized signals for one or more processes.

        Parameters
        ----------
        name : str or list, optional
            Label(s) of signal optimization process(es). Default is all stored in the
            class instance.

        Returns
        -------
        pd.DataFrame
            Pandas dataframe in JPMaQS format of working daily predictions.
        """
        if name is None:
            preds = self.preds
        else:
            if isinstance(name, str):
                name = [name]
            elif not isinstance(name, list):
                raise TypeError(
                    "The process name must be a string or a list of strings."
                )
            for n in name:
                if n not in self.preds.xcat.unique():
                    raise ValueError(
                        f"""The process name '{n}' is not in the list of already-run
                        pipelines. Please check the name carefully. If correct, please
                        run calculate_predictions() first.
                        """
                    )
            preds = self.preds[self.preds.xcat.isin(name)]

        signals_df = QuantamentalDataFrame(
            df=preds,
            categorical=self.df.InitializedAsCategorical,
        ).to_original_dtypes()
        return signals_df
    def get_selected_features(self, name=None):
        """
        Returns the selected features over time for one or more processes.

        Parameters
        ----------
        name : str or list, optional
            Label(s) of signal optimization process(es). Default is all stored in the
            class instance.

        Returns
        -------
        pd.DataFrame
            Pandas dataframe of the selected features at each retraining date.
        """
        if name is None:
            return self.selected_ftrs
        else:
            if isinstance(name, str):
                name = [name]
            elif not isinstance(name, list):
                raise TypeError(
                    "The process name must be a string or a list of strings."
                )
            for n in name:
                if n not in self.selected_ftrs.name.unique():
                    raise ValueError(
                        f"""The process name '{n}' is not in the list of already-run
                        pipelines. Please check the name carefully. If correct, please
                        run calculate_predictions() first.
                        """
                    )
            return self.selected_ftrs[self.selected_ftrs.name.isin(name)]
    def get_feature_importances(self, name=None):
        """
        Returns feature importances for one or more processes.

        Parameters
        ----------
        name : str or list, optional
            Label(s) of signal optimization process(es). Default is all stored in the
            class instance.

        Returns
        -------
        pd.DataFrame
            Pandas dataframe of the feature importances, if available, learnt at each
            retraining date for a given pipeline.

        Notes
        -----
        Availability of feature importances is subject to the selected model having a
        `feature_importances_` or `coef_` attribute.
        """
        if name is None:
            return self.feature_importances
        else:
            if isinstance(name, str):
                name = [name]
            elif not isinstance(name, list):
                raise TypeError(
                    "The process name must be a string or a list of strings."
                )
            for n in name:
                if n not in self.feature_importances.name.unique():
                    raise ValueError(
                        f"""The process name '{n}' is not in the list of already-run
                        pipelines. Please check the name carefully. If correct, please
                        run calculate_predictions() first.
                        """
                    )
            return self.feature_importances[
                self.feature_importances.name.isin(name)
            ].sort_values(by="real_date")
    def get_intercepts(self, name=None):
        """
        Returns intercepts for one or more processes.

        Parameters
        ----------
        name : str or list, optional
            Label(s) of signal optimization process(es). Default is all stored in the
            class instance.

        Returns
        -------
        pd.DataFrame
            Pandas dataframe of the intercepts, if available, learnt at each
            retraining date for a given pipeline.
        """
        if name is None:
            return self.intercepts
        else:
            if isinstance(name, str):
                name = [name]
            elif not isinstance(name, list):
                raise TypeError(
                    "The process name must be a string or a list of strings."
                )
            for n in name:
                if n not in self.intercepts.name.unique():
                    raise ValueError(
                        f"""The process name '{n}' is not in the list of already-run
                        pipelines. Please check the name carefully. If correct, please
                        run calculate_predictions() first.
                        """
                    )
            return self.intercepts[self.intercepts.name.isin(name)].sort_values(
                by="real_date"
            )
    def get_feature_correlations(
        self,
        name=None,
    ):
        """
        Returns a dataframe of feature correlations for one or more processes.

        Parameters
        ----------
        name : str or list, optional
            Label(s) of signal optimization process(es). Default is all stored in the
            class instance.

        Returns
        -------
        pd.DataFrame
            Pandas dataframe of the correlations between the features passed into a
            model pipeline and the post-processed features entered into the final
            model.
        """
        if name is None:
            return self.ftr_corr
        else:
            if isinstance(name, str):
                name = [name]
            elif not isinstance(name, list):
                raise TypeError(
                    "The process name must be a string or a list of strings."
                )
            for n in name:
                if n not in self.ftr_corr.name.unique():
                    raise ValueError(
                        f"""Either the process name '{n}' is not in the list of
                        already-run pipelines, or no correlations were stored for this
                        pipeline. Please check the name carefully. If correct, please
                        run calculate_predictions() first.
                        """
                    )
            return self.ftr_corr[self.ftr_corr.name.isin(name)]
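    # Illustrative sketch (not part of the public API) of pulling the stored
    # artefacts for a named pipeline once calculate_predictions() has run. The
    # pipeline label "LR" is an assumption matching the demonstration in the
    # __main__ block at the bottom of this module.
    def _example_retrieve_results(self):  # pragma: no cover - illustrative only
        signals = self.get_optimized_signals("LR")  # JPMaQS-format forecasts
        ftrs = self.get_selected_features("LR")  # binary selection history
        imps = self.get_feature_importances("LR")  # coefficients over time
        icpts = self.get_intercepts("LR")  # intercepts over time
        # Feature correlations require store_correlations=True at run time
        corrs = self.get_feature_correlations("LR")
        return signals, ftrs, imps, icpts, corrs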
    def feature_selection_heatmap(
        self,
        name,
        remove_blanks=True,
        title=None,
        cap=None,
        ftrs_renamed=None,
        figsize=(12, 8),
        tick_fontsize=None,
    ):
        """
        Visualise the features chosen by the final selector in a scikit-learn pipeline
        over time, for a given signal optimization process that has been run.

        Parameters
        ----------
        name : str
            Name of the previously run signal optimization process.
        remove_blanks : bool, optional
            Whether to remove features from the heatmap that were never selected.
            Default is True.
        title : str, optional
            Title of the heatmap. Default is None. This creates a figure title of the
            form "Feature Selection Heatmap for {name}".
        cap : int, optional
            Maximum number of features to display. Default is None. The chosen
            features are the 'cap' most frequently occurring in the pipeline.
        ftrs_renamed : dict, optional
            Dictionary to rename the feature names for visualisation in the plot axis.
            Default is None, which uses the original feature names.
        figsize : tuple of floats or ints, optional
            Tuple of floats or ints denoting the figure size. Default is (12, 8).
        tick_fontsize : int, optional
            Font size of the ticks on the heatmap. Default is None.

        Notes
        -----
        This method displays the features selected by the final selector in a
        scikit-learn pipeline over time, for a given signal optimization process that
        has been run. This information is contained within a binary heatmap. It does
        not take into account any feature selection inherent to the predictor itself.
        """
        # Checks
        self._checks_feature_selection_heatmap(
            name=name,
            title=title,
            ftrs_renamed=ftrs_renamed,
            figsize=figsize,
            tick_fontsize=tick_fontsize,
        )
        if title is None:
            title = f"Feature Selection Heatmap for {name}"

        # Get the selected features for the specified pipeline to visualise selection
        selected_ftrs = self.get_selected_features(name=name)
        selected_ftrs["real_date"] = selected_ftrs["real_date"].dt.date
        selected_ftrs = (
            selected_ftrs.sort_values(by="real_date")
            .drop(columns=["name"])
            .set_index("real_date")
        )

        # Sort dataframe columns in descending order of the number of times they were
        # selected
        ftr_count = selected_ftrs.sum().sort_values(ascending=False)
        if remove_blanks:
            ftr_count = ftr_count[ftr_count > 0]
        if cap is not None:
            ftr_count = ftr_count.head(cap)

        reindexed_columns = ftr_count.index
        selected_ftrs = selected_ftrs[reindexed_columns]
        if ftrs_renamed is not None:
            selected_ftrs.rename(columns=ftrs_renamed, inplace=True)

        # Create the heatmap
        plt.figure(figsize=figsize)
        if np.all(selected_ftrs == 1):
            sns.heatmap(selected_ftrs.T, cmap="binary_r", cbar=False)
        else:
            sns.heatmap(selected_ftrs.T, cmap="binary", cbar=False)
        plt.title(title)
        plt.xticks(fontsize=tick_fontsize)
        plt.yticks(fontsize=tick_fontsize)
        plt.show()

    def _checks_feature_selection_heatmap(
        self,
        name: str,
        title=None,
        ftrs_renamed=None,
        figsize=(12, 8),
        tick_fontsize=None,
    ):
        """
        Checks for the feature_selection_heatmap method.

        Parameters
        ----------
        name : str
            Name of the previously run signal optimization process.
        title : str, optional
            Title of the heatmap. Default is None. This creates a figure title of the
            form "Feature Selection Heatmap for {name}".
        ftrs_renamed : dict, optional
            Dictionary to rename the feature names for visualisation in the plot axis.
            Default is None, which uses the original feature names.
        figsize : tuple of floats or ints, optional
            Tuple of floats or ints denoting the figure size. Default is (12, 8).
        tick_fontsize : int, optional
            Font size of the ticks on the heatmap. Default is None.
        """
        if not isinstance(name, str):
            raise TypeError("The pipeline name must be a string.")
        if name not in self.selected_ftrs.name.unique():
            raise ValueError(
                f"""The pipeline name {name} is not in the list of already-calculated
                pipelines. Please check the pipeline name carefully. If correct,
                please run calculate_predictions() first.
                """
            )
        if title is None:
            title = f"Feature Selection Heatmap for {name}"
        if not isinstance(title, str):
            raise TypeError("The figure title must be a string.")
        if not isinstance(figsize, tuple):
            raise TypeError("The figsize argument must be a tuple.")
        if len(figsize) != 2:
            raise ValueError("The figsize argument must be a tuple of length 2.")
        for element in figsize:
            if not isinstance(element, (int, float)):
                raise TypeError(
                    "The elements of the figsize tuple must be floats or ints."
                )
        if ftrs_renamed is not None:
            if not isinstance(ftrs_renamed, dict):
                raise TypeError("The ftrs_renamed argument must be a dictionary.")
            for key, value in ftrs_renamed.items():
                if not isinstance(key, str):
                    raise TypeError(
                        "The keys of the ftrs_renamed dictionary must be strings."
                    )
                if not isinstance(value, str):
                    raise TypeError(
                        "The values of the ftrs_renamed dictionary must be strings."
                    )
                if key not in self.X.columns:
                    raise ValueError(
                        f"""The key {key} in the ftrs_renamed dictionary is not a
                        feature in the pipeline {name}.
                        """
                    )
        if tick_fontsize is not None:
            if not isinstance(tick_fontsize, int):
                raise TypeError("The tick_fontsize argument must be an integer.")
    def correlations_heatmap(
        self,
        name: str,
        feature_name: str,
        title: str = None,
        cap: int = None,
        ftrs_renamed: dict = None,
        figsize: tuple = (12, 8),
    ):
        """
        Method to visualise correlations between features entering a model, and those
        that entered a preprocessing pipeline.

        Parameters
        ----------
        name : str
            Name of the signal optimization process.
        feature_name : str
            Name of the feature passed into the final predictor.
        title : str, optional
            Title of the heatmap. Default is None. This creates a figure title of the
            form "Correlation Heatmap for feature {feature_name} and pipeline {name}".
        cap : int, optional
            Maximum number of correlations to display. Default is None. The chosen
            features are the 'cap' most highly correlated.
        ftrs_renamed : dict, optional
            Dictionary to rename the feature names for visualisation in the plot axis.
            Default is None, which uses the original feature names.
        figsize : tuple of floats or ints, optional
            Tuple of floats or ints denoting the figure size. Default is (12, 8).

        Notes
        -----
        This method displays the correlation between a feature that is about to be
        entered into a final predictor and the `cap` most correlated features entered
        into the original pipeline. This information is contained within a heatmap.

        The features that enter a pipeline can be transformed in any way, and
        sometimes the transformation is non-trivial, resulting in a feature space that
        is not easily interpretable. As an example, dimensionality reduction
        techniques such as PCA and LDA rotate the feature space, and a neural network
        learns a non-linear transformation of it, producing factors that can be hard
        to interpret. This method allows the user to see how the original features are
        correlated with the features that enter the final model, providing insight
        into the transformation that took place.
        """
        # Checks
        self._checks_correlations_heatmap(
            name=name,
            feature_name=feature_name,
            title=title,
            cap=cap,
            ftrs_renamed=ftrs_renamed,
            figsize=figsize,
        )

        # Get the correlations
        correlations = self.get_feature_correlations(name=name)
        correlations = correlations[correlations.predictor_input == feature_name]
        correlations = correlations.sort_values(by="real_date").drop(columns=["name"])
        correlations["real_date"] = correlations["real_date"].dt.date

        # Sort this dataframe based on the average correlation with each feature in
        # pipeline_input
        avg_corr = correlations.groupby("pipeline_input", observed=True)[
            "pearson"
        ].mean()
        avg_corr = avg_corr.sort_values(ascending=False)
        if cap is not None:
            avg_corr = avg_corr.head(cap)

        reindexed_columns = avg_corr.index
        correlations = correlations[
            correlations.pipeline_input.isin(reindexed_columns)
        ]
        if ftrs_renamed is not None:
            # Rename items in correlations.pipeline_input based on ftrs_renamed,
            # leaving items not in ftrs_renamed as they are
            correlations["pipeline_input"] = correlations["pipeline_input"].map(
                lambda x: ftrs_renamed.get(x, x)
            )

        # Create the heatmap
        plt.figure(figsize=figsize)
        sns.heatmap(
            correlations.pivot(
                index="pipeline_input", columns="real_date", values="pearson"
            ),
            cmap="coolwarm_r",
            cbar=True,
        )
        if title is None:
            title = (
                f"Correlation Heatmap for feature {feature_name} and pipeline {name}"
            )
        plt.title(title)
        plt.show()

    def _checks_correlations_heatmap(
        self,
        name: str,
        feature_name: str,
        title: str,
        cap: int,
        ftrs_renamed: dict,
        figsize: tuple,
    ):
        """
        Checks for the correlations_heatmap method.

        Parameters
        ----------
        name : str
            Name of the signal optimization process.
        feature_name : str
            Name of the feature passed into the final predictor.
        title : str
            Title of the heatmap. Default is None. This creates a figure title of the
            form "Correlation Heatmap for feature {feature_name} and pipeline {name}".
        cap : int
            Maximum number of correlations to display. Default is None. The chosen
            features are the 'cap' most highly correlated.
        ftrs_renamed : dict
            Dictionary to rename the feature names for visualisation in the plot axis.
            Default is None, which uses the original feature names.
        figsize : tuple of floats or ints
            Tuple of floats or ints denoting the figure size. Default is (12, 8).
        """
        # name
        if not isinstance(name, str):
            raise TypeError("The pipeline name must be a string.")
        if name not in self.ftr_corr.name.unique():
            raise ValueError(
                f"""The pipeline name {name} is not in the list of pipelines with
                calculated correlation matrices. Please check the pipeline name
                carefully. If correct, please run calculate_predictions() first, or
                make sure `store_correlations` is turned on.
                """
            )
        # feature name
        if not isinstance(feature_name, str):
            raise TypeError("The feature name must be a string.")
        if feature_name not in self.ftr_corr.predictor_input.unique():
            raise ValueError(
                f"""The feature name {feature_name} is not in the list of features
                that were passed into the final predictor. Please check the feature
                name carefully.
                """
            )
        # title
        if title is not None:
            if not isinstance(title, str):
                raise TypeError("The title must be a string.")
        # cap
        if cap is not None:
            if not isinstance(cap, int):
                raise TypeError("The cap must be an integer.")
            if cap <= 0:
                raise ValueError("The cap must be greater than zero.")
        # ftrs_renamed
        if ftrs_renamed is not None:
            if not isinstance(ftrs_renamed, dict):
                raise TypeError("The ftrs_renamed argument must be a dictionary.")
            for key, value in ftrs_renamed.items():
                if not isinstance(key, str):
                    raise TypeError(
                        "The keys of the ftrs_renamed dictionary must be strings."
                    )
                if not isinstance(value, str):
                    raise TypeError(
                        "The values of the ftrs_renamed dictionary must be strings."
                    )
                if key not in self.X.columns:
                    raise ValueError(
                        f"""The key {key} in the ftrs_renamed dictionary is not a
                        feature in the pipeline {name}.
                        """
                    )
        # figsize
        if not isinstance(figsize, tuple):
            raise TypeError("The figsize argument must be a tuple.")
        if len(figsize) != 2:
            raise ValueError("The figsize argument must be a tuple of length 2.")
        for element in figsize:
            if not isinstance(element, numbers.Number) or isinstance(element, bool):
                raise TypeError(
                    "The elements of the figsize tuple must be floats or ints."
                )
    def feature_importance_timeplot(
        self,
        name,
        ftrs=None,
        title=None,
        ftrs_renamed=None,
        figsize=(10, 6),
        title_fontsize=None,
        label_fontsize=None,
        tick_fontsize=None,
    ):
        """
        Visualise time series of feature importances for the final predictor in a
        given pipeline, when available.

        Parameters
        ----------
        name : str
            Name of the previously run signal optimization process.
        ftrs : list, optional
            List of feature names to plot. Default is None.
        title : str, optional
            Title of the plot. Default is None. This creates a figure title of the
            form "Feature importances for pipeline: {name}".
        ftrs_renamed : dict, optional
            Dictionary to rename the feature names for visualisation in the plot
            legend. Default is None, which uses the original feature names.
        figsize : tuple of floats or ints, optional
            Tuple of floats or ints denoting the figure size. Default is (10, 6).
        title_fontsize : int, optional
            Font size for the title. Default is None.
        label_fontsize : int, optional
            Font size for the axis labels. Default is None.
        tick_fontsize : int, optional
            Font size for the axis ticks. Default is None.

        Notes
        -----
        This method displays the time series of feature importances for a given
        pipeline, when available. Availability depends on whether or not the final
        predictor in the pipeline has either a `coef_` or `feature_importances_`
        attribute. This information is contained within a line plot.

        The default behaviour is to sort the feature importance columns in ascending
        order of the number of NAs, accounting for a possible feature selection module
        in the pipeline, and to plot the feature importances for the first 10 features
        in the sorted order. By specifying a `ftrs` list (which can be no longer than
        10 elements in length), this default behaviour can be overridden. By sorting
        on NAs, the plot displays the model feature importances for either the first
        10 features in the dataframe or, when a feature selection module was present,
        the 10 most frequently selected features.
        """
        # Checks
        if not isinstance(name, str):
            raise TypeError("The pipeline name must be a string.")
        if name not in self.feature_importances.name.unique():
            raise ValueError(
                f"""The pipeline name {name} is not in the list of already-calculated
                pipelines. Please check the pipeline name carefully. If correct,
                please run calculate_predictions() first.
                """
            )
        ftrcoef_df = self.get_feature_importances(name)
        if ftrcoef_df.iloc[:, 2:].isna().all().all():
            raise ValueError(
                f"""There are no non-NA feature importances for the pipeline {name}.
                Cannot display a time series plot.
                """
            )
        if ftrs is not None:
            if not isinstance(ftrs, list):
                raise TypeError("The ftrs argument must be a list.")
            if len(ftrs) > 10:
                raise ValueError(
                    "The ftrs list must be no longer than 10 elements in length."
                )
            for ftr in ftrs:
                if not isinstance(ftr, str):
                    raise TypeError("The elements of the ftrs list must be strings.")
                if ftr not in ftrcoef_df.columns:
                    raise ValueError(
                        f"""The feature {ftr} is not in the list of feature
                        coefficients for the pipeline {name}.
                        """
                    )
        if not isinstance(title, str) and title is not None:
            raise TypeError("The title must be a string.")
        if ftrs_renamed is not None:
            if not isinstance(ftrs_renamed, dict):
                raise TypeError("The ftrs_renamed argument must be a dictionary.")
            for key, value in ftrs_renamed.items():
                if not isinstance(key, str):
                    raise TypeError(
                        "The keys of the ftrs_renamed dictionary must be strings."
                    )
                if not isinstance(value, str):
                    raise TypeError(
                        "The values of the ftrs_renamed dictionary must be strings."
                    )
                if key not in self.X.columns:
                    raise ValueError(
                        f"""The key {key} in the ftrs_renamed dictionary is not a
                        feature in the pipeline {name}.
                        """
                    )
        if not isinstance(figsize, tuple):
            raise TypeError("The figsize argument must be a tuple.")
        if len(figsize) != 2:
            raise ValueError("The figsize argument must be a tuple of length 2.")
        for element in figsize:
            if not isinstance(element, numbers.Real):
                raise TypeError(
                    "The elements of the figsize tuple must be floats or ints."
                )
        if title_fontsize is not None:
            if not isinstance(title_fontsize, int):
                raise TypeError("The title_fontsize argument must be an integer.")
        if label_fontsize is not None:
            if not isinstance(label_fontsize, int):
                raise TypeError("The label_fontsize argument must be an integer.")
        if tick_fontsize is not None:
            if not isinstance(tick_fontsize, int):
                raise TypeError("The tick_fontsize argument must be an integer.")

        # Set the style
        sns.set_style("darkgrid")

        # Reshape dataframe for plotting
        ftrcoef_df = self.get_feature_importances(name)
        ftrcoef_df = ftrcoef_df.set_index("real_date")
        ftrcoef_df = ftrcoef_df.iloc[:, 1:]

        # Sort dataframe columns in ascending order of the number of NA values in the
        # columns
        na_count = ftrcoef_df.isna().sum().sort_values()
        reindexed_columns = na_count.index
        ftrcoef_df = ftrcoef_df[reindexed_columns]

        if ftrs is not None:
            ftrcoef_df = ftrcoef_df[ftrs]
        else:
            if ftrcoef_df.shape[1] > 11:
                ftrcoef_df = pd.concat(
                    (ftrcoef_df.iloc[:, :10], ftrcoef_df.iloc[:, -1]), axis=1
                )

        # Create time series plot
        fig, ax = plt.subplots()
        if ftrs_renamed is not None:
            ftrcoef_df.rename(columns=ftrs_renamed).plot(ax=ax, figsize=figsize)
        else:
            ftrcoef_df.plot(ax=ax, figsize=figsize)

        if title is not None:
            plt.title(title, fontsize=title_fontsize)
        else:
            plt.title(
                f"Feature importances for pipeline: {name}", fontsize=title_fontsize
            )
        ax.set_xlabel(ax.get_xlabel(), fontsize=label_fontsize)
        ax.set_ylabel(ax.get_ylabel(), fontsize=label_fontsize)
        ax.tick_params(axis="x", labelsize=tick_fontsize)
        ax.tick_params(axis="y", labelsize=tick_fontsize)
        plt.show()
    def intercepts_timeplot(self, name, title=None, figsize=(10, 6)):
        """
        Visualise time series of intercepts for a given pipeline, when available.

        Parameters
        ----------
        name : str
            Name of the previously run signal optimization process.
        title : str, optional
            Title of the plot. Default is None. This creates a figure title of the
            form "Intercepts for pipeline: {name}".
        figsize : tuple of floats or ints, optional
            Tuple of floats or ints denoting the figure size. Default is (10, 6).

        Notes
        -----
        This method displays the time series of intercepts for a given pipeline, when
        available. This information is contained within a line plot.
        """
        # Checks
        if not isinstance(name, str):
            raise TypeError("The pipeline name must be a string.")
        if name not in self.intercepts.name.unique():
            raise ValueError(
                f"""The pipeline name {name} is not in the list of already-calculated
                pipelines. Please check the pipeline name carefully. If correct,
                please run calculate_predictions() first.
                """
            )
        intercepts_df = self.get_intercepts(name)
        # TODO: the next line will be made redundant once the signal optimizer checks
        # for this and removes any pipelines with all-NaN intercepts
        if intercepts_df.iloc[:, 2:].isna().all().all():
            raise ValueError(
                f"""There are no non-NA intercepts for the pipeline {name}.
                Cannot display a time series plot.
                """
            )
        if not isinstance(title, str) and title is not None:
            raise TypeError("The title must be a string.")
        if not isinstance(figsize, tuple):
            raise TypeError("The figsize argument must be a tuple.")
        if len(figsize) != 2:
            raise ValueError("The figsize argument must be a tuple of length 2.")
        for element in figsize:
            if not isinstance(element, (int, float)):
                raise TypeError(
                    "The elements of the figsize tuple must be floats or ints."
                )

        # Set the style
        sns.set_style("darkgrid")

        # Reshape dataframe for plotting
        intercepts_df = intercepts_df.set_index("real_date")
        intercepts_df = intercepts_df.iloc[:, 1]

        # Create time series plot
        fig, ax = plt.subplots()
        intercepts_df.plot(ax=ax, figsize=figsize)
        if title is not None:
            plt.title(title)
        else:
            plt.title(f"Intercepts for pipeline: {name}")
        plt.show()
    def coefs_stackedbarplot(
        self,
        name,
        ftrs=None,
        title=None,
        cap=None,
        ftrs_renamed=None,
        figsize=(10, 6),
        title_fontsize=None,
        label_fontsize=None,
        tick_fontsize=None,
    ):
        """
        Visualise feature coefficients for a given pipeline in a stacked bar plot.

        Parameters
        ----------
        name : str
            Name of the previously run signal optimization process.
        ftrs : list, optional
            List of feature names to plot. Default is None.
        title : str, optional
            Title of the plot. Default is None. This creates a figure title of the
            form "Stacked bar plot of model coefficients: {name}".
        cap : int, optional
            Maximum number of features to display. Default is None. The chosen
            features are the 'cap' most frequently occurring in the pipeline. This
            cannot exceed 10.
        ftrs_renamed : dict, optional
            Dictionary to rename the feature names for visualisation in the plot
            legend. Default is None, which uses the original feature names.
        figsize : tuple of floats or ints, optional
            Tuple of floats or ints denoting the figure size. Default is (10, 6).
        title_fontsize : int, optional
            Font size for the title. Default is None.
        label_fontsize : int, optional
            Font size for the axis labels. Default is None.
        tick_fontsize : int, optional
            Font size for the axis ticks. Default is None.

        Notes
        -----
        This method displays the average feature coefficients for a given pipeline in
        each calendar year, when available. This information is contained within a
        stacked bar plot.

        The default behaviour is to plot the first 10 features in the order specified
        during training, even when more than 10 features were involved in the learning
        procedure. By specifying a `ftrs` list (which can be no longer than 10
        elements in length), this default behaviour can be overridden.
        """
        # Checks
        if not isinstance(name, str):
            raise TypeError("The pipeline name must be a string.")
        if name not in self.feature_importances.name.unique():
            raise ValueError(
                f"""The pipeline name {name} is not in the list of already-calculated
                pipelines. Please check the pipeline name carefully. If correct,
                please run calculate_predictions() first.
                """
            )
        ftrcoef_df = self.get_feature_importances(name)
        if ftrcoef_df.iloc[:, 2:].isna().all().all():
            raise ValueError(
                f"""There are no non-NA coefficients for the pipeline {name}.
                Cannot display a stacked bar plot.
                """
            )
        if ftrs is not None:
            if not isinstance(ftrs, list):
                raise TypeError("The ftrs argument must be a list.")
            if len(ftrs) > 10:
                raise ValueError(
                    "The ftrs list must be no longer than 10 elements in length."
                )
            for ftr in ftrs:
                if not isinstance(ftr, str):
                    raise TypeError("The elements of the ftrs list must be strings.")
                if ftr not in ftrcoef_df.columns:
                    raise ValueError(
                        f"""The feature {ftr} is not in the list of feature
                        coefficients for the pipeline {name}.
                        """
                    )
        if not isinstance(title, str) and title is not None:
            raise TypeError("The title must be a string.")
        if ftrs_renamed is not None:
            if not isinstance(ftrs_renamed, dict):
                raise TypeError("The ftrs_renamed argument must be a dictionary.")
            for key, value in ftrs_renamed.items():
                if not isinstance(key, str):
                    raise TypeError(
                        "The keys of the ftrs_renamed dictionary must be strings."
                    )
                if not isinstance(value, str):
                    raise TypeError(
                        "The values of the ftrs_renamed dictionary must be strings."
                    )
                if key not in self.X.columns:
                    raise ValueError(
                        f"""The key {key} in the ftrs_renamed dictionary is not a
                        feature in the pipeline {name}.
                        """
                    )
        if not isinstance(figsize, tuple):
            raise TypeError("The figsize argument must be a tuple.")
        if len(figsize) != 2:
            raise ValueError("The figsize argument must be a tuple of length 2.")
        for element in figsize:
            if not isinstance(element, numbers.Real):
                raise TypeError(
                    "The elements of the figsize tuple must be floats or ints."
                )
        if cap is not None:
            if not isinstance(cap, int):
                raise TypeError("The cap argument must be an integer.")
            if cap <= 0:
                raise ValueError("The cap argument must be greater than zero.")
            if cap > 10:
                raise ValueError("The cap argument must be no greater than 10.")
        if title_fontsize is not None:
            if not isinstance(title_fontsize, int):
                raise TypeError("The title_fontsize argument must be an int.")
        if label_fontsize is not None:
            if not isinstance(label_fontsize, int):
                raise TypeError("The label_fontsize argument must be an int.")
        if tick_fontsize is not None:
            if not isinstance(tick_fontsize, int):
                raise TypeError("The tick_fontsize argument must be an int.")

        # Set the style
        sns.set_style("darkgrid")

        # Reshape dataframe for plotting
        ftrcoef_df = self.get_feature_importances(name)
        years = ftrcoef_df["real_date"].dt.year
        years.name = "year"
        ftrcoef_df.drop(columns=["real_date", "name"], inplace=True)

        # Sort dataframe columns in ascending order of the number of NA values in the
        # columns
        na_count = ftrcoef_df.isna().sum().sort_values()
        reindexed_columns = na_count.index
        ftrcoef_df = ftrcoef_df[reindexed_columns]
        if cap is not None:
            ftrcoef_df = ftrcoef_df.T.head(cap).T
        ftrcoef_df = pd.concat((ftrcoef_df, years), axis=1)

        # Define colour map
        default_cycle_colors = plt.rcParams["axes.prop_cycle"].by_key()["color"][:10]
        cmap = mcolors.LinearSegmentedColormap.from_list(
            "default_cycle", default_cycle_colors
        )

        # Handle case where there are more than 10 features
        if ftrs is not None:
            ftrcoef_df = ftrcoef_df[ftrs + ["year"]]
        else:
            if ftrcoef_df.shape[1] > 11:
                ftrcoef_df = pd.concat(
                    (ftrcoef_df.iloc[:, :10], ftrcoef_df.iloc[:, -1]), axis=1
                )

        # Average the coefficients for each year and separate into positive and
        # negative values
        if ftrs_renamed is not None:
            ftrcoef_df.rename(columns=ftrs_renamed, inplace=True)
        avg_coefs = ftrcoef_df.groupby("year", observed=True).mean()
        pos_coefs = avg_coefs.clip(lower=0)
        neg_coefs = avg_coefs.clip(upper=0)

        ax = None
        # Create stacked bar plot
        if pos_coefs.sum().any():
            ax = pos_coefs.plot(
                kind="bar", stacked=True, figsize=figsize, colormap=cmap, alpha=0.75
            )
        if neg_coefs.sum().any():
            neg_coefs.plot(
                kind="bar",
                stacked=True,
                figsize=figsize,
                colormap=cmap,
                alpha=0.75,
                ax=ax,
            )

        # Display title and axis labels
        if title is None:
            plt.title(
                f"Stacked bar plot of model coefficients: {name}",
                fontsize=title_fontsize,
            )
        else:
            plt.title(title, fontsize=title_fontsize)
        plt.xlabel("Year", fontsize=label_fontsize)
        plt.ylabel("Average Coefficient Value", fontsize=label_fontsize)
        plt.axhline(0, color="black", linewidth=0.8)  # Adds a line at zero

        # Configure legend
        handles, labels = plt.gca().get_legend_handles_labels()
        by_label = dict(zip(labels, handles))
        plt.legend(
            by_label.values(),
            by_label.keys(),
            title="Coefficients",
            bbox_to_anchor=(1.05, 1),
            loc="upper left",
            fontsize=tick_fontsize,
            title_fontsize=label_fontsize,
        )
        plt.xticks(fontsize=tick_fontsize)
        plt.yticks(fontsize=tick_fontsize)

        # Display plot
        plt.tight_layout()
        plt.show()
    def nsplits_timeplot(
        self,
        name,
        title=None,
        figsize=(10, 6),
        title_fontsize=None,
        label_fontsize=None,
        tick_fontsize=None,
    ):
        """
        Method to plot the time series of the number of cross-validation splits used
        by the signal optimizer.

        Parameters
        ----------
        name : str
            Name of the previously run signal optimization process.
        title : str, optional
            Title of the plot. Default is None. This creates a figure title of the
            form "Number of CV splits for pipeline: {name}".
        figsize : tuple of floats or ints, optional
            Tuple of floats or ints denoting the figure size. Default is (10, 6).
        title_fontsize : int, optional
            Font size for the title. Default is None.
        label_fontsize : int, optional
            Font size for the x and y axis labels. Default is None.
        tick_fontsize : int, optional
            Font size for the x and y axis ticks. Default is None.
        """
        # Checks
        if not isinstance(name, str):
            raise TypeError("The pipeline name must be a string.")
        if name not in self.chosen_models.name.unique():
            raise ValueError(
                f"""The pipeline name {name} is not in the list of already-calculated
                pipelines. Please check the pipeline name carefully. If correct,
                please run calculate_predictions() first.
                """
            )
        models_df = self.get_optimal_models(name)
        if not isinstance(title, str) and title is not None:
            raise TypeError("The title must be a string.")
        if not isinstance(figsize, tuple):
            raise TypeError("The figsize argument must be a tuple.")
        if len(figsize) != 2:
            raise ValueError("The figsize argument must be a tuple of length 2.")
        for element in figsize:
            if not isinstance(element, (int, float)):
                raise TypeError(
                    "The elements of the figsize tuple must be floats or ints."
                )
        if title_fontsize is not None and not isinstance(title_fontsize, int):
            raise TypeError("The title_fontsize argument must be an int.")
        if label_fontsize is not None and not isinstance(label_fontsize, int):
            raise TypeError("The label_fontsize argument must be an int.")
        if tick_fontsize is not None and not isinstance(tick_fontsize, int):
            raise TypeError("The tick_fontsize argument must be an int.")

        # Set the style
        sns.set_style("darkgrid")

        # Reshape dataframe for plotting
        models_df = models_df.set_index("real_date").sort_index()
        models_df = models_df.loc[:, "n_splits_used"]
        models_df_expanded = pd.DataFrame(models_df.tolist(), index=models_df.index)

        # Create time series plot
        # TODO: extend the number-of-splits line until the first date that the number
        # of splits is incremented. This translates into vertical lines at each
        # increment date, as opposed to linear interpolation between them.
        fig, ax = plt.subplots()
        models_df_expanded.plot(ax=ax, figsize=figsize)
        if title is not None:
            plt.title(title, fontsize=title_fontsize)
        else:
            plt.title(
                f"Number of CV splits for pipeline: {name}", fontsize=title_fontsize
            )
        ax.set_xlabel(ax.get_xlabel(), fontsize=label_fontsize)
        ax.set_ylabel(ax.get_ylabel(), fontsize=label_fontsize)

        # Customize tick label font sizes
        ax.tick_params(axis="x", labelsize=tick_fontsize)
        ax.tick_params(axis="y", labelsize=tick_fontsize)
        plt.show()
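
# Illustrative sketch (not part of the public API) of the diagnostic plots
# available once a pipeline has been run; the "LR" label is an assumption
# consistent with the __main__ demonstration below.
def _example_diagnostics(so):  # pragma: no cover - illustrative only
    so.models_heatmap("LR")  # chosen model per retraining date
    so.feature_selection_heatmap("LR")  # binary map of selected features
    so.feature_importance_timeplot("LR")  # coefficients/importances over time
    so.intercepts_timeplot("LR")  # intercepts over time
    so.coefs_stackedbarplot("LR")  # average coefficients per calendar year
    so.nsplits_timeplot("LR")  # CV splits used over time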
if __name__ == "__main__":
    from sklearn.linear_model import Ridge, Lasso
    from sklearn.metrics import make_scorer, r2_score, mean_absolute_error

    from macrosynergy.learning import (
        ExpandingKFoldPanelSplit,
        TimeWeightedLinearRegression,
    )
    from macrosynergy.management.simulate import make_qdf
    from macrosynergy.management.types import QuantamentalDataFrame

    cids = ["AUD", "CAD", "GBP", "USD"]
    xcats = ["XR", "CRY", "GROWTH", "INFL"]
    cols = ["earliest", "latest", "mean_add", "sd_mult", "ar_coef", "back_coef"]

    df_cids = pd.DataFrame(
        index=cids, columns=["earliest", "latest", "mean_add", "sd_mult"]
    )
    df_cids.loc["AUD"] = ["2012-01-01", "2020-12-31", 0, 1]
    df_cids.loc["CAD"] = ["2012-01-01", "2020-12-31", 0, 1]
    df_cids.loc["GBP"] = ["2012-01-01", "2020-12-31", 0, 1]
    df_cids.loc["USD"] = ["2012-01-01", "2020-12-31", 0, 1]

    df_xcats = pd.DataFrame(index=xcats, columns=cols)
    df_xcats.loc["XR"] = ["2012-01-01", "2020-12-31", 0.1, 1, 0, 0.3]
    df_xcats.loc["CRY"] = ["2012-01-01", "2020-12-31", 1, 2, 0.95, 1]
    df_xcats.loc["GROWTH"] = ["2012-01-01", "2020-12-31", 1, 2, 0.9, 1]
    df_xcats.loc["INFL"] = ["2012-01-01", "2020-12-31", -0.1, 2, 0.8, 0.3]

    dfd = make_qdf(df_cids, df_xcats, back_ar=0.75)
    dfd["grading"] = np.ones(dfd.shape[0])

    black = {
        "GBP": (
            pd.Timestamp(year=2009, month=1, day=1),
            pd.Timestamp(year=2012, month=6, day=30),
        ),
        "CAD": (
            pd.Timestamp(year=2015, month=1, day=1),
            pd.Timestamp(year=2016, month=1, day=1),
        ),
    }

    so = SignalOptimizer(
        df=dfd,
        xcats=["CRY", "GROWTH", "INFL", "XR"],
        cids=cids,
        blacklist=black,
    )
    so.calculate_predictions(
        name="LR",
        models={
            "Ridge": Ridge(),
            "Lasso": Lasso(),
            "TWLS": TimeWeightedLinearRegression(),
        },
        hyperparameters={
            "Ridge": {
                "fit_intercept": [True, False],
                "alpha": [1e-4, 1e-3, 1e-2, 1e-1, 1, 10, 100, 1000, 10000],
            },
            "Lasso": {
                "fit_intercept": [True, False],
                "alpha": [1e-4, 1e-3, 1e-2, 1e-1, 1, 10, 100, 1000, 10000],
            },
            "TWLS": {
                "half_life": [24, 36, 60, 120, 240],
                "fit_intercept": [True, False],
            },
        },
        scorers={
            "r2": make_scorer(r2_score),
            "mae": make_scorer(mean_absolute_error, greater_is_better=False),
        },
        inner_splitters={
            "ExpandingKFold": ExpandingKFoldPanelSplit(n_splits=5),
            "SecondSplit": ExpandingKFoldPanelSplit(n_splits=10),
        },
        # search_type="prior",
        # n_iter=6,
        cv_summary="mean-std-ge",
        include_train_folds=True,
        n_jobs_outer=1,
        n_jobs_inner=1,
        normalize_fold_results=True,
        # split_functions={
        #     "ExpandingKFold": lambda n: n // 12,
        #     "SecondSplit": None,
        # },
    )
    so.models_heatmap("LR")
    so.feature_importance_timeplot("LR")
    so.coefs_stackedbarplot("LR")
    so.nsplits_timeplot("LR")
    so.feature_selection_heatmap("LR", tick_fontsize=20)

    # Test a random forest
    from sklearn.ensemble import RandomForestRegressor

    so.calculate_predictions(
        name="RF",
        models={
            "RF": RandomForestRegressor(
                n_estimators=100,
                min_samples_leaf=5,
                max_features="sqrt",
                max_samples=0.1,
            ),
        },
        hyperparameters={
            "RF": {},
        },
        scorers={
            "r2": make_scorer(r2_score),
            "mae": make_scorer(mean_absolute_error, greater_is_better=False),
        },
        inner_splitters={
            "ExpandingKFold": ExpandingKFoldPanelSplit(n_splits=2),
            "SecondSplit": ExpandingKFoldPanelSplit(n_splits=3),
        },
        search_type="grid",
        cv_summary="mean-std",
        n_jobs_outer=-1,
        n_jobs_inner=1,
    )
    so.feature_importance_timeplot("RF")