
"""
Class to estimate market betas and calculate out-of-sample hedged returns based on
sequential learning. 
"""

from collections import defaultdict

import numpy as np
import pandas as pd
from sklearn.ensemble import VotingRegressor

from macrosynergy.learning.forecasting.model_systems.base_regression_system import (
    BaseRegressionSystem,
)
from macrosynergy.management.types import QuantamentalDataFrame
from macrosynergy.learning import ExpandingFrequencyPanelSplit
from macrosynergy.learning.sequential import BasePanelLearner
from macrosynergy.management import categories_df, reduce_df, update_df
from macrosynergy.management.utils.df_utils import (
    concat_categorical,
    _insert_as_categorical,
)


class BetaEstimator(BasePanelLearner):
    """
    Class for sequential beta estimation by learning optimal regression coefficients.
    Out-of-sample hedged returns are additionally calculated and stored.

    Parameters
    ----------
    df : pd.DataFrame
        Daily quantamental dataframe in JPMaQS format containing a panel of features,
        as well as a panel of returns.
    xcats : str or list
        Name of a market return category within the panel specified in `df`.
    benchmark_return : str
        Name of the benchmark return ticker within the panel specified in `df`.
    cids : list, optional
        List of cross-sections for which hedged returns are to be calculated. Default
        is None, which calculates hedged returns for all cross-sections in the return
        panel.
    start : str, optional
        Start date for considered data in subsequent analysis in ISO 8601 format.
        Default is None, i.e. the earliest date in the dataframe.
    end : str, optional
        End date for considered data in subsequent analysis in ISO 8601 format.
        Default is None, i.e. the latest date in the dataframe.

    Notes
    -----
    The `BetaEstimator` class is used to sequentially estimate macro betas based on a
    panel of contract returns (provided in `xcats`) and a benchmark return ticker
    (provided in `benchmark_return`). The initial conditions of the learning process
    are specified by the dimensions of an initial training set. An optimal model is
    selected out of a provided collection (with associated hyperparameters), a beta is
    extracted for each cross-section (subject to availability) and out-of-sample
    hedged returns are calculated for each cross-section with an estimated beta. The
    betas and hedged returns are stored, and the training set is expanded to include
    the now-realized returns. This process is repeated until the end of the dataset is
    reached. In addition to storing betas and hedged returns, this class also stores
    useful model selection information for analysis, such as the models selected at
    each point in time.

    Model and hyperparameter selection is performed by cross-validation. Given a
    collection of models and associated hyperparameters to choose from, a
    hyperparameter optimization is run - currently only grid search and random search
    are supported - to determine the optimal choice. This is done by providing a
    collection of `scikit-learn` compatible scoring functions, as well as a collection
    of `scikit-learn` compatible cross-validation splitters. At each point in time,
    the cross-validation folds are the union of the folds produced by each splitter
    provided. Each scorer is evaluated on each test fold and summarized across test
    folds by either a custom function provided by the user or a common string
    e.g. 'mean'. Consequently, each model and hyperparameter combination has an
    associated collection of scores, one per scorer, each in the units of that scorer.
    In order to form a composite score for each model and hyperparameter combination,
    the scores are first normalized across model/hyperparameter combinations. This
    makes the scores of different scorers comparable, so that their average is a
    meaningful estimate of each model's generalization ability. Finally, a composite
    score for each model and hyperparameter combination is calculated by averaging
    the normalized scores across all scorers. The optimal model is the one with the
    largest composite score.
    """

    def __init__(
        self,
        df,
        xcats,
        benchmark_return,
        cids=None,
        start=None,
        end=None,
    ):
        # Checks
        # TODO: Refactor these checks.
        if not isinstance(df, pd.DataFrame):
            raise TypeError("df must be a pandas DataFrame.")
        if not set(["cid", "xcat", "real_date", "value"]).issubset(df.columns):
            raise ValueError(
                "df must have columns 'cid', 'xcat', 'real_date' and 'value'."
            )

        # cids checks
        if cids is not None:
            if not isinstance(cids, list):
                raise TypeError("cids must be a list.")
            if not all(isinstance(cid, str) for cid in cids):
                raise TypeError("All elements in cids must be strings.")
            for cid in cids:
                if cid not in df["cid"].unique():
                    raise ValueError(f"{cid} not in the dataframe.")

        if not isinstance(benchmark_return, str):
            raise TypeError("benchmark_return must be a string.")

        if isinstance(xcats, str):
            xcats = [xcats]
        elif isinstance(xcats, list):
            if not all(isinstance(xcat, str) for xcat in xcats):
                raise TypeError("All elements in xcats must be strings.")
            elif len(xcats) != 1:
                raise ValueError("xcats must be a string or a list of a single xcat.")
        else:
            raise TypeError("xcats must be a string or a list of a single xcat.")

        # Default to all cross-sections available for the return category, as per the
        # class docstring
        if cids is None:
            cids = sorted(df.loc[df["xcat"].isin(xcats), "cid"].unique())

        # Create pseudo-panel
        dfx = pd.DataFrame(columns=["real_date", "cid", "xcat", "value"])

        self.benchmark_return = benchmark_return
        self.benchmark_cid, self.benchmark_xcat = benchmark_return.split("_", 1)

        for cid in cids:
            # Extract cross-section contract returns
            dfa = reduce_df(
                df=df,
                xcats=xcats,
                cids=[cid],
            )
            # Extract benchmark returns
            dfb = reduce_df(
                df=df,
                xcats=[self.benchmark_xcat],
                cids=[self.benchmark_cid],
            )

            # Combine contract and benchmark returns and rename the cross-section
            # identifier in order to match the benchmark return with each
            # cross-section in a pseudo-panel
            df_cid = pd.concat([dfa, dfb], axis=0)
            df_cid["cid"] = f"{cid}v{self.benchmark_cid}"

            dfx = update_df(dfx, df_cid)

        super().__init__(
            df=dfx,
            xcats=[self.benchmark_xcat] + xcats,
            cids=list(dfx["cid"].unique()),
            start=start,
            end=end,
            blacklist=None,
            freq="D",
            lag=0,
        )

        # Create forecast dataframe index
        min_date = min(self.unique_date_levels)
        max_date = max(self.unique_date_levels)
        forecast_date_levels = pd.date_range(start=min_date, end=max_date, freq="B")
        self.forecast_idxs = pd.MultiIndex.from_product(
            [
                [cid.split("v")[0] for cid in self.unique_xs_levels],
                forecast_date_levels,
            ],
            names=["cid", "real_date"],
        )

        # Create initial dataframes to store estimated betas and OOS hedged returns
        self.betas = pd.DataFrame(
            columns=["real_date", "cid", "xcat", "value"]
        ).astype(
            {
                "real_date": "datetime64[s]",
                "cid": "category",
                "xcat": "category",
                "value": "float32",
            }
        )
        self.hedged_returns = pd.DataFrame(
            columns=["real_date", "cid", "xcat", "value"]
        ).astype(
            {
                "real_date": "datetime64[s]",
                "cid": "category",
                "xcat": "category",
                "value": "float32",
            }
        )

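    # A minimal sketch of the pseudo-panel naming used in the constructor above
    # (illustrative values only; "AUD" and "USD_BENCH_XR" are assumed example
    # tickers):
    #
    #     benchmark_cid, benchmark_xcat = "USD_BENCH_XR".split("_", 1)
    #     # -> ("USD", "BENCH_XR")
    #     pseudo_cid = f"AUDv{benchmark_cid}"  # -> "AUDvUSD"
    #
    # Each contract cross-section is stored as "<cid>v<benchmark_cid>", so every
    # pseudo cross-section holds both the contract return and a replica of the
    # benchmark series on a shared index.
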
    def estimate_beta(
        self,
        beta_xcat,
        hedged_return_xcat,
        models,
        hyperparameters,
        scorers,
        inner_splitters,
        search_type="grid",
        normalize_fold_results=False,
        cv_summary="mean",
        include_train_folds=False,
        min_cids=4,
        min_periods=12 * 3,
        est_freq="D",
        max_periods=None,
        split_functions=None,
        n_iter=None,
        n_jobs_outer=-1,
        n_jobs_inner=1,
    ):
        """
        Determines optimal model betas and associated out-of-sample hedged returns.

        Parameters
        ----------
        beta_xcat : str
            Category name for the panel of estimated betas.
        hedged_return_xcat : str
            Category name for the panel of out-of-sample hedged returns.
        models : dict
            Dictionary of models to choose from. The keys are model names and the
            values are scikit-learn compatible models.
        hyperparameters : dict
            Dictionary of hyperparameters to choose from. The keys are model names
            and the values are hyperparameter dictionaries for the corresponding
            model. The keys must match those provided in `models`.
        scorers : dict
            Dictionary of scoring functions to use in the hyperparameter optimization
            process. The keys are scorer names and the values are scikit-learn
            compatible scoring functions.
        inner_splitters : dict
            Dictionary of inner splitters to use in the hyperparameter optimization
            process. The keys are splitter names and the values are scikit-learn
            compatible cross-validator objects.
        search_type : str
            Type of hyperparameter optimization to perform. Default is "grid".
            Options are "grid" and "prior".
        normalize_fold_results : bool
            Whether to normalize the scores across folds before combining them.
            Default is False.
        cv_summary : str or callable, optional
            Summary function used to combine scores across cross-validation folds.
            Default is "mean". Options are "mean", "median", "mean-std", "mean/std",
            "mean-std-ge" or a callable function.
        include_train_folds : bool, optional
            Whether to calculate cross-validation statistics on the training folds in
            addition to the test folds. If True, the cross-validation estimator will
            be a function of both training data and test data. It is recommended to
            set `cv_summary` appropriately. Default is False.
        min_cids : int
            Minimum number of cross-sections required for the initial training set.
            Default is 4.
        min_periods : int
            Minimum number of periods required for the initial training set, in units
            of the frequency `freq` specified in the constructor. Default is 36.
        est_freq : str
            Frequency at which models are refreshed. This corresponds to the
            frequency at which betas are re-estimated and at which out-of-sample
            hedged returns are produced. Default is "D".
        max_periods : int
            Maximum length of each training set in units of the frequency `freq`
            specified in the constructor. Default is None, in which case the
            sequential optimization uses expanding training sets, as opposed to
            rolling windows.
        split_functions : dict, optional
            Dict of callables for determining the number of cross-validation splits
            to add to the initial number as a function of the number of iterations
            passed in the sequential learning process. Default is None. The keys must
            correspond to the keys in `inner_splitters` and should be set to None for
            any splitters that do not require splitter adjustment.
        n_iter : int, optional
            Number of iterations to run in random hyperparameter search. Default is
            None.
        n_jobs_outer : int, optional
            Number of jobs to run in parallel for the outer sequential loop. Default
            is -1. It is advised that n_jobs_inner * n_jobs_outer (with -1 replaced
            by the number of available cores) be less than or equal to the number of
            available cores on the machine.
        n_jobs_inner : int, optional
            Number of jobs to run in parallel for the inner loop. Default is 1. It is
            advised that n_jobs_inner * n_jobs_outer (with -1 replaced by the number
            of available cores) be less than or equal to the number of available
            cores on the machine.
        """
        # Checks
        # All others are checked in the run method
        if not isinstance(hedged_return_xcat, str):
            raise TypeError("hedged_return_xcat must be a string.")

        self.hedged_return_xcat = hedged_return_xcat

        # Set up outer splitter
        outer_splitter = ExpandingFrequencyPanelSplit(
            expansion_freq=est_freq,
            test_freq=est_freq,
            min_cids=min_cids,
            min_periods=min_periods,
            max_periods=max_periods,
        )

        # Run pipeline
        results = self.run(
            name=beta_xcat,
            outer_splitter=outer_splitter,
            inner_splitters=inner_splitters,
            models=models,
            hyperparameters=hyperparameters,
            scorers=scorers,
            search_type=search_type,
            normalize_fold_results=normalize_fold_results,
            cv_summary=cv_summary,
            include_train_folds=include_train_folds,
            split_functions=split_functions,
            n_iter=n_iter,
            n_jobs_outer=n_jobs_outer,
            n_jobs_inner=n_jobs_inner,
        )

        # Remove any stale results stored under the same category names
        if hedged_return_xcat in self.hedged_returns["xcat"].unique():
            self.hedged_returns = self.hedged_returns[
                self.hedged_returns.xcat != hedged_return_xcat
            ]
        if beta_xcat in self.betas["xcat"].unique():
            self.betas = self.betas[self.betas.xcat != beta_xcat]
        if beta_xcat in self.chosen_models.name.unique():
            self.chosen_models = self.chosen_models[
                self.chosen_models.name != beta_xcat
            ]

        # Collect results from the worker
        beta_data = []
        hedged_return_data = []
        model_choice_data = []

        for split_result in results:
            beta_data.extend(split_result["betas"])
            hedged_return_data.extend(split_result["hedged_returns"])
            model_choice_data.append(split_result["model_choice"])

        stored_betas = pd.DataFrame(
            index=self.forecast_idxs, columns=[beta_xcat], data=np.nan, dtype="float32"
        )

        # Create quantamental dataframes of betas and hedged returns
        for real_date, cid, value in beta_data:
            stored_betas.loc[(cid, real_date), beta_xcat] = value

        stored_betas = stored_betas.groupby(level=0, observed=True).ffill().dropna()
        stored_betas.columns = stored_betas.columns.astype("category")
        stored_betas_long = pd.melt(
            frame=stored_betas.reset_index(),
            id_vars=["real_date", "cid"],
            var_name="xcat",
            value_name="value",
        )
        hedged_returns = (
            pd.DataFrame(hedged_return_data, columns=["real_date", "cid", "value"])
            .sort_values(["cid", "real_date"])
            .dropna()
        ).astype({"cid": "category"})
        hedged_returns = _insert_as_categorical(
            hedged_returns, "xcat", hedged_return_xcat, 2
        )

        self.betas = concat_categorical(self.betas, stored_betas_long)
        self.hedged_returns = concat_categorical(
            self.hedged_returns,
            hedged_returns,
        )

        # Store model selection data
        model_df_long = pd.DataFrame(
            columns=[col for col in self.chosen_models.columns if col != "name"],
            data=model_choice_data,
        ).astype({"model_type": "category"})
        model_df_long = _insert_as_categorical(model_df_long, "name", beta_xcat, 1)

        self.chosen_models = concat_categorical(self.chosen_models, model_df_long)

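    # Illustrative sketch of the composite-score logic described in the class Notes
    # (assumed numbers, not output of any actual run): suppose two scorers evaluate
    # two hyperparameter choices and, after summarizing across folds via
    # `cv_summary`, produce
    #
    #     scorer_1: {hp_A: 0.10, hp_B: 0.30}
    #     scorer_2: {hp_A: -5.0, hp_B: -1.0}
    #
    # Each scorer's scores are normalized across the hyperparameter choices so the
    # two metrics become comparable; the composite score of each choice is the
    # average of its normalized scores, and the choice with the largest composite
    # score (here hp_B) is selected.
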
    def store_split_data(
        self,
        pipeline_name,
        optimal_model,
        optimal_model_name,
        optimal_model_score,
        optimal_model_params,
        inner_splitters_adj,
        X_train,
        y_train,
        X_test,
        y_test,
        timestamp,
        adjusted_test_index,
    ):
        """
        Stores characteristics of the optimal model at each retraining date.

        Parameters
        ----------
        pipeline_name : str
            Name of the signal optimization process.
        optimal_model : BaseRegressionSystem or VotingRegressor
            Optimal model selected at each retraining date.
        optimal_model_name : str
            Name of the optimal model.
        optimal_model_score : float
            Cross-validation score for the optimal model.
        optimal_model_params : dict
            Chosen hyperparameters for the optimal model.
        inner_splitters_adj : dict
            Dictionary of adjusted inner splitters.
        X_train : pd.DataFrame
            Training feature matrix.
        y_train : pd.Series
            Training response variable.
        X_test : pd.DataFrame
            Test feature matrix.
        y_test : pd.Series
            Test response variable.
        timestamp : pd.Timestamp
            Timestamp of the retraining date.
        adjusted_test_index : pd.MultiIndex
            Adjusted test index to account for lagged features.

        Returns
        -------
        dict
            Dictionary containing the betas and hedged returns determined at the
            given retraining date.
        """
        if isinstance(optimal_model, VotingRegressor):
            # Average the estimated betas over the ensemble members, per
            # cross-section
            estimators = optimal_model.estimators_
            coefs_list = [est.coefs_ for est in estimators]
            sum_dict = defaultdict(lambda: [0, 0])
            for coefs in coefs_list:
                for key, value in coefs.items():
                    sum_dict[key][0] += value
                    sum_dict[key][1] += 1
            betas = {key: total / count for key, (total, count) in sum_dict.items()}
        elif isinstance(optimal_model, BaseRegressionSystem):
            betas = optimal_model.coefs_
        else:
            # No usable regression system was selected; store NaN betas for all
            # cross-sections in the training set
            betas = {
                cid: np.nan for cid in X_train.index.get_level_values(0).unique()
            }

        betas_list = [
            [
                X_train.index.get_level_values(1).max(),
                cid.split("v")[0],
                beta,
            ]
            for cid, beta in betas.items()
        ]

        # Now calculate the induced hedged returns
        betas_series = pd.Series(betas)
        XB = X_test.mul(betas_series, level=0, axis=0)
        hedged_returns = y_test.values.reshape(-1, 1) - XB.values.reshape(-1, 1)
        hedged_returns_data = [
            [idx[1], idx[0].split("v")[0]] + [hedged_returns[i].item()]
            for i, (idx, _) in enumerate(y_test.items())
        ]

        return {"betas": betas_list, "hedged_returns": hedged_returns_data}

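    # A worked example of the hedged-return arithmetic above (assumed numbers): with
    # an estimated beta of 0.8, a contract return of 1.5% and a benchmark return of
    # 1.0% on the same date, the stored out-of-sample hedged return is
    #
    #     1.5 - 0.8 * 1.0 = 0.7
    #
    # i.e. y_test minus the beta-scaled benchmark column of X_test.
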
    def evaluate_hedged_returns(
        self,
        hedged_return_xcat=None,
        cids=None,
        correlation_types="pearson",
        title=None,
        start=None,
        end=None,
        blacklist=None,
        freqs="M",
    ):
        """
        Method to determine and display a table of average absolute correlations
        between the benchmark return and the computed hedged returns within the class
        instance, over all cross-sections in the panel. Additionally, the correlation
        table displays the same results for the unhedged return specified in the
        class instance, for comparison purposes. The returned dataframe is
        multi-indexed by (benchmark return, return category, frequency) and contains
        one column per computed absolute correlation coefficient.

        Parameters
        ----------
        hedged_return_xcat : str or list, optional
            Hedged returns to be evaluated. Default is None, which evaluates all
            hedged returns within the class instance.
        cids : str or list, optional
            Cross-sections for which evaluation of hedged returns takes place.
            Default is None, which evaluates all cross-sections within the class
            instance.
        correlation_types : str or list, optional
            Types of correlations to calculate. Options are "pearson", "spearman" and
            "kendall". Default is "pearson".
        title : str, optional
            Title for the correlation table. If None, the default title is "Average
            absolute correlations between each return and the chosen benchmark".
            Default is None.
        start : str, optional
            String in ISO format. Default is None.
        end : str, optional
            String in ISO format. Default is None.
        blacklist : dict, optional
            Dictionary of tuples of start and end dates to exclude from the
            evaluation. Default is None.
        freqs : str or list, optional
            Letters denoting all frequencies at which the correlations may be
            calculated. This must be a selection of "D", "W", "M" and "Q". Default is
            "M". Each return series will always be summed over the sample period.

        Returns
        -------
        pd.DataFrame
            A dataframe of average absolute correlations between the benchmark return
            and the computed hedged returns.
        """
        # Checks
        correlation_types, hedged_return_xcat, cids, freqs = (
            self._checks_evaluate_hedged_returns(
                correlation_types=correlation_types,
                hedged_return_xcat=hedged_return_xcat,
                cids=cids,
                start=start,
                end=end,
                blacklist=blacklist,
                freqs=freqs,
            )
        )
        cids_v_benchmark = [f"{cid}v{self.benchmark_cid}" for cid in cids]

        # Construct a quantamental dataframe comprising the specified hedged returns
        # as well as the unhedged returns and the benchmark return specified in the
        # class instance
        hedged_df = self.hedged_returns[
            (self.hedged_returns["xcat"].isin(hedged_return_xcat))
            & (self.hedged_returns["cid"].isin(cids))
        ].copy()
        unhedged_df = self.df[
            (self.df["xcat"].isin(self.xcats))
            & (self.df["cid"].isin(cids_v_benchmark))
        ]
        benchmark_df = self.df[
            (self.df["xcat"] == self.benchmark_xcat)
            & (self.df["cid"] == f"{self.benchmark_cid}v{self.benchmark_cid}")
        ]
        cid_mapping = dict(zip(cids, cids_v_benchmark))
        hedged_df["cid"] = hedged_df["cid"].replace(cid_mapping)
        combined_df = concat_categorical(hedged_df, unhedged_df)

        # Create a pseudo-panel to match contract return cross-sections with a
        # replicated benchmark return. This is multi-indexed by (new cid, real_date).
        # The columns are the named hedged returns, with the final column being the
        # benchmark category.
        dfx = pd.DataFrame(columns=["real_date", "cid", "xcat", "value"])
        for cid in cids_v_benchmark:
            # Extract unhedged and hedged returns
            dfa = reduce_df(
                df=combined_df,
                xcats=hedged_return_xcat + self.xcats,
                cids=[cid],
            )
            # Extract benchmark returns
            dfb = reduce_df(
                df=benchmark_df,
                xcats=[self.benchmark_xcat],
                cids=[self.benchmark_cid],
            )
            # Combine and rename cross-section
            df_cid = concat_categorical(dfa, dfb)
            df_cid["cid"] = cid

            dfx = update_df(dfx, df_cid)

        # Create long format dataframes for each specified frequency
        Xy_long_freq = []
        for freq in freqs:
            Xy_long = categories_df(
                df=dfx,
                xcats=hedged_return_xcat + self.xcats,
                cids=cids_v_benchmark,
                start=start,
                end=end,
                blacklist=blacklist,
                freq=freq,
                xcat_aggs=["sum", "sum"],
            )
            Xy_long_freq.append(Xy_long)

        # For each xcat and frequency, calculate the mean absolute correlations
        # between the benchmark return and the (hedged and unhedged) market returns
        df_rows = []
        xcats_non_benchmark = [
            xcat for xcat in self.xcats if xcat != self.benchmark_xcat
        ]
        for xcat in hedged_return_xcat + xcats_non_benchmark:
            for freq, Xy_long in zip(freqs, Xy_long_freq):
                calculated_correlations = []
                for correlation in correlation_types:
                    calculated_correlations.append(
                        self._get_mean_abs_corrs(
                            xcat=xcat,
                            df=Xy_long,
                            correlation=correlation,
                            cids=cids,
                        )
                    )
                df_rows.append(calculated_correlations)

        # Create underlying dataframe to store the results
        multiindex = pd.MultiIndex.from_product(
            [
                [self.benchmark_return],
                hedged_return_xcat + xcats_non_benchmark,
                freqs,
            ],
            names=["benchmark return", "return category", "frequency"],
        )
        corr_df = pd.DataFrame(
            columns=correlation_types,
            index=multiindex,
            data=df_rows,
        )

        return corr_df

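    # Illustrative call (assuming `estimate_beta()` has already been run with
    # hedged_return_xcat="HEDGED_RETURN_NSA"; the ticker name is an example):
    #
    #     corr_df = be.evaluate_hedged_returns(
    #         hedged_return_xcat="HEDGED_RETURN_NSA",
    #         correlation_types=["pearson", "spearman"],
    #         freqs=["M", "Q"],
    #     )
    #
    # corr_df is indexed by (benchmark return, return category, frequency) and holds
    # one column per requested correlation type.
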
    def _checks_evaluate_hedged_returns(
        self,
        correlation_types,
        hedged_return_xcat,
        cids,
        start,
        end,
        blacklist,
        freqs,
    ):
        """
        Input checks for the `evaluate_hedged_returns()` method.

        Parameters
        ----------
        correlation_types : str or list
            Types of correlations to calculate.
        hedged_return_xcat : str or list, optional
            Hedged returns to be evaluated.
        cids : str or list, optional
            Cross-sections for which evaluation of hedged returns takes place.
        start : str, optional
            Start date for evaluation.
        end : str, optional
            End date for evaluation.
        blacklist : dict, optional
            Dictionary of tuples of start and end dates to exclude from the
            evaluation.
        freqs : str or list, optional
            Letters denoting all frequencies at which the correlations may be
            calculated.

        Returns
        -------
        tuple
            The validated (correlation_types, hedged_return_xcat, cids, freqs).
        """
        # correlation_types checks
        if isinstance(correlation_types, str):
            correlation_types = [correlation_types]
        elif not isinstance(correlation_types, list):
            raise TypeError("correlation_types must be a string or a list")
        if not all(
            isinstance(correlation_type, str)
            for correlation_type in correlation_types
        ):
            raise TypeError("All elements in correlation_types must be strings.")
        if not all(
            correlation_type in ["pearson", "spearman", "kendall"]
            for correlation_type in correlation_types
        ):
            raise ValueError(
                "All elements in correlation_types must be one of 'pearson', "
                "'spearman' or 'kendall'."
            )

        # hedged_return_xcat checks
        if hedged_return_xcat is None:
            hedged_return_xcat = list(self.hedged_returns["xcat"].unique())
        else:
            if isinstance(hedged_return_xcat, str):
                hedged_return_xcat = [hedged_return_xcat]
            elif not isinstance(hedged_return_xcat, list):
                raise TypeError("hedged_return_xcat must be a string or a list")
            if not all(isinstance(xcat, str) for xcat in hedged_return_xcat):
                raise TypeError(
                    "All elements in hedged_return_xcat, when a list, must be "
                    "strings."
                )
            if not (
                set(hedged_return_xcat).issubset(self.hedged_returns["xcat"].unique())
            ):
                raise ValueError(
                    "hedged_return_xcat must be a valid hedged return category "
                    "within the class instance."
                )

        # cids checks
        if cids is None:
            cids = self.hedged_returns["cid"].unique().tolist()
        else:
            if isinstance(cids, str):
                cids = [cids]
            elif not isinstance(cids, list):
                raise TypeError("cids must be a string or a list")
            if not all(isinstance(cid, str) for cid in cids):
                raise TypeError("All elements in cids must be strings.")
            if not all(cid in self.hedged_returns["cid"].unique() for cid in cids):
                raise ValueError(
                    "All cids must be valid cross-section identifiers within the "
                    "class instance."
                )

        # date checks
        if start is not None and not isinstance(start, str):
            raise TypeError("start must be a string.")
        if end is not None and not isinstance(end, str):
            raise TypeError("end must be a string.")

        # blacklist checks
        if blacklist is not None:
            if not isinstance(blacklist, dict):
                raise TypeError("The blacklist argument must be a dictionary.")
            if len(blacklist) == 0:
                raise ValueError("The blacklist argument must not be empty.")
            if not all([isinstance(key, str) for key in blacklist.keys()]):
                raise TypeError("The keys of the blacklist argument must be strings.")
            if not all(
                [isinstance(value, (list, tuple)) for value in blacklist.values()]
            ):
                raise TypeError("The values of the blacklist argument must be tuples.")
            if not all([len(value) == 2 for value in blacklist.values()]):
                raise ValueError(
                    "The values of the blacklist argument must be tuples of length "
                    "two."
                )
            if not all(
                [
                    isinstance(date, pd.Timestamp)
                    for value in blacklist.values()
                    for date in value
                ]
            ):
                raise TypeError(
                    "The values of the blacklist argument must be tuples of pandas "
                    "Timestamps."
                )

        # freqs checks
        if isinstance(freqs, str):
            freqs = [freqs]
        elif not isinstance(freqs, list):
            raise TypeError("freqs must be a string or a list of strings")
        if not all(isinstance(freq, str) for freq in freqs):
            raise TypeError("All elements in freqs must be strings.")
        if not all(freq in ["D", "W", "M", "Q"] for freq in freqs):
            raise ValueError(
                "All elements in freqs must be one of 'D', 'W', 'M' or 'Q'."
            )

        return correlation_types, hedged_return_xcat, cids, freqs

    def get_hedged_returns(
        self,
        hedged_return_xcat=None,
    ):
        """
        Returns a dataframe of out-of-sample hedged returns derived from beta
        estimation processes held within the class instance.

        Parameters
        ----------
        hedged_return_xcat : str or list, optional
            Category name or list of category names for the panel of derived hedged
            returns. If None, information from all beta estimation processes held
            within the class instance is returned. Default is None.

        Returns
        -------
        pd.DataFrame
            A dataframe of out-of-sample hedged returns derived from beta estimation
            processes.
        """
        # Checks
        hedged_return_xcat = self._checks_get_hedged_returns(
            hedged_return_xcat=hedged_return_xcat
        )

        if hedged_return_xcat is None:
            hedged_returns = self.hedged_returns
        else:
            hedged_returns = self.hedged_returns[
                self.hedged_returns.xcat.isin(hedged_return_xcat)
            ]

        return QuantamentalDataFrame(
            hedged_returns,
            _initialized_as_categorical=self.df.InitializedAsCategorical,
        ).to_original_dtypes()

    def _checks_get_hedged_returns(
        self,
        hedged_return_xcat,
    ):
        """
        Input checks for the `get_hedged_returns()` method.

        Parameters
        ----------
        hedged_return_xcat : str or list
            Category name or list of category names for the panel of derived hedged
            returns.

        Returns
        -------
        list or None
            List of category names for the panel of derived hedged returns.
        """
        if hedged_return_xcat is not None:
            if isinstance(hedged_return_xcat, str):
                hedged_return_xcat = [hedged_return_xcat]
            elif not isinstance(hedged_return_xcat, list):
                raise TypeError("hedged_return_xcat must be a string or a list")
            if not all(isinstance(xcat, str) for xcat in hedged_return_xcat):
                raise TypeError(
                    "All elements in hedged_return_xcat, when a list, must be "
                    "strings."
                )
            if not (
                set(hedged_return_xcat).issubset(self.hedged_returns["xcat"].unique())
            ):
                raise ValueError(
                    "hedged_return_xcat must be a valid hedged return category "
                    "within the class instance."
                )

        return hedged_return_xcat

    def get_betas(
        self,
        beta_xcat=None,
    ):
        """
        Returns a dataframe of estimated betas derived from beta estimation processes
        held within the class instance.

        Parameters
        ----------
        beta_xcat : str or list, optional
            Category name or list of category names for the panel of estimated
            contract betas. If None, information from all beta estimation processes
            held within the class instance is returned. Default is None.

        Returns
        -------
        pd.DataFrame
            A dataframe of estimated betas derived from beta estimation processes.
        """
        # Checks
        beta_xcat = self._checks_get_betas(beta_xcat=beta_xcat)

        if beta_xcat is None:
            betas = self.betas
        else:
            betas = self.betas[self.betas.xcat.isin(beta_xcat)]

        return QuantamentalDataFrame(
            betas,
            _initialized_as_categorical=self.df.InitializedAsCategorical,
        ).to_original_dtypes()

    def _checks_get_betas(
        self,
        beta_xcat,
    ):
        """
        Input checks for the `get_betas()` method.

        Parameters
        ----------
        beta_xcat : str or list
            Category name or list of category names for the panel of estimated
            contract betas.

        Returns
        -------
        list or None
            List of category names for the panel of estimated contract betas.
        """
        if beta_xcat is not None:
            if isinstance(beta_xcat, str):
                beta_xcat = [beta_xcat]
            elif not isinstance(beta_xcat, list):
                raise TypeError("beta_xcat must be a string or a list")
            if not all(isinstance(xcat, str) for xcat in beta_xcat):
                raise TypeError(
                    "All elements in beta_xcat, when a list, must be strings."
                )
            if not (set(beta_xcat).issubset(self.betas["xcat"].unique())):
                raise ValueError(
                    "beta_xcat must be a valid beta category within the class "
                    "instance."
                )

        return beta_xcat

    def _get_mean_abs_corrs(
        self,
        xcat,
        cids,
        df,
        correlation,
    ):
        """
        Calculate the mean absolute correlation between a column `xcat` in a
        dataframe `df` and the benchmark return (the last column) across all
        cross-sections in `cids`. The correlation is calculated using the method
        specified in `correlation`.

        Parameters
        ----------
        xcat : str
            Category name for the column in the dataframe.
        cids : str
            Cross-sections for which the correlation is calculated.
        df : pd.DataFrame
            Dataframe containing the relevant columns.
        correlation : str
            Type of correlation to calculate.

        Returns
        -------
        float
            Mean absolute correlation between the column `xcat` and the benchmark
            return.
        """
        # Get relevant columns
        df_subset = df[[xcat, self.benchmark_xcat]].dropna()

        # Inner function to calculate the correlation for a given cross-section.
        # This is done so that one can group by cross-section and apply this
        # function directly.
        def calculate_correlation(group):
            return abs(
                group[xcat].corr(group[self.benchmark_xcat], method=correlation)
            )

        # Calculate the mean absolute correlation over all cross-sections
        mean_abs_corr = (
            df_subset.groupby("cid", observed=True).apply(calculate_correlation).mean()
        )

        return mean_abs_corr

    def _check_duplicate_results(self, hedged_return_xcat, beta_xcat):
        """
        Check for duplicate results in the class instance and remove them.

        Parameters
        ----------
        hedged_return_xcat : str
            Category name for the panel of out-of-sample hedged returns.
        beta_xcat : str
            Category name for the panel of estimated betas.
        """
        conditions = [
            ("hedged_returns", "xcat", hedged_return_xcat),
            ("betas", "xcat", beta_xcat),
            ("chosen_models", "name", beta_xcat),
        ]

        self._remove_results(conditions)

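    # Sketch of the statistic computed by `_get_mean_abs_corrs` above (assumed
    # numbers): if the per-cross-section correlations between a return and the
    # benchmark are 0.6 for "AUDvUSD" and -0.4 for "GBPvUSD", the reported value is
    #
    #     (|0.6| + |-0.4|) / 2 = 0.5
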
if __name__ == "__main__":
    from macrosynergy.learning import (
        ExpandingKFoldPanelSplit,
        LinearRegressionSystem,
        neg_mean_abs_corr,
    )
    from macrosynergy.management.simulate import make_qdf

    # Simulate a panel dataset of benchmark and contract returns
    cids = ["AUD", "CAD", "GBP", "USD"]
    xcats = ["BENCH_XR", "CONTRACT_XR"]
    cols = ["earliest", "latest", "mean_add", "sd_mult", "ar_coef", "back_coef"]

    df_cids = pd.DataFrame(
        index=cids, columns=["earliest", "latest", "mean_add", "sd_mult"]
    )
    df_cids.loc["AUD"] = ["2015-01-01", "2020-12-31", 0, 1]
    df_cids.loc["CAD"] = ["2015-01-01", "2020-12-31", 0, 1]
    df_cids.loc["GBP"] = ["2015-01-01", "2020-12-31", 0, 1]
    df_cids.loc["USD"] = ["2015-01-01", "2020-12-31", 0, 1]

    df_xcats = pd.DataFrame(index=xcats, columns=cols)
    df_xcats.loc["BENCH_XR"] = ["2015-01-01", "2019-12-31", 0.1, 1, 0, 0.3]
    df_xcats.loc["CONTRACT_XR"] = ["2015-01-01", "2019-12-31", 0.1, 1, 0, 0.3]

    dfd = make_qdf(df_cids, df_xcats, back_ar=0.75)

    # Initialize the BetaEstimator object, using USD_BENCH_XR as the benchmark
    # return
    be = BetaEstimator(
        df=dfd,
        xcats="CONTRACT_XR",
        benchmark_return="USD_BENCH_XR",
        cids=["AUD", "USD"],
    )

    models = {
        "LR": LinearRegressionSystem(min_xs_samples=21 * 1),
    }
    hparam_grid = {"LR": {"fit_intercept": [True, False], "positive": [True, False]}}
    scorer = {"scorer": neg_mean_abs_corr}

    be.estimate_beta(
        beta_xcat="BETA_NSA",
        hedged_return_xcat="HEDGED_RETURN_NSA",
        models=models,
        hyperparameters=hparam_grid,
        scorers=scorer,
        inner_splitters={"expandingkfold": ExpandingKFoldPanelSplit(n_splits=5)},
        search_type="grid",
        cv_summary="median",
        min_cids=1,
        min_periods=21 * 12,
        est_freq="Q",
        n_jobs_outer=1,
        n_jobs_inner=1,
    )

    evaluation_df = be.evaluate_hedged_returns(
        correlation_types=["pearson", "spearman", "kendall"],
        freqs=["W", "M", "Q"],
    )

    be.models_heatmap(name="BETA_NSA")
    print(evaluation_df)