import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from sklearn.base import BaseEstimator, RegressorMixin
from sklearn.preprocessing import StandardScaler
from typing import Optional
import inspect
from macrosynergy.learning.forecasting.torch.samplers.timeseries_sampler import TimeSeriesSampler
from macrosynergy.learning.forecasting.torch.models.mlps import MultiLayerPerceptron
from copy import deepcopy
import numbers
[docs]class MLPRegressor(BaseEstimator, RegressorMixin):
"""
Scikit-learn compatible multi-layer perceptron, implemented in PyTorch.
Parameters
----------
n_latent : Union[int, List[int]], optional
Number of hidden units in the latent layer(s) of the MLP.
If an integer is provided, the MLP will have a single hidden layer with n_latent
units. If a list of integers is provided, the MLP will have multiple hidden layers
with the number of units in each layer specified by the corresponding element in
the list. If provided, all (n_latent, fit_encoder_intercept, fit_head_intercept, encoder_activation, head_activation)
must be specified and torch_model must be None. Default is 32.
fit_encoder_intercept : bool, optional
Whether to include an intercept (bias term) in the encoder layers of the MLP.
If provided, all (n_latent, fit_encoder_intercept, fit_head_intercept, encoder_activation, head_activation)
must be specified and torch_model must be None. Default is True.
fit_head_intercept : bool, optional
Whether to include an intercept (bias term) in the output layer of the MLP.
If provided, all (n_latent, fit_encoder_intercept, fit_head_intercept, encoder_activation, head_activation)
must be specified and torch_model must be None. Default is True.
encoder_activation : str, optional
Activation function for the encoder (hidden) component of the network.
If provided, all (n_latent, fit_encoder_intercept, fit_head_intercept, encoder_activation, head_activation)
must be specified and torch_model must be None. Default is "relu".
Must be one of "tanh", "relu", or "sigmoid".
head_activation : str, optional
Activation function for the head (output) component of the network.
If provided, all (n_latent, fit_encoder_intercept, fit_head_intercept, encoder_activation, head_activation)
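must be specified and torch_model must be None. Default is "identity". Must be one
of "tanh", "relu", "sigmoid", or "identity".
dropout_p : float, optional
Dropout probability passed to the default MLP. Must be a real number in [0, 0.5)
when torch_model is None, and must be None when torch_model is provided.
Default is 0.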
torch_model : Intersection[torch.nn.Module, BaseEstimator], optional
Custom PyTorch model to use instead of the default MLP. Must be a subclass of both
torch.nn.Module and sklearn.base.BaseEstimator. If torch_model is provided, all
parameters (n_latent, fit_encoder_intercept, fit_head_intercept, encoder_activation, head_activation)
must be None. Default is None.
loss_func : torch.nn.Module, optional
Loss function used during training. Must be a subclass of torch.nn.Module.
Default is nn.MSELoss().
optimizer : Union[str, List[str]], optional
Optimizer(s) used during training. If a single string is provided, it specifies the
optimizer used in backpropagation. If a list of strings is provided, each string
specifies an optimizer to be used in separate training runs, forming a neural
network ensemble. Currently supported optimizers are "AdamW", "SGD", and "SGD+mom".
Default is "AdamW".
scheduler : Optional[str], optional
Learning rate scheduler used during training. Currently supported schedulers are
"OneCycleLR" and None. Default is None.
batch_size : int, optional
Batch size used during training. Default is 32.
learning_rate : float, optional
Learning rate used by the optimizer. Default is 3e-4.
weight_decay : float, optional
Weight decay used by the optimizer. Default is 1e-4.
reg_turnover : float, optional
L2 regularization strength for the turnover in model outputs. Not yet implemented:
the value is validated but currently has no effect on training. Default is 0.
use_ts_sampler : bool, optional
Whether to use a time-series aware batch sampler during training.
Default is True.
aggregate_last : bool, optional
When using time-series batch sampling, whether or not to aggregate the last batch
into the previous batch if it is smaller than the specified batch size. When True,
`drop_last` must be False. Default is True.
drop_last : bool, optional
When using time-series batch sampling, whether or not to drop the last batch if it
is smaller than the specified batch size. When True, `aggregate_last` must be
False. Default is False.
epochs : int, optional
Maximum number of training epochs. Default is 10000.
patience : int, optional
Number of epochs to wait for improvement before early stopping. Default is 1000.
train_pct : float, optional
Fraction of samples used for training (remainder used for validation). This is
needed for the early stopping process. Default is 0.7.
x_scaler : Optional[TransformerMixin], optional
Scaler for the input features. Currently must be an instance of sklearn's StandardScaler.
This can also be set to None.
Default is StandardScaler(with_mean=False).
y_scaler : Optional[TransformerMixin], optional
Scaler for the target values. Currently must be an instance of sklearn's StandardScaler.
This can also be set to None.
Default is StandardScaler(with_mean=False).
verbose : bool, optional
Whether to print training diagnostics during training. Default is False.
random_state : Union[int, List[int]], optional
Random seed(s) used for PyTorch initialization and training. If multiple seeds
are specified, then a neural network ensemble will be trained with each seed
in the list. Default is 42.
inverse_transform_preds : bool, optional
Whether to inverse-transform predictions back to the original target scale using
the fitted target scaler. Default is False.
min_samples : int, optional
Minimum number of samples for an asset to have a head in the neural network.
Default is 36.
Notes
-----
A neural network is a parametric model that, given a collection of input features,
learns a mapping to target values by passing the feature set through "neurons", which
are themselves the composition of a linear transformation and a non-linear 'activation
function'. The output of these neurons should be interpreted as latent factors. These
neuron outputs can then be passed through further neurons, and so on, until the final
'layer' of neurons that produces the model predictions. The parameters of the linear
transformations are learned during training. This is the basic structure of a neural
network, with other types of neural network building upon this to handle sequential
data/images/videos more efficiently.
When the input dataset is tabular, with each sample consisting of a set of features
and a target value, and each treated as independent, then the model defined by mapping
the input features to a layer of latent factors via neurons, then (possibly) to
another layer of latent factors, and so on, until the final layer of neurons that maps
to the target value(s), is called a multi-layer perceptron (MLP).
Learning corresponds to estimating the optimal parameters of the neural network.
Optimality refers to the suitability of the parameters for the forecasting task at hand,
which is quantified by a loss function. `MLPRegressor` expects a PyTorch-compatible
loss function to be provided, which inherits from `torch.nn.Module` and has a `forward`
method that takes in the model predictions and the true target values and outputs
a scalar loss value. The default loss function is mean squared error. Practically
optimizing the parameters of this network is not trivial, because unlike an OLS model
(which optimizes mean squared error) the activation functions introduce non-linearity
in the model, which (firstly) means that no closed-form solution exists for optimal
parameters, and (secondly) means that the loss landscape is non-convex with many
local minima, saddle points and generically complicated geometry. The algorithm used
to train such a neural network is called 'backpropagation', which involves:
1. Randomly initializing the parameters of the network
2. Passing the input features through the network to get (initially rubbish) predictions
3. Calculating the loss of the predictions with respect to the true target values
using the specified loss function
4. Calculating the derivative of the loss with respect to each parameter in the network,
based on the data.
5. Updating the parameters in the direction that reduces the loss, with the step size
determined by the learning rate and the optimizer.
6. Iterating until convergence.
Traditionally, the optimizer used in step 5 was stochastic gradient descent (SGD),
which simply updates the parameters in the direction of the negative gradient of the loss.
If one imagines a ball rolling down a hill, to get to the bottom the ball has to move in
the direction of the steepest descent, which is the negative gradient. The 'stochastic'
part means that data is provided to the network in batches, meaning that the gradient
calculation is noisy. This noise is helpful for optimization because it prevents
convergence to a poor minimum in the loss surface. In particular, SGD tends to converge
to flatter minima in the loss surface, which are associated with better generalization
performance. SGD, however, can be slow and other optimizers have been developed that
can converge faster, such as SGD + momentum, or AdamW.
The previous paragraph touches on the importance of the geometry of the loss surface
for optimization and generalization. For those who are new to the world of neural
networks, it likely seems that the goal is to optimize the parameters to achieve the
global minimum in the loss surface. This, however, is a bad idea. The global minimum
is very likely to memorise the training data and consequently generalise poorly. This
is because the neural network typically has a vast number of parameters. This means
that it is in fact preferable to converge to a local minimum, particularly if we can
characterise certain local minima as being better than others. Indeed, we can; we prefer
flatter minima rather than steep minima. Intuitively, if we converge to a steep minimum,
then a small change in the underlying data leads us out of the minimum, indicating
that the model is unstable and likely to generalise poorly. On the other hand,
small changes in the data do not lead us out of a flat minimum, indicating that the
model is stable and likely to generalise better. Certain techniques can be employed
to encourage convergence to a flatter minimum, such as using a learning rate scheduler
that forces a large learning rate at periods of training, allowing the model to escape
steep minima, and reducing the learning rate when a favourable region of the parameter
space is being explored. Small batch sizes also encourage convergence to a flatter
minimum.
Convergence is also complicated by the fact that indefinite training of the network
leads to overfitting. Early stopping is a common regularization strategy for neural
network training. The idea is to split a training set into a smaller training subset
and a validation subset. The model is trained on the training subset, but at the end
of each epoch (each complete pass of the training subset), it is evaluated against
the validation subset. If the validation loss does not improve for a certain number of
epochs, then training is stopped and the parameters from the epoch with the best
validation loss are returned.
In this implementation of a multilayer perceptron, the structure of the model is
determined either by setting (`n_latent`, `fit_encoder_intercept`, `fit_head_intercept`,
`encoder_activation`, `head_activation`) jointly or by providing a custom `torch_model`.
The loss function is determined by the `loss_func` parameter, and the training dynamics
are determined by the `optimizer`, `scheduler`, `batch_size`, `learning_rate`,
`weight_decay`, and `reg_turnover` parameters. Weight decay is a regularization strategy
that penalizes large weights in the network, whilst `reg_turnover` penalizes
large changes in model outputs from one time period to the next, which is useful
information when transaction cost data is incorporated in the loss function.
The usual theory for neural network training is centred around each sample within a
batch being independent and identically distributed, implying that the random variables
corresponding to the derivative of the loss, for a fixed set of parameters, evaluated
at each sample are independent and identically distributed. This means that the average
derivative over a batch is a consistent, unbiased estimate of the true
gradient of the loss with respect to the parameters. On time series data, however,
mixing samples from different time periods can lead to biased gradient estimates
due to the presence of different regimes within a single batch, violating the assumption
of samples coming from the same distribution. This confuses the learning process
because the model is pulled in conflicting directions by samples drawn from different
regimes, resulting in a poorly performing learning algorithm. To remedy this, we have
provided the option to use a time series-aware batch sampler that ensures that each
batch is comprised of samples from contiguous time periods. This should help
convergence. This can be toggled on/off with the `use_ts_sampler` parameter.
Further work
------------
* Implement turnover regularization
* Custom optimizer and scheduler
* LARS and ReduceLROnPlateau
* Optional retraining after early stopping to avoid data waste
"""
def __init__(
    self,
    # Neural network structure
    n_latent = 32,
    fit_encoder_intercept = True,
    fit_head_intercept = True,
    encoder_activation = "relu",
    head_activation = "identity",
    dropout_p = 0,
    torch_model = None,
    # Neural network training dynamics
    loss_func = torch.nn.MSELoss(),
    optimizer: str = "AdamW", # TODO: Add lars and ability to pass in a custom optimizer.
    scheduler: Optional[str] = None, # TODO: options for other schedulers, probably ReduceLRonPlateau, option for custom scheduler object
    batch_size = 32,
    learning_rate = 3e-4,
    weight_decay = 1e-4,
    reg_turnover = 0, # TODO: implement but this is only useful when transaction costs are included in the loss function, which is not currently the case
    use_ts_sampler = True, # TODO: turn this into an optional sampler object
    aggregate_last = True,
    drop_last = False,
    epochs = 10000, # NOTE: when a scheduler is used, the epochs default is way too high unless the patience is high
    patience = 1000,
    train_pct = 0.7,
    x_scaler = StandardScaler(with_mean=False),
    y_scaler = StandardScaler(with_mean=False),
    # Other stuff
    verbose = False,
    random_state = 42,
    inverse_transform_preds = False,
    min_samples = 36
):
    """
    Validate the hyperparameters and store them unchanged on the instance.

    Every argument is passed to `_check_init_params` first, so an invalid
    configuration raises before any attribute is set. Two derived attributes,
    `optimizers` and `random_states`, hold the list form of `optimizer` and
    `random_state` (a scalar is wrapped in a one-element list).

    NOTE: the default `loss_func`, `x_scaler` and `y_scaler` objects are created
    once, when this function is defined, so instances built with the defaults
    share those objects.
    """
    # Validation (arguments passed by name to keep the pairing explicit)
    self._check_init_params(
        n_latent=n_latent,
        fit_encoder_intercept=fit_encoder_intercept,
        fit_head_intercept=fit_head_intercept,
        encoder_activation=encoder_activation,
        head_activation=head_activation,
        dropout_p=dropout_p,
        torch_model=torch_model,
        loss_func=loss_func,
        optimizer=optimizer,
        scheduler=scheduler,
        batch_size=batch_size,
        learning_rate=learning_rate,
        weight_decay=weight_decay,
        reg_turnover=reg_turnover,
        use_ts_sampler=use_ts_sampler,
        aggregate_last=aggregate_last,
        drop_last=drop_last,
        epochs=epochs,
        patience=patience,
        train_pct=train_pct,
        x_scaler=x_scaler,
        y_scaler=y_scaler,
        verbose=verbose,
        random_state=random_state,
        inverse_transform_preds=inverse_transform_preds,
        min_samples=min_samples,
    )

    # Network architecture
    self.n_latent = n_latent
    self.fit_encoder_intercept = fit_encoder_intercept
    self.fit_head_intercept = fit_head_intercept
    self.encoder_activation = encoder_activation
    self.head_activation = head_activation
    self.dropout_p = dropout_p
    self.torch_model = torch_model

    # Training dynamics
    self.loss_func = loss_func
    self.optimizer = optimizer
    self.scheduler = scheduler
    self.batch_size = batch_size
    self.learning_rate = learning_rate
    self.weight_decay = weight_decay
    self.reg_turnover = reg_turnover
    self.epochs = epochs
    self.patience = patience
    self.train_pct = train_pct

    # Batch sampling
    self.use_ts_sampler = use_ts_sampler
    self.aggregate_last = aggregate_last
    self.drop_last = drop_last

    # Scaling
    self.x_scaler = x_scaler
    self.y_scaler = y_scaler
    self.inverse_transform_preds = inverse_transform_preds

    # Miscellaneous
    self.verbose = verbose
    self.random_state = random_state
    self.min_samples = min_samples

    # List forms used to build the ensemble (one model per optimizer/seed pair)
    self.optimizers = optimizer if isinstance(optimizer, list) else [optimizer]
    self.random_states = random_state if isinstance(random_state, list) else [random_state]
def fit(self, X, y, sample_weight=None):
    """
    Train one network per (optimizer, random seed) pair on the supplied panel.

    Parameters
    ----------
    X : pd.DataFrame
        Input features, multi-indexed; the second index level is used as the
        date axis for the train/validation split.
    y : pd.DataFrame or pd.Series
        Targets sharing the index of `X`. A Series is treated as a
        single-column DataFrame. Columns with fewer than `min_samples`
        non-missing values are dropped.
    sample_weight : np.ndarray, optional
        One weight per row of `X`. Only the weights of rows that fall in the
        training split are used.

    Returns
    -------
    self
        The fitted estimator; trained networks are stored in `self.models`.

    Raises
    ------
    ValueError
        If no target column has at least `min_samples` non-missing values.
    """
    # Validate once; the check also returns how sample weights should be fed
    # to the loss function.
    sample_weight_strategy = self._check_fit_params(X, y, sample_weight)

    # Work on copies so the caller's data is never modified
    X = X.copy()
    y = y.copy()
    if isinstance(y, pd.Series):
        # Downstream code (column filtering, y.shape[1]) needs a 2D target
        y = y.to_frame()
    self.models = []

    # TODO: if torch_model is provided, check it has the right structure
    # to be trained by this class by passing a batch through it

    # Filter assets with insufficient samples to have a head in the network
    target_counts = y.count()
    self.targets = target_counts[target_counts >= self.min_samples].index
    self.n_targets = len(self.targets)
    if self.n_targets == 0:
        raise ValueError(
            f"No target has at least {self.min_samples} non-missing samples; "
            "cannot fit a model."
        )
    y = y[self.targets]

    # Create training and validation splits
    X_train, X_valid, y_train, y_valid = self.create_train_valid_splits(X, y, self.train_pct)

    # Keep only the weights of the training rows so that they line up with
    # X_train / y_train when the tensor dataset is built.
    if sample_weight is not None:
        train_dates = X_train.index.get_level_values(1).unique()
        train_mask = X.index.get_level_values(1).isin(train_dates)
        sample_weight_train = np.asarray(sample_weight)[train_mask]
    else:
        sample_weight_train = None

    # Scale training and validation splits
    X_train_s, X_valid_s, y_train_s, y_valid_s = self.scale_data(
        X_train, X_valid, y_train, y_valid, self.x_scaler, self.y_scaler
    )

    # Make tensor datasets
    train_dataset, valid_dataset = self.make_tensor_datasets(
        X_train_s, X_valid_s, y_train_s, y_valid_s, sample_weight_train
    )

    # Derive the ensemble lists from the current hyperparameters so that a
    # `set_params` call made after construction is honoured.
    self.optimizers = self.optimizer if isinstance(self.optimizer, list) else [self.optimizer]
    self.random_states = self.random_state if isinstance(self.random_state, list) else [self.random_state]

    for optimizer in self.optimizers:
        for random_state in self.random_states:
            # Seed before building loaders and the model so that shuffling and
            # weight initialisation are reproducible for this ensemble member
            torch.manual_seed(random_state)

            train_loader, train_loader_eval, valid_loader = self.make_dataloaders(
                train_dataset,
                valid_dataset,
                self.batch_size,
                self.use_ts_sampler,
                self.aggregate_last,
                self.drop_last,
            )

            if self.torch_model is not None:
                # Reinitialise torch_model under the random seed
                # TODO: check this will work upfront
                params = self.torch_model.get_params(deep=False)
                model = type(self.torch_model)(**params)
            else:
                model = self.initialize_model(
                    n_inputs = X.shape[1],
                    n_latent = self.n_latent,
                    n_outputs = y.shape[1],
                    encoder_activation = self.encoder_activation,
                    head_activation = self.head_activation,
                    fit_encoder_intercept = self.fit_encoder_intercept,
                    fit_head_intercept = self.fit_head_intercept,
                    dropout_p = self.dropout_p
                )

            optim = self.make_optimizer(model, optimizer, self.learning_rate, self.weight_decay)

            if self.scheduler is not None:
                scheduler = self.make_scheduler(optim, self.scheduler, self.epochs, len(train_loader))
            else:
                scheduler = None

            # TODO: pass reg_turnover once turnover regularization is implemented
            trained_model = self.train_model(
                model = model,
                train_loader = train_loader,
                train_loader_eval = train_loader_eval,
                valid_loader = valid_loader,
                optimizer = optim,
                scheduler = scheduler,
                loss_func = self.loss_func,
                sample_weight = sample_weight_train,
                sample_weight_strategy = sample_weight_strategy,
                patience = self.patience,
                verbose = self.verbose
            )
            self.models.append(trained_model)

    return self
def predict(self, X):
    """
    Predict targets by averaging the outputs of all trained networks.

    Parameters
    ----------
    X : pd.DataFrame
        Input features, validated by `_check_predict_params`.

    Returns
    -------
    pd.DataFrame
        Predictions indexed like `X`, with one column per entry of
        `self.targets`. Predictions are mapped back through `y_scaler` only
        when `inverse_transform_preds` is True and a `y_scaler` is set.

    Raises
    ------
    sklearn.exceptions.NotFittedError
        If no trained model is available (i.e. `fit` has not been called).
    """
    # Predict checks
    self._check_predict_params(X)
    if not getattr(self, "models", None):
        # Imported lazily: only needed on this error path
        from sklearn.exceptions import NotFittedError

        raise NotFittedError(
            "This MLPRegressor instance is not fitted yet. Call 'fit' before 'predict'."
        )

    # Scale data; x_scaler may legitimately be None, in which case the raw
    # values are used (mirroring what happens during fitting)
    if self.x_scaler is not None:
        X_s = self.x_scaler.transform(X)
    else:
        X_s = X.values

    # Only undo target scaling when there is a scaler to undo
    undo_scaling = self.inverse_transform_preds and self.y_scaler is not None

    model_preds = []
    with torch.no_grad():
        # Convert to tensor and pass through each network
        X_s_torch = torch.as_tensor(np.asarray(X_s), dtype=torch.float32)
        for model in self.models:
            model.eval()
            preds = model(X_s_torch).numpy()
            if undo_scaling:
                preds = self.y_scaler.inverse_transform(preds)
            model_preds.append(preds)

    # Stack predictions and average across models
    final_preds = np.mean(np.stack(model_preds, axis=0), axis=0)
    return pd.DataFrame(final_preds, index=X.index, columns=self.targets)
def initialize_model(
    self,
    n_inputs,
    n_latent,
    n_outputs,
    encoder_activation,
    head_activation,
    fit_encoder_intercept,
    fit_head_intercept,
    dropout_p
):
    """
    Build the default network: a `MultiLayerPerceptron` configured with the
    given layer sizes, activations, intercept flags and dropout probability.

    All arguments are forwarded unchanged under the same keyword names.
    """
    architecture = {
        "n_inputs": n_inputs,
        "n_latent": n_latent,
        "n_outputs": n_outputs,
        "encoder_activation": encoder_activation,
        "head_activation": head_activation,
        "fit_encoder_intercept": fit_encoder_intercept,
        "fit_head_intercept": fit_head_intercept,
        "dropout_p": dropout_p,
    }
    return MultiLayerPerceptron(**architecture)
def create_train_valid_splits(self, X, y, train_pct):
    """
    Split `X` and `y` chronologically into training and validation sets.

    The unique values of the second index level of `X` are sorted and the
    first `int(train_pct * n_dates)` of them form the training period; the
    remaining dates form the validation period.

    Parameters
    ----------
    X : pd.DataFrame
        Features, multi-indexed with dates on the second level.
    y : pd.DataFrame or pd.Series
        Targets, multi-indexed with dates on the second level.
    train_pct : float
        Fraction of the unique dates assigned to the training period.

    Returns
    -------
    tuple
        (X_train, X_valid, y_train, y_valid)

    Raises
    ------
    ValueError
        If either period would contain no dates.
    """
    X_dates = X.index.get_level_values(1)
    y_dates = y.index.get_level_values(1)

    dates = sorted(X_dates.unique())
    cut = int(train_pct * len(dates))
    train_dates, valid_dates = dates[:cut], dates[cut:]

    # Fail here with a clear message rather than later inside a scaler or a
    # data loader that was handed an empty split.
    if not train_dates or not valid_dates:
        raise ValueError(
            f"train_pct={train_pct} with {len(dates)} unique date(s) produces an empty "
            "training or validation split."
        )

    X_train = X[X_dates.isin(train_dates)]
    y_train = y[y_dates.isin(train_dates)]
    X_valid = X[X_dates.isin(valid_dates)]
    y_valid = y[y_dates.isin(valid_dates)]
    return X_train, X_valid, y_train, y_valid
def scale_data(
    self,
    X_train,
    X_valid,
    y_train,
    y_valid,
    x_scaler,
    y_scaler,
):
    """
    Scale the feature and target splits, fitting each scaler on training data only.

    A scaler is fitted on its training split and then applied to both the
    training and validation splits. When a scaler is falsy (e.g. None), the
    corresponding splits are returned as their underlying `.values` arrays.

    Returns
    -------
    tuple
        (X_train_s, X_valid_s, y_train_s, y_valid_s)
    """
    def _fit_then_apply(scaler, train_part, valid_part):
        # No scaler: hand back the raw arrays
        if not scaler:
            return train_part.values, valid_part.values
        scaler.fit(train_part)
        scaled_train = scaler.transform(train_part)
        scaled_valid = scaler.transform(valid_part)
        return scaled_train, scaled_valid

    # Independent variables first, then dependent variables
    # TODO: ensure ys are 2d for this to work
    X_train_s, X_valid_s = _fit_then_apply(x_scaler, X_train, X_valid)
    y_train_s, y_valid_s = _fit_then_apply(y_scaler, y_train, y_valid)
    return X_train_s, X_valid_s, y_train_s, y_valid_s
def make_tensor_datasets(
    self,
    X_train_s,
    X_valid_s,
    y_train_s,
    y_valid_s,
    sample_weight,
):
    """
    Wrap the scaled arrays in `TensorDataset` objects.

    The training dataset holds (features, targets) and, when `sample_weight`
    is not None, the sample weights as a third tensor. The validation dataset
    always holds (features, targets) only.

    Returns
    -------
    tuple
        (train_dataset, valid_dataset)
    """
    train_tensors = [torch.Tensor(X_train_s), torch.Tensor(y_train_s)]
    if sample_weight is not None:
        train_tensors.append(torch.Tensor(sample_weight))

    valid_tensors = (torch.Tensor(X_valid_s), torch.Tensor(y_valid_s))

    train_dataset = torch.utils.data.TensorDataset(*train_tensors)
    valid_dataset = torch.utils.data.TensorDataset(*valid_tensors)
    return train_dataset, valid_dataset
def make_dataloaders(
    self,
    train_dataset,
    valid_dataset,
    batch_size,
    use_ts_sampler,
    aggregate_last,
    drop_last,
):
    """
    Build the three data loaders used during training.

    Parameters
    ----------
    train_dataset, valid_dataset : torch.utils.data.Dataset
        Training and validation datasets.
    batch_size : int
        Batch size used by every loader.
    use_ts_sampler : bool
        If True, the training loaders draw batches through `TimeSeriesSampler`;
        otherwise plain (shuffled / unshuffled) `DataLoader` batching is used.
    aggregate_last : bool
        Forwarded to `TimeSeriesSampler`.
    drop_last : bool
        Applied to the shuffled training loader only; the evaluation and
        validation loaders never drop a batch.

    Returns
    -------
    tuple
        (train_loader, train_loader_eval, valid_loader): the shuffled loader
        used for optimisation, an unshuffled loader over the same training
        data used for loss evaluation, and the validation loader. The
        validation loader always uses `TimeSeriesSampler`.

    TODO: run through aggregate last and drop last logic
    """
    def _ts_loader(dataset, shuffle, drop):
        # Loader whose batches are produced by the time-series sampler
        return torch.utils.data.DataLoader(
            dataset = dataset,
            batch_sampler = TimeSeriesSampler(
                dataset = dataset,
                batch_size = batch_size,
                shuffle = shuffle,
                aggregate_last = aggregate_last,
                drop_last = drop,
            ),
        )

    if use_ts_sampler:
        train_loader = _ts_loader(train_dataset, True, drop_last)
        train_loader_eval = _ts_loader(train_dataset, False, False)
    else:
        # Use the `batch_size` argument (not self.batch_size) so that every
        # loader honours the value the caller asked for
        train_loader = torch.utils.data.DataLoader(
            dataset = train_dataset,
            batch_size = batch_size,
            shuffle = True,
            drop_last = drop_last,
        )
        train_loader_eval = torch.utils.data.DataLoader(
            dataset = train_dataset,
            batch_size = batch_size,
            shuffle = False,
            drop_last = False,
        )

    valid_loader = _ts_loader(valid_dataset, False, False)
    return train_loader, train_loader_eval, valid_loader
def make_optimizer(self, model, optimizer_name, learning_rate, weight_decay):
    """
    Create the torch optimizer named by `optimizer_name` for `model`'s parameters.

    Supported names are "AdamW", "SGD" and "SGD+mom" (SGD with momentum 0.9).
    Any other name raises ValueError.
    """
    # TODO: add LARS for large batch SGD training
    # TODO: add ability to pass in an optimizer class inheriting from torch.optim.Optimizer
    if optimizer_name not in ("AdamW", "SGD", "SGD+mom"):
        raise ValueError(f"Unsupported optimizer: {optimizer_name}")

    shared = {"lr": learning_rate, "weight_decay": weight_decay}
    if optimizer_name == "AdamW":
        return torch.optim.AdamW(model.parameters(), **shared)
    if optimizer_name == "SGD+mom":
        return torch.optim.SGD(model.parameters(), momentum=0.9, **shared)
    return torch.optim.SGD(model.parameters(), **shared)
def make_scheduler(self, optimizer, scheduler_name, epochs, steps_per_epoch):
    """
    Create the learning-rate scheduler named by `scheduler_name`.

    Only "OneCycleLR" is supported. It is configured with `self.learning_rate`
    as the peak learning rate, `epochs * steps_per_epoch` total steps, 30% of
    the steps spent warming up and cosine annealing. Any other name raises
    ValueError.
    """
    # TODO: add more schedulers later (e.g. ReduceLROnPlateau)
    if scheduler_name != "OneCycleLR":
        raise ValueError(f"Unsupported scheduler: {scheduler_name}")

    one_cycle = torch.optim.lr_scheduler.OneCycleLR(
        optimizer,
        max_lr=self.learning_rate,
        epochs=epochs,
        steps_per_epoch=steps_per_epoch,
        pct_start=0.3,
        anneal_strategy='cos',
    )
    return one_cycle
[docs] def train_model(
self,
model,
train_loader,
train_loader_eval,
valid_loader,
optimizer,
scheduler,
loss_func,
sample_weight,
sample_weight_strategy,
#reg_turnover,
patience,
verbose,
):
best_score = np.inf
best_state = None
counter = 0
for epoch in range(self.epochs):
model.train()
if sample_weight:
for X_i, y_i, sw_i in train_loader:
model = self._fit_one_batch(
model = model,
X_i = X_i,
y_i = y_i,
optimizer = optimizer,
scheduler = scheduler,
loss_func = loss_func,
sample_weight = sw_i,
sample_weight_strategy = sample_weight_strategy,
#reg_turnover = reg_turnover
)
else:
for X_i, y_i in train_loader:
model = self._fit_one_batch(
model = model,
X_i = X_i,
y_i = y_i,
optimizer = optimizer,
scheduler = scheduler,
loss_func = loss_func,
sample_weight = None,
sample_weight_strategy = sample_weight_strategy,
#reg_turnover = reg_turnover
)
train_loss = self._eval_loss(model, train_loader_eval, loss_func)
valid_loss = self._eval_loss(model, valid_loader, loss_func)
best_score, best_state, counter = self.update_es_stats(
model, train_loss, valid_loss, best_score, best_state, counter, patience
)
if counter >= patience:
break
if verbose and (epoch % 5 == 0 or epoch == self.epochs - 1):
print(f"Epoch {epoch + 1}: Train Loss = {train_loss:.4f}, Valid Loss = {valid_loss:.4f}, Best Valid Loss = {best_score:.4f}")
if best_state is not None:
model.load_state_dict(best_state)
else:
# TODO: handle this case later
pass
return model
def _fit_one_batch(
self,
model,
X_i,
y_i,
optimizer,
scheduler,
loss_func,
sample_weight,
sample_weight_strategy,
# reg_turnover
):
optimizer.zero_grad()
preds = model(X_i)
if not sample_weight:
loss = loss_func(preds, y_i)
elif sample_weight_strategy == "native":
loss = loss_func(preds, y_i, sample_weight)
elif sample_weight_strategy == "reduction_none":
loss = loss_func(preds, y_i) * sample_weight
loss = loss.mean()
# if reg_turnover > 0:
# pweight_levels = preds[1:] - preds[:-1]
# pweight_l1 = torch.mean(torch.abs(pweight_levels))
# loss = loss + reg_turnover * pweight_l1
loss.backward()
optimizer.step()
if scheduler is not None:
scheduler.step()
return model
def _eval_loss(self, model, loader, loss_func):
model.eval()
total_loss = 0.0
with torch.no_grad():
for X_i, y_i in loader:
preds = model(X_i)
total_loss += loss_func(preds, y_i).item()
avg_loss = total_loss / len(loader)
return avg_loss
def update_es_stats(self, model, train_loss, valid_loss, best_score, best_state, counter, patience):
    """
    Update the early-stopping bookkeeping after an epoch.

    If `valid_loss` is strictly lower than `best_score`, it becomes the new
    best score, a deep copy of the model's state dict becomes the new best
    state and the counter resets to zero. Otherwise the best score and state
    are left untouched and the counter is incremented by one.

    `train_loss` and `patience` are accepted but not used here.

    Returns
    -------
    tuple
        (best_score, best_state, counter)
    """
    improved = valid_loss < best_score
    if not improved:
        return best_score, best_state, counter + 1

    snapshot = deepcopy(model.state_dict())
    return valid_loss, snapshot, 0
def _check_predict_params(
self,
X
):
# X
if not isinstance(X, pd.DataFrame):
raise TypeError("X must be a pandas DataFrame.")
if not isinstance(X.index, pd.MultiIndex):
raise ValueError("X must be multi-indexed.")
if not X.index.get_level_values(0).dtype == "object":
raise TypeError("The outer index of X must be strings.")
if not X.index.get_level_values(1).dtype == "datetime64[ns]":
raise TypeError("The inner index of X must be datetime.date.")
if not X.apply(lambda x: pd.api.types.is_numeric_dtype(x)).all():
raise TypeError("All columns in X must be numeric.")
if X.isnull().values.any():
raise ValueError("X must not contain missing values.")
def _check_init_params(
self,
n_latent,
fit_encoder_intercept,
fit_head_intercept,
encoder_activation,
head_activation,
dropout_p,
torch_model,
loss_func,
optimizer,
scheduler,
batch_size,
learning_rate,
weight_decay,
reg_turnover,
use_ts_sampler,
aggregate_last,
drop_last,
epochs,
patience,
train_pct,
x_scaler,
y_scaler,
verbose,
random_state,
inverse_transform_preds,
min_samples,
):
# First check either torch_model is set or (n_latent, fit_encoder_intercept, fit_head_intercept, encoder_activation, head_activation) are set.
if torch_model is None:
if n_latent is None or fit_encoder_intercept is None or fit_head_intercept is None or encoder_activation is None or head_activation is None or dropout_p is None:
raise ValueError(
"When torch_model is not provided, (n_latent, fit_encoder_intercept, fit_head_intercept, encoder_activation, head_activation, dropout_p) must all be specified."
)
else:
if n_latent is not None or fit_encoder_intercept is not None or fit_head_intercept is not None or encoder_activation is not None or head_activation is not None or dropout_p is not None:
raise ValueError(
"When torch_model is provided, (n_latent, fit_encoder_intercept, fit_head_intercept, encoder_activation, head_activation, dropout_p) should be set to None."
)
if torch_model is None:
# n_latent
if not isinstance(n_latent, numbers.Integral):
if not isinstance(n_latent, list):
raise TypeError("n_latent must be either an integer or a list of integers.")
if not all(isinstance(x, numbers.Integral) for x in n_latent):
raise TypeError("When n_latent is a list, all elements must be integers.")
if len(n_latent) <= 1:
raise ValueError("When n_latent is a list, it must contain more than one element.")
if not all(x >= 1 for x in n_latent):
raise ValueError("When n_latent is a list, all elements must be at least 1.")
else:
if n_latent < 1:
raise ValueError("When n_latent is an integer, it must be at least 1.")
# fit_encoder_intercept
if not isinstance(fit_encoder_intercept, bool):
raise TypeError("fit_encoder_intercept must be a boolean.")
# fit_head_intercept
if not isinstance(fit_head_intercept, bool):
raise TypeError("fit_head_intercept must be a boolean.")
# encoder_activation
if not isinstance(encoder_activation, str):
raise TypeError("encoder_activation must be a string.")
if encoder_activation not in {"tanh", "relu", "sigmoid"}:
raise ValueError(
"encoder_activation must be one of 'tanh', 'relu', or 'sigmoid'."
)
# head_activation
if not isinstance(head_activation, str):
raise TypeError("head_activation must be a string.")
if head_activation not in {"tanh", "relu", "sigmoid", "identity"}:
raise ValueError(
"head_activation must be one of 'tanh', 'relu', 'sigmoid', or 'identity'."
)
# dropout_p
if not isinstance(dropout_p, numbers.Real):
raise TypeError("dropout_p must be a real number.")
if not (0 <= dropout_p < 0.5):
raise ValueError("dropout_p must be between 0 and 0.5.")
# torch_model
if torch_model is not None:
if not isinstance(torch_model, nn.Module):
raise TypeError("torch_model must be an instance of torch.nn.Module or None.")
if not isinstance(torch_model, BaseEstimator):
raise TypeError(
"torch_model must be an instance of a class inheriting from" \
"sklearn.base.BaseEstimator. This is to allow for cross-validation" \
"on model hyperparameters in a `scikit-learn` framework."
)
# it needs a forward method
if type(torch_model).forward is nn.Module.forward:
raise ValueError("torch_model must have a forward method.")
# loss_func
if not isinstance(loss_func, nn.Module):
raise TypeError("loss_func must inherit from nn.Module.")
try:
test_loss = loss_func(torch.rand(16,1), torch.rand(16,1))
except Exception as e:
raise ValueError(f"loss_func must be callable with signature loss_func(preds, targets). Error encountered when testing random preds and targets of batch size 16 and single outputs: {e}")
# optimizer
if not isinstance(optimizer, str):
if not isinstance(optimizer, list):
raise TypeError("optimizer must be either a string or a list of strings.")
else:
if len(optimizer) <= 1:
raise ValueError("When optimizer is a list, it must contain more than one element.")
if not all(isinstance(x, str) for x in optimizer):
raise TypeError("When optimizer is a list, all elements must be strings.")
if not all(x in {"AdamW", "SGD", "SGD+mom"} for x in optimizer):
raise ValueError("When optimizer is a list, all elements must be one of 'AdamW', 'SGD', or 'SGD+mom'.")
else:
if optimizer not in {"AdamW", "SGD", "SGD+mom"}:
raise ValueError("optimizer must be one of 'AdamW', 'SGD', or 'SGD+mom'.")
# scheduler
if scheduler is not None:
if not isinstance(scheduler, str):
raise TypeError("scheduler must be a string or None.")
if scheduler not in {"OneCycleLR"}:
raise ValueError("scheduler must be one of 'OneCycleLR' or None.")
# batch_size
if not isinstance(batch_size, numbers.Integral):
raise TypeError("batch_size must be an integer.")
if batch_size < 1:
raise ValueError("batch_size must be at least 1.")
# learning_rate
if not isinstance(learning_rate, numbers.Real):
raise TypeError("learning_rate must be a real number.")
if learning_rate <= 0:
raise ValueError("learning_rate must be positive.")
# weight_decay
if not isinstance(weight_decay, numbers.Real):
raise TypeError("weight_decay must be a real number.")
if weight_decay < 0:
raise ValueError("weight_decay must be non-negative.")
# reg_turnover
if not isinstance(reg_turnover, numbers.Real):
raise TypeError("reg_turnover must be a real number.")
if reg_turnover < 0:
raise ValueError("reg_turnover must be non-negative.")
# use_ts_sampler
if not isinstance(use_ts_sampler, bool):
raise TypeError("use_ts_sampler must be a boolean.")
# aggregate_last
if not isinstance(aggregate_last, bool):
raise TypeError("aggregate_last must be a boolean.")
# drop_last
if not isinstance(drop_last, bool):
raise TypeError("drop_last must be a boolean.")
if aggregate_last and drop_last:
raise ValueError("aggregate_last and drop_last cannot both be True.")
# epochs
if not isinstance(epochs, numbers.Integral):
raise TypeError("epochs must be an integer.")
if epochs < 1:
raise ValueError("epochs must be at least 1.")
# patience
if not isinstance(patience, numbers.Integral):
raise TypeError("patience must be an integer.")
if patience < 1:
raise ValueError("patience must be at least 1.")
# train_pct
if not isinstance(train_pct, numbers.Real):
raise TypeError("train_pct must be a real number.")
if not (0 < train_pct < 1):
raise ValueError("train_pct must be between 0 and 1.")
# x_scaler
if x_scaler is not None:
if not isinstance(x_scaler, StandardScaler):
raise TypeError("x_scaler must be an instance of StandardScaler or None.")
# y_scaler
if y_scaler is not None:
if not isinstance(y_scaler, StandardScaler):
raise TypeError("y_scaler must be an instance of StandardScaler or None.")
# verbose
if not isinstance(verbose, bool):
raise TypeError("verbose must be a boolean.")
# random_state
if not isinstance(random_state, numbers.Integral):
if not isinstance(random_state, list):
raise TypeError("random_state must be either an integer or a list of integers.")
else:
if not all(isinstance(x, numbers.Integral) for x in random_state):
raise TypeError("When random_state is a list, all elements must be integers.")
if not all(x >= 0 for x in random_state):
raise ValueError("When random_state is a list, all elements must be non-negative.")
if len(random_state) <= 1:
raise ValueError("When random_state is a list, it must contain more than one element.")
else:
if random_state < 0:
raise ValueError("When random_state is an integer, it must be non-negative.")
# inverse_transform_preds
if not isinstance(inverse_transform_preds, bool):
raise TypeError("inverse_transform_preds must be a boolean.")
# min_samples
if not isinstance(min_samples, numbers.Integral):
raise TypeError("min_samples must be an integer.")
if min_samples < 1:
raise ValueError("min_samples must be at least 1.")
def _check_fit_params(self, X, y, sample_weight):
    """
    Validate the fit-time inputs and resolve how sample weights are applied.

    Parameters
    ----------
    X : pd.DataFrame
        Feature panel. Must be multi-indexed with an object-dtype outer level
        and a ``datetime64[ns]`` inner level, contain only numeric columns and
        have no missing values.
    y : pd.Series or pd.DataFrame
        Targets. Must carry exactly the same multi-index as ``X``.
    sample_weight : np.ndarray or None
        Optional one-dimensional array of non-negative real weights, with one
        entry per row of ``X``.

    Returns
    -------
    str or None
        ``None`` when no sample weights are supplied. ``"native"`` when the
        ``forward`` method of ``self.loss_func`` accepts a ``sample_weight``
        argument. ``"reduction_none"`` when it does not, but the loss exposes
        a ``reduction`` constructor parameter and is configured with
        ``reduction="none"``.

    Raises
    ------
    TypeError
        If an argument has the wrong type or dtype.
    ValueError
        If an argument has the wrong structure or values, or if sample weights
        are supplied but cannot be supported by ``self.loss_func``.
    """
    # X
    if not isinstance(X, pd.DataFrame):
        raise TypeError("X must be a pandas DataFrame.")
    if not isinstance(X.index, pd.MultiIndex):
        raise ValueError("X must be multi-indexed.")
    if not X.index.get_level_values(0).dtype == "object":
        raise TypeError("The outer index of X must be strings.")
    if not X.index.get_level_values(1).dtype == "datetime64[ns]":
        raise TypeError("The inner index of X must be datetime.date.")
    if not X.apply(lambda x: pd.api.types.is_numeric_dtype(x)).all():
        raise TypeError("All columns in X must be numeric.")
    if X.isnull().values.any():
        raise ValueError("X must not contain missing values.")

    # y
    if not isinstance(y, (pd.Series, pd.DataFrame)):
        raise TypeError(
            "y must be a pandas Series or DataFrame."
        )
    if not isinstance(y.index, pd.MultiIndex):
        raise ValueError("y must be multi-indexed.")
    if not y.index.get_level_values(0).dtype == "object":
        raise TypeError("The outer index of y must be strings.")
    if not y.index.get_level_values(1).dtype == "datetime64[ns]":
        raise TypeError("The inner index of y must be datetime.date.")
    if not X.index.equals(y.index):
        raise ValueError("X and y must have the same multi-index.")

    # sample_weight
    if sample_weight is None:
        return None

    if not isinstance(sample_weight, np.ndarray):
        raise TypeError("sample_weight must be a numpy array or None.")
    # The shape check must come before the element-wise checks: iterating a
    # 2D array yields rows (not scalars) and a 0-d array cannot be iterated,
    # so either case would otherwise surface as a misleading error.
    if sample_weight.ndim != 1:
        raise ValueError("sample_weight must be a 1D array.")
    if not all(isinstance(w, numbers.Real) for w in sample_weight):
        raise TypeError("All elements in sample_weight must be real numbers.")
    # NaN weights fail this comparison as well and are therefore rejected.
    if not all(w >= 0 for w in sample_weight):
        raise ValueError("All elements in sample_weight must be non-negative.")
    if len(sample_weight) != len(X):
        raise ValueError("Length of sample_weight must match number of samples in X.")

    # Check compatibility with loss function
    sig_forward = inspect.signature(self.loss_func.forward)
    sig_constructor = inspect.signature(self.loss_func.__init__)
    if "sample_weight" in sig_forward.parameters:
        return "native"

    if "reduction" not in sig_constructor.parameters:
        raise ValueError(
            "Sample weights are not supported by the specified loss function. The loss function must either accept a `sample_weight` tensor in its forward method or have a `reduction` parameter in its constructor."
        )

    # Use the reduction actually configured on the loss instance. The
    # signature default only describes the class, so relying on it would
    # reject e.g. MSELoss(reduction="none") and accept a loss that was
    # explicitly constructed with a different reduction. The default is kept
    # as a fallback for losses that do not store `reduction` as an attribute.
    reduction = getattr(
        self.loss_func,
        "reduction",
        sig_constructor.parameters["reduction"].default,
    )
    if reduction == "none":
        return "reduction_none"
    raise ValueError(
        "When the loss function does not accept a `sample_weight` tensor in its forward method but has a `reduction` parameter in its constructor, the `reduction` must be set to 'none' to support sample weights."
    )
if __name__ == "__main__":
    # Manual smoke test: build a small panel with make_qdf, wrap it in a
    # SignalOptimizer and run an MLPRegressor through calculate_predictions.
    from macrosynergy.learning import SignalOptimizer
    from macrosynergy.management.simulate import make_qdf

    cids = ["AUD", "CAD", "GBP", "USD"]
    xcats = ["XR1", "CRY", "GROWTH", "RATES", "XR2"]
    cols = ["earliest", "latest", "mean_add", "sd_mult", "ar_coef", "back_coef"]

    # Every cross-section shares the same availability window and scaling.
    df_cids = pd.DataFrame(
        index=cids, columns=["earliest", "latest", "mean_add", "sd_mult"]
    )
    for cid in cids:
        df_cids.loc[cid] = ["2012-01-01", "2020-12-31", 0, 1]

    # Per-category settings, in the column order given by `cols`.
    xcat_settings = {
        "XR1": ["2012-01-01", "2020-12-31", 0.1, 1, 0, 0.3],
        "CRY": ["2012-01-01", "2020-12-31", 1, 2, 0.95, 1],
        "GROWTH": ["2012-01-01", "2020-12-31", 1, 2, 0.9, 1],
        "RATES": ["2010-01-01", "2020-12-31", 0, 1, 0.5, 0.5],
        "XR2": ["2020-01-01", "2020-12-31", -0.1, 2, 0.8, 0.3],
    }
    df_xcats = pd.DataFrame(index=xcats, columns=cols)
    for xcat, settings in xcat_settings.items():
        df_xcats.loc[xcat] = settings

    dfd = make_qdf(df_cids, df_xcats, back_ar=0.75, seed=42)
    dfd["grading"] = np.ones(dfd.shape[0])

    # Date ranges handed to SignalOptimizer as its blacklist.
    black = {
        "GBP": (
            pd.Timestamp(year=2009, month=1, day=1),
            pd.Timestamp(year=2012, month=6, day=30),
        ),
        "CAD": (
            pd.Timestamp(year=2015, month=1, day=1),
            pd.Timestamp(year=2016, month=1, day=1),
        ),
    }

    so = SignalOptimizer(
        df=dfd,
        xcats=["CRY", "GROWTH", "RATES", "XR1", "XR2"],
        cids=["USD"],
        blacklist=black,
        drop_nas="X",
        n_targets=2,
    )
    X = so.X.copy(deep=True)
    y = so.y.copy(deep=True)

    class BasicMLP(nn.Module, BaseEstimator):
        """Single-hidden-layer tanh network with dropout before the head."""

        def __init__(self, n_inputs, n_latent, n_outputs, dropout=0.1):
            super().__init__()
            self.n_inputs = n_inputs
            self.n_latent = n_latent
            self.n_outputs = n_outputs
            self.dropout = dropout

            self.encoder = nn.Linear(n_inputs, n_latent)
            self.dropout_layer = nn.Dropout(dropout)
            self.head = nn.Linear(n_latent, n_outputs)

        def forward(self, x):
            # encoder -> tanh -> dropout -> linear head
            hidden = self.dropout_layer(torch.tanh(self.encoder(x)))
            return self.head(hidden)

    mlp = MLPRegressor(
        n_latent=2,
        fit_encoder_intercept=False,
        fit_head_intercept=True,
        encoder_activation="tanh",
        head_activation="identity",
        dropout_p=0.1,
        # Alternative left disabled: supply a custom module instead of the
        # architecture arguments above.
        # torch_model = BasicMLP(n_inputs=X.shape[1], n_latent=16, n_outputs=y.shape[1]),
        loss_func=torch.nn.MSELoss(),
        optimizer=["AdamW", "SGD+mom"],
        scheduler=None,
        batch_size=16,
        learning_rate=3e-4,
        weight_decay=1e-4,
        reg_turnover=0,
        use_ts_sampler=True,
        aggregate_last=True,
        drop_last=False,
        epochs=10000,
        patience=10,
        train_pct=0.7,
        x_scaler=StandardScaler(with_mean=False),
        y_scaler=StandardScaler(with_mean=False),
        verbose=False,
        random_state=[42, 43],
        inverse_transform_preds=True,
        min_samples=36,
    )  # .fit(X,y)

    so.calculate_predictions(
        name="MLP",
        models={"MLP": mlp},
        multi_target_fill="mean",
        min_cids=1,
        min_xcats=1,
        min_periods=36,
    )

    dfa = so.get_optimized_signals()
    print(dfa)

    # Inspect the first two entries of `mlp.models`
    # (presumably one network per random_state seed; confirm against fit).
    print(list(mlp.models[0].parameters()))
    print(list(mlp.models[1].parameters()))

    preds = mlp.predict(X)
    print(preds)