Source code for macrosynergy.learning.forecasting.linear_model.global_local

import numpy as np 
import pandas as pd

from sklearn.base import BaseEstimator, RegressorMixin
from scipy.optimize import minimize

[docs]class GlobalLocalRegression(BaseEstimator, RegressorMixin): r""" Linear panel model with hierarchical shrinkage of country-specific (local) coefficients towards unknown global coefficients. Learning means that both country-specific and global coefficients are estimated from data. Parameters ---------- local_lambda : float, default=1 Regularization strength to pull local coefficients towards global coefficients. global_lambda : float, default=1 Regularization strength to pull global coefficients towards zero. positive : bool, default=False Whether to constrain all coefficients to be positive. Default is False. fit_intercept : bool, default=True Whether to fit an intercept term. Default is True. min_xs_samples : int, default=36 Minimum number of samples required in each group for the group to be considered a contribution to the mean squared error component of the loss function. Notes ----- A panel can be modelled from a global perspective, where time series of all countries are "pooled" or stacked together, meaning that samples from different countries are treated as independent. This is called a pooled regression. With one model fit on all countries' data, this is a high-bias, low-variance model. Alternatively, country-by-country regressions can be fit, with a separate model for each country. This is low-bias but high-variance, since each model sees less data. This implies that a balance can be found between these two extremes by balancing this bias-variance trade-off. Introduction of bias to the country-by-country models can lead to a potentially substantial reduction in variance. Mathematically, this fit is found by minimizing the sum of squared residuals for each country, with a term that penalizes deviation of country-specific coefficients from a global coefficient. The global coefficient is also penalized to prevent it from growing too large. The loss function is as follows: .. math:: L(\{\beta_i\}_{i=1}^{C}, \beta) = \frac{1}{C} \sum_{i = 1}^{C} \left [ \frac{1}{n_{i}} \sum_{t=1}^{n_{i}} (y_{it} - x_{it}^{\intercal} \beta_{i})^2 \right ] + \lambda_{\text{local}} \sum_{i=1}^{C} ||\beta_i - \beta||_{2}^{2} + \lambda_{\text{global}} ||\beta||_{2}^{2} """ def __init__(self, local_lambda = 1, global_lambda = 1, positive = False, fit_intercept = True, min_xs_samples = 36): # Attributes self.local_lambda = local_lambda self.global_lambda = global_lambda self.positive = positive self.fit_intercept = fit_intercept self.min_xs_samples = min_xs_samples
[docs] def fit(self, X, y, sample_weight = None): """ Fit the global-local model. Parameters ---------- X : pd.DataFrame Input feature matrix, multi-indexed by `cid` and `real_date`. y : pd.DataFrame or pd.Series Target vector associated with each sample in X, multi-indexed by `cid` and `real_date`. sample_weight : np.ndarray, optional Sample weights for each sample in X. If provided, it should be a 1D array with the same length as the number of samples in X. If None, all samples are treated equally. Returns ------- self Fitted estimator. """ # Fit if self.fit_intercept: X = X.copy() X.insert(0, "intercept", 1) self.cids_ = sorted(X.index.get_level_values(0).unique()) self.n_cids_ = len(self.cids_) self.n_features_ = X.shape[1] self.X_cid_ = {} self.y_cid_ = {} self.Xy_cid_weights_ = None if sample_weight is None else {} for cid in self.cids_: self.X_cid_[cid] = X.loc[cid].values self.y_cid_[cid] = y.loc[cid].values if sample_weight is not None: self.Xy_cid_weights_[cid] = sample_weight[y.index.get_level_values(0)==cid] # Initialise with zeros x0 = np.zeros((self.n_cids_ + 1) * self.n_features_) # Optional bounds if self.fit_intercept: if self.positive: # Place positive restrictions on the feature weights not intercepts bounds = [] for _ in range(self.n_cids_ + 1): bounds.append((None, None)) bounds.extend([(0, None)] * (self.n_features_ - 1)) else: bounds = [(None, None)] * len(x0) else: if self.positive: bounds = [(0, None)] * len(x0) else: bounds = [(None, None)] * len(x0) result = minimize( fun=self.loss, x0=x0, method="L-BFGS-B", bounds=bounds, jac=self.loss_derivative, ) coefs = result.x.reshape(self.n_cids_ + 1, self.n_features_) self.cid_coefs_ = {cid: coefs[i] for i, cid in enumerate(self.cids_)} if self.fit_intercept: self.coef_ = coefs[-1, 1:] self.intercept_ = coefs[-1, 0] else: self.coef_ = coefs[-1] self.intercept_ = None return self
[docs] def loss(self, weights): """ Loss function for the global-local regression model. Parameters ---------- weights : np.ndarray Flattened array of weights, where the last `n_features_` elements correspond with the global coefficients and the rest correspond to the local coefficients for each country. """ weights = weights.reshape(self.n_cids_ + 1, self.n_features_) total_loss = 0.0 global_beta = weights[-1, :] # Likelihood for i, g in enumerate(self.cids_): X_g = self.X_cid_[g] y_g = self.y_cid_[g] preds = X_g @ weights[i] residuals = y_g - preds if self.Xy_cid_weights_ is not None: w_g = self.Xy_cid_weights_[g] total_loss += np.average(residuals**2, weights=w_g) else: total_loss += np.mean(residuals**2) # Local-to-global regularization term if self.local_lambda > 0: reg = np.sum((weights[:-1] - global_beta)**2) total_loss += self.local_lambda * reg # Global regularization term if self.global_lambda > 0: reg = np.sum(global_beta**2) total_loss += self.global_lambda * reg return total_loss
[docs] def loss_derivative(self, weights): """ Derivative of the loss function with respect to the weights. Parameters ---------- weights : np.ndarray Flattened array of weights, where the last `n_features_` elements correspond with the global coefficients and the rest correspond to the local coefficients for each country. """ weights = weights.reshape(self.n_cids_ + 1, self.n_features_) global_beta = weights[-1] grads = np.zeros_like(weights) for i, cid in enumerate(self.cids_): X_i = self.X_cid_[cid] y_i = self.y_cid_[cid] beta_i = weights[i] preds = X_i @ beta_i residual = preds - y_i if self.Xy_cid_weights_ is not None: w_i = self.Xy_cid_weights_[cid] grad_i = 2 * (X_i.T @ (residual * w_i)) / np.sum(w_i) else: grad_i = 2 * X_i.T @ residual / len(residual) grad_i += 2 * self.local_lambda * (beta_i - global_beta) grads[i] = grad_i # Gradient for global beta grad_g = 2 * self.local_lambda * np.sum(global_beta - weights[:-1], axis=0) + 2 * self.global_lambda * global_beta grads[-1] = grad_g return grads.flatten()
[docs] def predict(self, X): """ Predict the target values for the given input data. Parameters ---------- X : pd.DataFrame Input features for prediction. Returns ------- np.ndarray Predicted target values. """ if self.fit_intercept: X = X.copy() X.insert(0, "intercept", 1) group_indices = X.index.get_level_values(0) X_values = X.values preds = np.zeros(len(X)) for group in self.cids_: mask = group_indices == group if mask.any(): beta = self.cid_coefs_.get(group, np.zeros(self.n_features_)) preds[mask] = X_values[mask] @ beta return preds
if __name__ == "__main__": import macrosynergy.management as msm from macrosynergy.management.simulate import make_qdf cids = ["AUD", "CAD", "GBP", "USD"] xcats = ["XR", "CRY", "GROWTH", "INFL"] cols = ["earliest", "latest", "mean_add", "sd_mult", "ar_coef", "back_coef"] """Example: Unbalanced panel """ df_cids = pd.DataFrame( index=cids, columns=["earliest", "latest", "mean_add", "sd_mult"] ) df_cids.loc["AUD"] = ["2002-01-01", "2020-12-31", 0, 1] df_cids.loc["CAD"] = ["2003-01-01", "2020-12-31", 0, 1] df_cids.loc["GBP"] = ["2000-01-01", "2020-12-31", 0, 1] df_cids.loc["USD"] = ["2000-01-01", "2020-12-31", 0, 1] df_xcats = pd.DataFrame(index=xcats, columns=cols) df_xcats.loc["XR"] = ["2000-01-01", "2020-12-31", 0.1, 1, 0, 0.3] df_xcats.loc["CRY"] = ["2000-01-01", "2020-12-31", 1, 2, 0.95, 1] df_xcats.loc["GROWTH"] = ["2000-01-01", "2020-12-31", 1, 2, 0.9, 1] df_xcats.loc["INFL"] = ["2000-01-01", "2020-12-31", -0.1, 2, 0.8, 0.3] dfd = make_qdf(df_cids, df_xcats, back_ar=0.75) dfd["grading"] = np.ones(dfd.shape[0]) black = { "GBP": ( pd.Timestamp(year=2009, month=1, day=1), pd.Timestamp(year=2012, month=6, day=30), ), "CAD": ( pd.Timestamp(year=2015, month=1, day=1), pd.Timestamp(year=2100, month=1, day=1), ), } train = msm.categories_df( df=dfd, xcats=xcats, cids=cids, val="value", blacklist=black, freq="M", lag=1 ).dropna() # Regressor X_train = train.drop(columns=["XR"]) y_train = np.sign(train["XR"]) glr = GlobalLocalRegression().fit(X_train, y_train) print(glr.predict(X_train))