# Source code for macrosynergy.learning.forecasting.ensemble.voting

import numpy as np
import pandas as pd

import sklearn.ensemble as skl
from sklearn.utils import Bunch

class VotingRegressor(skl.VotingRegressor):
    """
    Regression model that averages the predictions of many regression models.

    Parameters
    ----------
    estimators : list of (str, estimator) tuples
        List of (name, estimator) tuples that are used to fit the model.
    weights : array-like of shape (n_estimators,), default=None
        Sequence of weights to assign to models. If None, models are weighted
        equally.
    verbose : bool, default=False
        If True, the time elapsed while fitting will be printed as model trains.

    Attributes
    ----------
    feature_importances_ : np.ndarray of shape (n_features,) or None
        Ensemble feature importances, summing to one. None when no base
        estimator exposes `coef_` or `feature_importances_`, or when all the
        estimators that do carry zero weight.
    estimators_ : list
        The objects returned by each base estimator's `fit` call.
    named_estimators_ : sklearn.utils.Bunch
        Fitted estimators keyed by the names given in `estimators`.

    Notes
    -----
    This class calculates feature importances as the average of the feature
    importances of the base estimators. When `weights` is given, the average is
    weighted, using only the weights of the estimators that expose importances.

    The base estimators are fitted in place; they are not cloned.
    """

    def __init__(self, estimators, weights=None, verbose=False):
        super().__init__(
            estimators=estimators, weights=weights, n_jobs=1, verbose=verbose
        )

        # Attributes
        self.feature_importances_ = None
        self.estimators_ = None
        self.named_estimators_ = None

    def fit(self, X, y, **fit_params):
        """
        Fit the estimators.

        Parameters
        ----------
        X : pd.DataFrame or np.ndarray
            Pandas dataframe or numpy array of input features.
        y : pd.Series or pd.DataFrame or np.ndarray
            Pandas series, dataframe or numpy array of targets associated with
            each sample in X.
        **fit_params : dict
            Extra keyword arguments passed unchanged to every base estimator's
            `fit` method.

        Returns
        -------
        self
            The fitted voting regressor.

        Raises
        ------
        TypeError
            If X or y is not of a supported type.
        ValueError
            If X or y has an unsupported shape, if their sample counts differ,
            or if the number of weights differs from the number of estimators.
        """
        # Checks
        self._check_fit_params(X, y)

        # `weights` is documented as array-like, so it must never be used in a
        # boolean context (ambiguous for numpy arrays); compare against None.
        weights = None
        if self.weights is not None:
            weights = np.asarray(self.weights, dtype=float).ravel()
            if weights.size == 0:
                # An empty sequence is treated as "no weights".
                weights = None
            elif weights.size != len(self.estimators):
                raise ValueError(
                    "Number of `estimators` and weights must be equal; got "
                    f"{weights.size} weights, {len(self.estimators)} estimators"
                )

        # Fit regressors. Reset the importances so that a refit can never
        # leave a stale value from a previous call behind.
        self.feature_importances_ = None
        self.estimators_ = []
        importances = []
        contributing = []  # positions of the estimators that expose importances
        for position, (_, estimator) in enumerate(self.estimators):
            self.estimators_.append(estimator.fit(X, y, **fit_params))

            importance = self._normalized_importances(estimator)
            if importance is not None:
                importances.append(importance)
                contributing.append(position)

        if importances:
            combined = None
            if weights is None:
                combined = np.mean(importances, axis=0)
            else:
                # Only the weights of estimators that produced importances may
                # enter the average, otherwise the lengths do not match.
                subset = weights[contributing]
                if np.sum(subset) != 0:
                    combined = np.average(importances, weights=subset, axis=0)

            if combined is not None:
                # Renormalize feature importances to sum to 1
                total = np.sum(combined)
                if total != 0:
                    combined = combined / total
                self.feature_importances_ = combined

        # Store named estimators (the fitted objects, keyed by their names)
        self.named_estimators_ = Bunch(
            **{
                name: fitted
                for (name, _), fitted in zip(self.estimators, self.estimators_)
            }
        )

        return self

    @staticmethod
    def _normalized_importances(estimator):
        """
        Return an estimator's feature importances scaled to sum to one.

        `coef_` takes precedence over `feature_importances_`. A
        multi-dimensional `coef_` is reduced to one value per feature by
        averaging the absolute coefficients over the leading axes, so that
        linear and tree-based importances can be stacked together.

        Returns None when the estimator exposes neither attribute, or when the
        importances sum to zero or to a non-finite value (normalizing would
        only produce NaNs).
        """
        if hasattr(estimator, "coef_"):
            raw = np.abs(np.asarray(estimator.coef_, dtype=float))
            if raw.ndim > 1:
                raw = raw.reshape(-1, raw.shape[-1]).mean(axis=0)
        elif hasattr(estimator, "feature_importances_"):
            raw = np.asarray(estimator.feature_importances_, dtype=float)
        else:
            return None

        total = np.sum(raw)
        if total == 0 or not np.isfinite(total):
            return None
        return raw / total

    def _check_fit_params(self, X, y):
        """
        Checks for fit method parameters.

        Raises
        ------
        TypeError
            If X is not a dataframe/array or y is not a series/dataframe/array.
        ValueError
            If a numpy X is not two-dimensional, a dataframe y has more than
            one column, a numpy y is not one-dimensional, or X and y have a
            different number of rows.
        """
        # X
        if not isinstance(X, (pd.DataFrame, np.ndarray)):
            raise TypeError(
                "Input feature matrix for the voting regressor must be either a pandas "
                "dataframe or numpy array."
            )
        if isinstance(X, np.ndarray) and X.ndim != 2:
            raise ValueError(
                "When the input feature matrix for the voting regressor is a "
                "numpy array, it must have two dimensions."
            )

        # y
        if not isinstance(y, (pd.Series, pd.DataFrame, np.ndarray)):
            raise TypeError(
                "Target vector for the voting regressor must be either a pandas series, "
                "dataframe or numpy array."
            )
        if isinstance(y, pd.DataFrame) and y.shape[1] != 1:
            raise ValueError(
                "The dependent variable dataframe must have only one column. If used "
                "as part of an sklearn pipeline, ensure that previous steps return "
                "a pandas series or dataframe."
            )
        if isinstance(y, np.ndarray) and y.ndim != 1:
            raise ValueError(
                "When the target vector for the voting regressor is a numpy "
                "array, it must have one dimension."
            )

        if X.shape[0] != y.shape[0]:
            raise ValueError(
                "The number of samples in the input feature matrix must match the number "
                "of samples in the target vector."
            )
