Source code for macrosynergy.learning.forecasting.linear_model.sur

import numpy as np
import pandas as pd
import numbers 

from sklearn.linear_model import LinearRegression
from sklearn.base import BaseEstimator, RegressorMixin
from sklearn.feature_selection import SelectorMixin

from scipy.optimize import minimize

[docs]class LinearMultiTargetRegression(BaseEstimator, RegressorMixin): """ Linear regression model with multiple targets, supporting seemingly unrelated regression (SUR) via feasible generalized least squares (FGLS). Parameters ---------- fit_intercept : bool, default=True Whether to include an intercept term in the regression. seemingly_unrelated : bool, default=False Whether to make the regression seemingly unrelated. covariance_estimator : Union[str, BaseEstimator], default="ewm" Choice of covariance estimator. Options are "ml" for maximum likelihood, "ewm" for exponentially weighted moving covariance, or a custom `scikit-learn` compatible covariance estimator. span : int, default=60 Span parameter for exponentially weighted covariance estimation of residuals. feature_selection : object, default=None A feature selection object inheriting from scikit-learn's `SelectorMixin` base class in `sklearn.feature_selection`. If provided, feature selection is applied per target before fitting. """ def __init__( self, fit_intercept=True, seemingly_unrelated=False, covariance_estimator = "ewm", span=60, feature_selection=None, ): # Checks if not isinstance(fit_intercept, bool): raise TypeError("The 'fit_intercept' parameter must be a boolean.") if not isinstance(seemingly_unrelated, bool): raise TypeError("The 'seemingly_unrelated' parameter must be a boolean.") if not isinstance(covariance_estimator, (str, BaseEstimator)): raise TypeError("The 'covariance_estimator' parameter must be a string or a BaseEstimator instance.") if isinstance(covariance_estimator, str) and covariance_estimator not in ["ml", "ewm"]: raise ValueError("If `covariance_estimator` is a string, it must be either 'ml' or 'ewm'.") # TODO: add checks for custom covariance estimator when inheriting from BaseEstimator if covariance_estimator == "ewm": if not isinstance(span, numbers.Integral): raise TypeError("The 'span' parameter must be an integer.") if span <= 0: raise ValueError("The 'span' parameter must be positive.") if feature_selection is not None and not isinstance( feature_selection, SelectorMixin ): raise TypeError( "The 'feature_selection' parameter must be a SelectorMixin instance." ) # Attributes self.fit_intercept = fit_intercept self.seemingly_unrelated = seemingly_unrelated self.covariance_estimator = covariance_estimator self.span = span self.feature_selection = feature_selection
[docs] def fit(self, X, y, sample_weight=None): """ Fit the linear multi-target regression model. Parameters ---------- X : pd.DataFrame Feature matrix of shape (n_samples, n_features). Should be multi-indexed by asset and real date. y : pd.DataFrame Target matrix of shape (n_samples, n_assets). Should be multi-indexed by asset and real date. sample_weight : array-like of shape (n_samples,), default=None Individual weights for each sample. """ # Checks self._check_fit_params(X, y, sample_weight) # For now, only support pandas dataframes with no missing values if isinstance(y, pd.Series): y = y.to_frame() assert isinstance(y, pd.DataFrame) assert y.isna().sum().sum() == 0 # Store data and metadata X = X.copy() y = y.copy() if self.fit_intercept: X.insert(0, "intercept", 1) self.assets = list(y.columns) self.n_assets = len(self.assets) self.n_samples = y.shape[ 0 ] self.features_ = list(X.columns) self.n_features = X.shape[1] # Store "initial" coefficients for FGLS # When seemingly_unrelated = False, these are final OLS coefficients # Separate logic when feature selection is applied self.X_features = {} self.initial_coefs = {} if self.feature_selection is not None: col_index = {col: i for i, col in enumerate(X.columns)} # includes an intercept if fit_intercept=True for asset in self.assets: # Store selected features indices per asset if self.fit_intercept: selector = self.feature_selection.fit(X.iloc[:, 1:], y[asset]) feats = selector.get_feature_names_out() cols = [0] + [col_index[f] for f in feats] else: selector = self.feature_selection.fit(X, y[asset]) feats = selector.get_feature_names_out() cols = [col_index[f] for f in feats] self.X_features[asset] = cols # Fit OLS on selected features and store them lr = LinearRegression(fit_intercept=False) lr.fit(X.iloc[:, cols], y[asset]) self.initial_coefs[asset] = lr.coef_ else: # Simply use all features for all assets cols = list(range(self.n_features)) self.X_features = {asset: cols for asset in self.assets} # Fit OLS jointly and store coefficients lr = LinearRegression(fit_intercept=False).fit(X, y) W = lr.coef_.T # shape (n_features × n_assets) self.initial_coefs = { asset: W[:, idx] for idx, asset in enumerate(self.assets) } # If not sur, job is done here # Just store OLS coefficients if not self.seemingly_unrelated: self.coefs_ = {} self.intercepts_ = {} for asset in self.assets: asset_coefs = self.initial_coefs[asset] if self.fit_intercept: self.intercepts_[asset] = asset_coefs[0] self.coefs_[asset] = asset_coefs[1:] else: self.intercepts_[asset] = 0 self.coefs_[asset] = asset_coefs return self # If sur, calculate covariance of residuals resids = pd.DataFrame( data=np.column_stack( [ y[asset].to_numpy() - X.iloc[:, self.X_features[asset]].to_numpy() @ self.initial_coefs[asset] for asset in self.assets ] ), index=y.index, columns=y.columns, ) # Estimate covariance matrix if self.covariance_estimator == "ewm": weights = np.array( [(1 - 2 / (self.span + 1)) ** i for i in range(len(y))][::-1] ) cov = np.cov(resids.values.T, aweights=weights) elif self.covariance_estimator == "ml": cov = np.cov(resids.values.T) else: self.covariance_estimator.fit(resids) cov = self.covariance_estimator.covariance_ # Invert matrix if isinstance(self.covariance_estimator, BaseEstimator) and hasattr( self.covariance_estimator, "precision_" ): invcov = self.covariance_estimator.precision_ else: invcov = np.linalg.inv(cov) # FGLS optimization # Due to feature selection, create a full matrix but pack/unpack the matrix # only to update trainable coefficients. W_full = np.zeros((self.n_features, self.n_assets)) for idx, asset in enumerate(self.assets): W_full[self.X_features[asset], idx] = self.initial_coefs[asset] # Initialize with OLS coefficients # Mask mask = np.zeros_like(W_full, dtype=bool) # matrix of Falses for idx, asset in enumerate(self.assets): mask[self.X_features[asset], idx] = True # Flatten W for scipy optimize x0 = W_full[mask] # Minimise SUR_SELECT loss over the panel if sample_weight is not None: sample_weight = np.asarray(sample_weight).reshape(-1) self.coefs_, self.intercepts_ = self.__minimise_sur_loss( X=X.to_numpy(), y=y.to_numpy(), invcov=invcov, x0=x0, mask=mask, sample_weight=sample_weight, ) return self
def __minimise_sur_loss(self, X, y, invcov, x0, mask, sample_weight): """ Fit the SUR model via minimization of the SUR loss function. Coefficients and intercepts are extracted after optimization. Parameters ---------- X : np.ndarray Feature matrix of shape (n_samples, n_features). y : np.ndarray Target matrix of shape (n_samples, n_assets). invcov : np.ndarray Inverse of the covariance matrix of residuals. x0 : np.ndarray Initial coefficients for optimization. mask : np.ndarray Boolean mask of shape (n_features, n_assets) indicating which coefficients are trainable. sample_weight : np.ndarray or None Optional sample weights for the loss function. """ # Run LBFGS res = minimize( fun=lambda params: self.__sur_loss_and_grad( params, X, y, invcov, mask, sample_weight, return_grad=False ), x0=x0, jac=lambda params: self.__sur_loss_and_grad( params, X, y, invcov, mask, sample_weight, return_grad=True ), method="L-BFGS-B", ) # Reshape into full matrix W = np.zeros((self.n_features, self.n_assets)) W[mask] = res.x # Extract intercepts and coefs # Convert to asset specific intercepts and coefficients if self.fit_intercept: coefs_ = { asset: W[self.X_features[asset][1:], idx] for idx, asset in enumerate(self.assets) } intercepts_ = {asset: W[0, idx] for idx, asset in enumerate(self.assets)} else: coefs_ = { asset: W[self.X_features[asset], idx] for idx, asset in enumerate(self.assets) } intercepts_ = {asset: 0 for asset in self.assets} return coefs_, intercepts_ def __sur_loss_and_grad( self, params, X, y, invcov, mask, sample_weight, return_grad ): """ SUR loss and derivative evaluation. Parameters ---------- params : np.ndarray Flattened array of trainable coefficients. X : np.ndarray Feature matrix of shape (n_samples, n_features). y : np.ndarray Target matrix of shape (n_samples, n_assets). invcov : np.ndarray Inverse of the covariance matrix of residuals. mask : np.ndarray Boolean mask of shape (n_features, n_assets) indicating which coefficients are trainable. sample_weight : np.ndarray or None Optional sample weights for the loss function. return_grad : bool Whether to return the gradient instead of the loss. """ # First unpack coefficients W = np.zeros((self.n_features, self.n_assets)) W[mask] = params # Residuals resids = y - X @ W if not return_grad: if sample_weight is None: loss = np.einsum("ti,ij,tj->", resids, invcov, resids) / self.n_samples else: loss = ( np.einsum("ti,ij,tj,t->", resids, invcov, resids, sample_weight) / self.n_samples ) return loss else: wresids = resids @ invcov if sample_weight is not None: wresids = wresids * sample_weight[:, None] grad_full = -2 * (X.T @ wresids) / self.n_samples grad = grad_full[mask] return grad
[docs] def predict(self, X): """ Predict method to return predictions for each asset. Parameters ---------- X : pd.DataFrame Feature matrix of shape (n_samples, n_features). Should be multi-indexed by asset and real date. """ # Checks self._check_predict_params(X) # Add intercept and set up predictions dataframe X = X.copy() if self.fit_intercept: X.insert(0, "intercept", 1) preds = pd.DataFrame(index=X.index, columns=self.assets) # Evaluate for each asset with asset specific coefficients. for asset in self.assets: coefs = self.coefs_[asset] if self.fit_intercept: intercept = self.intercepts_[asset] preds[asset] = ( X.iloc[:, self.X_features[asset][1:]].to_numpy() @ coefs + intercept ) else: preds[asset] = X.iloc[:, self.X_features[asset]].to_numpy() @ coefs return preds.values
def _check_fit_params(self, X, y, sample_weight): # Type checks for X and y if not isinstance(X, pd.DataFrame): raise TypeError("X must be a pandas DataFrame.") if not isinstance(y, (pd.DataFrame, pd.Series)): raise TypeError("y must be a pandas DataFrame or Series.") # The dataframe must be multi indexed by asset and real date if not isinstance(X.index, pd.MultiIndex): raise ValueError("X must be multi-indexed by asset and real date.") if not isinstance(y.index, pd.MultiIndex): raise ValueError("y must be multi-indexed by asset and real date.") if not X.index.equals(y.index): raise ValueError("X and y must have the same multi-index.") # This model can't handle NAs. if X.isna().sum().sum() > 0: raise ValueError("X must not contain missing values.") if y.isna().sum().sum() > 0: raise ValueError("y must not contain missing values.") # Ensure shapes align if X.shape[0] != y.shape[0]: raise ValueError("X and y must have the same number of samples.") # If sample weight is set, make sure its shape and type are correct if sample_weight is not None: if not isinstance(sample_weight, (np.ndarray, list)): raise TypeError("sample_weight must be a numpy array or list.") if len(sample_weight) != X.shape[0]: raise ValueError("sample_weight must have the same number of samples as X and y.") for weight in sample_weight: if not isinstance(weight, numbers.Number): raise TypeError("All entries in sample_weight must be numeric.") if weight < 0: raise ValueError("All entries in sample_weight must be non-negative.") def _check_predict_params(self, X): # Type checks for X if not isinstance(X, pd.DataFrame): raise TypeError("X must be a pandas DataFrame.") # The dataframe must be multi indexed by asset and real date if not isinstance(X.index, pd.MultiIndex): raise ValueError("X must be multi-indexed by asset and real date.") # This model can't handle NAs. if X.isna().sum().sum() > 0: raise ValueError("X must not contain missing values.") # Ensure the features align with those seen in training if self.fit_intercept: if X.shape[1] != self.n_features - 1: raise ValueError("X must have the same number of features as during training.") if list(X.columns) != list(self.features_[1:]): raise ValueError("X must have the same feature columns as during training.") else: if X.shape[1] != self.n_features: raise ValueError("X must have the same number of features as during training.") if list(X.columns) != list(self.features_): raise ValueError("X must have the same feature columns as during training.")
if __name__ == "__main__": from macrosynergy.learning import ( SignalOptimizer, LarsSelector, ) from macrosynergy.management.simulate import make_qdf cids = ["AUD", "CAD", "GBP", "USD"] xcats = ["XR1", "CRY", "GROWTH", "RATES", "XR2"] cols = ["earliest", "latest", "mean_add", "sd_mult", "ar_coef", "back_coef"] df_cids = pd.DataFrame( index=cids, columns=["earliest", "latest", "mean_add", "sd_mult"] ) df_cids.loc["AUD"] = ["2012-01-01", "2020-12-31", 0, 1] df_cids.loc["CAD"] = ["2012-01-01", "2020-12-31", 0, 1] df_cids.loc["GBP"] = ["2012-01-01", "2020-12-31", 0, 1] df_cids.loc["USD"] = ["2012-01-01", "2020-12-31", 0, 1] df_xcats = pd.DataFrame(index=xcats, columns=cols) df_xcats.loc["XR1"] = ["2012-01-01", "2020-12-31", 0.1, 1, 0, 0.3] df_xcats.loc["CRY"] = ["2012-01-01", "2020-12-31", 1, 2, 0.95, 1] df_xcats.loc["GROWTH"] = ["2012-01-01", "2020-12-31", 1, 2, 0.9, 1] df_xcats.loc["RATES"] = ["2010-01-01", "2020-12-31", 0, 1, 0.5, 0.5] df_xcats.loc["XR2"] = ["2015-01-01", "2020-12-31", -0.1, 2, 0.8, 0.3] dfd = make_qdf(df_cids, df_xcats, back_ar=0.75) dfd["grading"] = np.ones(dfd.shape[0]) black = { "GBP": ( pd.Timestamp(year=2009, month=1, day=1), pd.Timestamp(year=2012, month=6, day=30), ), "CAD": ( pd.Timestamp(year=2015, month=1, day=1), pd.Timestamp(year=2016, month=1, day=1), ), } so = SignalOptimizer( df=dfd, xcats=["CRY", "GROWTH", "RATES", "XR1", "XR2"], cids=cids, blacklist=black, drop_nas=True, n_targets=2, ) X = so.X.copy(deep=True) y = so.y.copy(deep=True) model = LinearMultiTargetRegression( seemingly_unrelated=True, fit_intercept=False, feature_selection=LarsSelector(n_factors=2), ) model.fit(X, y) print(model.predict(X))