class VotingClassifier(skl.VotingClassifier):
    """
    Classification model that votes on the predictions of many classifiers.

    Parameters
    ----------
    estimators : list of (str, estimator) tuples
        List of (name, estimator) tuples that are used to fit the model.
    voting : {'hard', 'soft'}, default='hard'
        If 'hard', uses predicted class labels for majority rule voting. If
        'soft', predicts the class label based on the argmax of the sums of the
        predicted probabilities, which is recommended for an ensemble of
        well-calibrated classifiers.
    weights : array-like of shape (n_estimators,), default=None
        Sequence of weights to assign to models. If None, models are weighted
        equally.
    n_jobs : int, default=None
        The number of jobs to run in parallel for `fit`. `None` means 1 unless
        in a `joblib.parallel_backend` context. `-1` means using all processors.
    flatten_transform : bool, default=True
        Affects shape of transform output only when voting='soft'. If True, the
        transform method returns a matrix with shape
        (n_samples, n_classes*n_classifiers). If False, the shape is
        (n_classifiers, n_samples, n_classes).
    verbose : bool, default=False
        If True, the time elapsed while fitting will be printed as model trains.

    Attributes
    ----------
    feature_importances_ : np.ndarray of shape (n_features,) or None
        Ensemble feature importances, summing to one. None when no fitted base
        estimator exposes usable `coef_` or `feature_importances_`.

    Notes
    -----
    This class calculates feature importances as the average of the feature
    importances of the base estimators. The average is unweighted: `weights`
    affects the vote only, not the importances.
    """

    def __init__(
        self,
        estimators,
        voting="hard",
        weights=None,
        n_jobs=None,
        flatten_transform=True,
        verbose=False,
    ):
        super().__init__(
            estimators=estimators,
            voting=voting,
            weights=weights,
            n_jobs=n_jobs,
            flatten_transform=flatten_transform,
            verbose=verbose,
        )

        self.feature_importances_ = None

    def fit(self, X, y, sample_weight=None, **fit_params):
        """
        Fit the estimators.

        Parameters
        ----------
        X : pd.DataFrame or np.ndarray
            Pandas dataframe or numpy array of input features.
        y : pd.Series or pd.DataFrame or np.ndarray
            Pandas series, dataframe or numpy array of targets associated with
            each sample in X.
        sample_weight : array-like, default=None
            Sample weights forwarded to scikit-learn's `VotingClassifier.fit`.
            Only forwarded when it is not None.
        **fit_params : dict
            Extra keyword arguments forwarded to scikit-learn's
            `VotingClassifier.fit`. Whether they are accepted depends on the
            installed scikit-learn version.

        Returns
        -------
        self
            The fitted voting classifier.

        Raises
        ------
        TypeError
            If X or y is not of a supported type.
        ValueError
            If X or y has an unsupported shape or their sample counts differ.
        """
        # Checks
        self._check_fit_params(X, y)

        # Fit classifiers. `sample_weight` goes by keyword, and only when
        # supplied: newer scikit-learn releases deprecate passing it
        # positionally, and omitting a None value keeps the call valid for
        # estimators whose `fit` has no such argument.
        if sample_weight is not None:
            fit_params = {**fit_params, "sample_weight": sample_weight}
        super().fit(X, y, **fit_params)

        # Calculate feature importances. Reset first so that a refit can never
        # leave a stale value from a previous call behind.
        self.feature_importances_ = None
        importances = []
        for estimator in self.estimators_:
            importance = self._normalized_importances(estimator)
            if importance is not None:
                importances.append(importance)

        if importances:
            combined = np.mean(importances, axis=0)
            # Renormalize feature importances to sum to 1
            total = np.sum(combined)
            if total != 0:
                combined = combined / total
            self.feature_importances_ = combined

        return self

    @staticmethod
    def _normalized_importances(estimator):
        """
        Return an estimator's feature importances scaled to sum to one.

        `coef_` takes precedence over `feature_importances_`. A
        multi-dimensional `coef_` (one row per class or decision function) is
        reduced to one value per feature by averaging the absolute coefficients
        over the leading axes; for the single-row binary case this equals the
        squeezed, normalized absolute coefficients.

        Returns None when the estimator exposes neither attribute, or when the
        importances sum to zero or to a non-finite value (normalizing would
        only produce NaNs).
        """
        if hasattr(estimator, "coef_"):
            raw = np.abs(np.asarray(estimator.coef_, dtype=float))
            if raw.ndim > 1:
                raw = raw.reshape(-1, raw.shape[-1]).mean(axis=0)
        elif hasattr(estimator, "feature_importances_"):
            raw = np.asarray(estimator.feature_importances_, dtype=float)
        else:
            return None

        total = np.sum(raw)
        if total == 0 or not np.isfinite(total):
            return None
        return raw / total

    def _check_fit_params(self, X, y):
        """
        Checks for fit method parameters.

        Raises
        ------
        TypeError
            If X is not a dataframe/array or y is not a series/dataframe/array.
        ValueError
            If a numpy X is not two-dimensional, a dataframe y has more than
            one column, a numpy y is not one-dimensional, or X and y have a
            different number of rows.
        """
        # X
        if not isinstance(X, (pd.DataFrame, np.ndarray)):
            raise TypeError(
                "Input feature matrix for the voting classifier must be either a pandas "
                "dataframe or numpy array."
            )
        if isinstance(X, np.ndarray) and X.ndim != 2:
            raise ValueError(
                "When the input feature matrix for the voting classifier is a "
                "numpy array, it must have two dimensions."
            )

        # y
        if not isinstance(y, (pd.Series, pd.DataFrame, np.ndarray)):
            raise TypeError(
                "Target vector for the voting classifier must be either a pandas series, "
                "dataframe or numpy array."
            )
        if isinstance(y, pd.DataFrame) and y.shape[1] != 1:
            raise ValueError(
                "The dependent variable dataframe must have only one column. If used "
                "as part of an sklearn pipeline, ensure that previous steps return "
                "a pandas series or dataframe."
            )
        if isinstance(y, np.ndarray) and y.ndim != 1:
            raise ValueError(
                "When the target vector for the voting classifier is a numpy "
                "array, it must have one dimension."
            )

        if X.shape[0] != y.shape[0]:
            raise ValueError(
                "The number of samples in the input feature matrix must match the number "
                "of samples in the target vector."
            )
if __name__ == "__main__":
    # Demo: fit both voting ensembles on a simulated, unbalanced macro panel
    # and print the resulting ensemble feature importances.
    from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor
    from sklearn.linear_model import LinearRegression, LogisticRegression

    import macrosynergy.management as msm
    from macrosynergy.management.simulate import make_qdf

    cids = ["AUD", "CAD", "GBP", "USD"]
    xcats = ["XR", "CRY", "GROWTH", "INFL"]
    cols = ["earliest", "latest", "mean_add", "sd_mult", "ar_coef", "back_coef"]

    # Example: Unbalanced panel
    cid_rows = {
        "AUD": ["2002-01-01", "2020-12-31", 0, 1],
        "CAD": ["2003-01-01", "2020-12-31", 0, 1],
        "GBP": ["2000-01-01", "2020-12-31", 0, 1],
        "USD": ["2000-01-01", "2020-12-31", 0, 1],
    }
    df_cids = pd.DataFrame(
        index=cids, columns=["earliest", "latest", "mean_add", "sd_mult"]
    )
    for cid, row in cid_rows.items():
        df_cids.loc[cid] = row

    xcat_rows = {
        "XR": ["2000-01-01", "2020-12-31", 0.1, 1, 0, 0.3],
        "CRY": ["2000-01-01", "2020-12-31", 1, 2, 0.95, 1],
        "GROWTH": ["2000-01-01", "2020-12-31", 1, 2, 0.9, 1],
        "INFL": ["2000-01-01", "2020-12-31", -0.1, 2, 0.8, 0.3],
    }
    df_xcats = pd.DataFrame(index=xcats, columns=cols)
    for xcat, row in xcat_rows.items():
        df_xcats.loc[xcat] = row

    dfd = make_qdf(df_cids, df_xcats, back_ar=0.75)
    dfd["grading"] = np.ones(dfd.shape[0])

    # Periods excluded from the training panel, per cross-section
    black = {
        "GBP": (
            pd.Timestamp(year=2009, month=1, day=1),
            pd.Timestamp(year=2012, month=6, day=30),
        ),
        "CAD": (
            pd.Timestamp(year=2015, month=1, day=1),
            pd.Timestamp(year=2100, month=1, day=1),
        ),
    }

    panel = msm.categories_df(
        df=dfd,
        xcats=xcats,
        cids=cids,
        val="value",
        blacklist=black,
        freq="M",
        lag=1,
    )
    train = panel.dropna()

    # Regressor
    X_train = train.drop(columns=["XR"])
    y_train = train["XR"]
    regressor_members = [
        ("lr", LinearRegression()),
        ("rf", RandomForestRegressor(random_state=42)),
    ]
    voting_reg = VotingRegressor(estimators=regressor_members, weights=[0.3, 0.7])
    voting_reg = voting_reg.fit(X_train, y_train)
    print(voting_reg.weights)
    print(
        f"Voting regressor feature importances: {voting_reg.feature_importances_}"
    )

    # Classifier: predict the sign of the return
    X_train = train.drop(columns=["XR"])
    y_train = np.sign(train["XR"])
    classifier_members = [
        ("lr", LogisticRegression()),
        ("rf", RandomForestClassifier()),
    ]
    voting_clf = VotingClassifier(estimators=classifier_members)
    voting_clf = voting_clf.fit(X_train, y_train)
    print(
        f"Voting classifier feature importances: {voting_clf.feature_importances_}"
    )