Source code for macrosynergy.management.simulate.simulate_quantamental_data

"""
Module with functionality for generating mock quantamental data for testing purposes.
"""

import numpy as np
import pandas as pd
from statsmodels.tsa.arima_process import ArmaProcess
from typing import List, Dict, Union, Optional
from collections import defaultdict
import datetime
import warnings
from macrosynergy.management.types import QuantamentalDataFrame
from macrosynergy.management.utils import (
    ticker_df_to_qdf,
    is_valid_iso_date,
    get_cid,
    get_xcat,
)
import contextlib
import random


[docs]def simulate_ar(nobs: int, mean: float = 0, sd_mult: float = 1, ar_coef: float = 0.75): """ Create an auto-correlated data-series as numpy array. Parameters ---------- nobs : int number of observations. mean : float mean of values, default is zero. sd_mult : float standard deviation multipliers of values, default is 1. This affects non-zero means. ar_coef : float autoregression coefficient (between 0 and 1): default is 0.75. Returns ------- np.ndarray autocorrelated data series. """ # Define relative parameters for creating an AR process. ar_params = np.r_[1, -ar_coef] ar_proc = ArmaProcess(ar_params) # define ARMA process ser = ar_proc.generate_sample(nobs) ser = ser + mean - np.mean(ser) return sd_mult * ser / np.std(ser)
[docs]def dataframe_generator( df_cids: pd.DataFrame, df_xcats: pd.DataFrame, cid: str, xcat: str ): """ Adjacent method used to construct the quantamental DataFrame. Parameters ---------- df_cids : pd.DataFrame df_xcats : pd.DataFrame cid : str individual cross-section. xcat : str individual category. Returns ------- Tuple[pd.DataFrame, pd.DatetimeIndex] Tuple containing the quantamental DataFrame and a DatetimeIndex of the business days. """ qdf_cols = ["cid", "xcat", "real_date", "value"] sdate = pd.to_datetime( max(df_cids.loc[cid, "earliest"], df_xcats.loc[xcat, "earliest"]) ) edate = pd.to_datetime( min(df_cids.loc[cid, "latest"], df_xcats.loc[xcat, "latest"]) ) all_days = pd.bdate_range(sdate, edate) work_days = all_days[all_days.weekday < 5] df_add = pd.DataFrame(columns=qdf_cols) df_add["real_date"] = work_days df_add["cid"] = cid df_add["xcat"] = xcat return df_add, work_days
[docs]@contextlib.contextmanager def temporary_seed(seed: Optional[int]): """ A context manager that temporarily sets the seed for both NumPy and Python's random. """ if seed is None or not isinstance(seed, int): yield return # no seed - do nothing. np_state = np.random.get_state() random_state = random.getstate() np.random.seed(seed) random.seed(seed) try: yield finally: np.random.set_state(np_state) random.setstate(random_state)
[docs]def make_qdf( df_cids: pd.DataFrame, df_xcats: pd.DataFrame, back_ar: float = 0, seed: Optional[int] = None, ) -> QuantamentalDataFrame: """ Make quantamental DataFrame with basic columns: 'cid', 'xcat', 'real_date', 'value'. Parameters ---------- df_cids : pd.DataFrame DataFrame with parameters by cid. Row indices are cross-sections. Columns are: 'earliest': string of earliest date (ISO) for which country values are available; 'latest': string of latest date (ISO) for which country values are available; 'mean_add': float of country-specific addition to any category's mean; 'sd_mult': float of country-specific multiplier of an category's standard deviation. df_xcats : pd.DataFrame dataframe with parameters by xcat. Row indices are cross-sections. Columns are: 'earliest': string of earliest date (ISO) for which category values are available; 'latest': string of latest date (ISO) for which category values are available; 'mean_add': float of category-specific addition; 'sd_mult': float of country- specific multiplier of an category's standard deviation; 'ar_coef': float between 0 and 1 denoting set auto-correlation of the category; 'back_coef': float, coefficient with which communal (mean 0, SD 1) background factor is added to category values. back_ar : float float between 0 and 1 denoting set auto-correlation of the background factor. Default is zero. seed : int seed for random number generation. Default is None. Returns ------- pd.DataFrame basic quantamental DataFrame according to specifications. """ with temporary_seed(seed): qdf = _make_qdf(df_cids, df_xcats, back_ar) return qdf
def _make_qdf(df_cids: pd.DataFrame, df_xcats: pd.DataFrame, back_ar: float = 0): df_list = [] if any(df_xcats["back_coef"] != 0): sdate = min(min(df_cids.loc[:, "earliest"]), min(df_xcats.loc[:, "earliest"])) edate = max(max(df_cids.loc[:, "latest"]), max(df_xcats.loc[:, "latest"])) all_days = pd.bdate_range(sdate, edate) work_days = all_days[all_days.weekday < 5] ser = simulate_ar(len(work_days), mean=0, sd_mult=1, ar_coef=back_ar) df_back = pd.DataFrame(index=work_days, columns=["value"]) df_back["value"] = ser for cid in df_cids.index: for xcat in df_xcats.index: df_add, work_days = dataframe_generator( df_cids=df_cids, df_xcats=df_xcats, cid=cid, xcat=xcat ) ser_mean = df_cids.loc[cid, "mean_add"] + df_xcats.loc[xcat, "mean_add"] ser_sd = df_cids.loc[cid, "sd_mult"] * df_xcats.loc[xcat, "sd_mult"] ser_arc = df_xcats.loc[xcat, "ar_coef"] df_add["value"] = simulate_ar( len(work_days), mean=ser_mean, sd_mult=ser_sd, ar_coef=ser_arc ) back_coef = df_xcats.loc[xcat, "back_coef"] # Add the influence of communal background series. if back_coef != 0: dates = df_add["real_date"] df_add["value"] = df_add["value"] + back_coef * df_back.loc[ dates, "value" ].reset_index(drop=True) df_list.append(df_add) return pd.concat(df_list).reset_index(drop=True)
[docs]def make_qdf_black(df_cids: pd.DataFrame, df_xcats: pd.DataFrame, blackout: dict): """ Make quantamental DataFrame with basic columns: 'cid', 'xcat', 'real_date', 'value'. In this DataFrame the column, 'value', will consist of Binary Values denoting whether the cross-section is active for the corresponding dates. Parameters ---------- df_cids : pd.DataFrame dataframe with parameters by cid. Row indices are cross-sections. Columns are: 'earliest': string of earliest date (ISO) for which country values are available; 'latest': string of latest date (ISO) for which country values are available; 'mean_add': float of country-specific addition to any category's mean; 'sd_mult': float of country-specific multiplier of an category's standard deviation. df_xcats : pd.DataFrame dataframe with parameters by xcat. Row indices are cross-sections. Columns are: 'earliest': string of earliest date (ISO) for which category values are available; 'latest': string of latest date (ISO) for which category values are available; 'mean_add': float of category-specific addition; 'sd_mult': float of country- specific multiplier of an category's standard deviation; 'ar_coef': float between 0 and 1 denoting set autocorrelation of the category; 'back_coef': float, coefficient with which communal (mean 0, SD 1) background factor is added to categoy values. blackout : dict Dictionary defining the blackout periods for each cross- section. The expected form of the dictionary is: {'AUD': (Timestamp('2000-01-13 00:00:00'), Timestamp('2000-01-13 00:00:00')), 'USD_1': (Timestamp('2000-01-03 00:00:00'), Timestamp('2000-01-05 00:00:00')), 'USD_2': (Timestamp('2000-01-09 00:00:00'), Timestamp('2000-01-10 00:00:00')), 'USD_3': (Timestamp('2000-01-12 00:00:00'), Timestamp('2000-01-12 00:00:00'))} The values of the dictionary are tuples consisting of the start & end-date of the respective blackout period. Each cross- section could have potentially more than one blackout period on a single category, and subsequently each key will be indexed to indicate the number of periods. Returns ------- pd.DataFrame basic quantamental DataFrame according to specifications with binary values. """ df_list = [] conversion = lambda t: (pd.Timestamp(t[0]), pd.Timestamp(t[1])) dates_dict = defaultdict(list) for k, v in blackout.items(): v = conversion(v) dates_dict[k[:3]].append(v) # At the moment the blackout period is being applied uniformally to each category: # each category will experience the same blackout periods. for cid in df_cids.index: for xcat in df_xcats.index: df_add, work_days = dataframe_generator( df_cids=df_cids, df_xcats=df_xcats, cid=cid, xcat=xcat ) arr = np.repeat(0, df_add.shape[0]) dates = df_add["real_date"].to_numpy() list_tuple = dates_dict[cid] for tup in list_tuple: start = tup[0] end = tup[1] condition_start = start.weekday() - 4 condition_end = end.weekday() - 4 # Will skip the associated blackout period because of the received # invalid date, if it is not within the respective data series' range, # but will continue to populate the dataframe according to the other keys # in the dictionary. # Naturally compare against the data-series' formal start & end date. if start < dates[0] or end > dates[-1]: warnings.warn("Blackout period date not within data series range.") break # If the date falls on a weekend, change to the following Monday. elif condition_start > 0: while start.weekday() > 4: start += datetime.timedelta(days=1) elif condition_end > 0: while end.weekday() > 4: end += datetime.timedelta(days=1) index_start = next(iter(np.where(dates == start)[0])) count = 0 while start != tup[1]: if start.weekday() < 5: count += 1 start += datetime.timedelta(days=1) arr[index_start : (index_start + count + 1)] = 1 df_add["value"] = arr df_list.append(df_add) return pd.concat(df_list).reset_index(drop=True)
[docs]def generate_lines(sig_len: int, style: str = "linear") -> Union[np.ndarray, List[str]]: """ Returns a numpy array of a line with a given length. Parameters ---------- sig_len : int The number of elements in the returned array. style : str The style of the line. Default `'linear'`. Current choices are: `linear`, `decreasing-linear`, `sharp-hill`, `four-bit-sine`, `sine`, `cosine`, `sawtooth`. Adding `"inv"` or "inverted" to the style will return the inverted version of that line. For example, `'inv-sawtooth'` or `'inverted sawtooth'` will return the inverted sawtooth line. `'any'` will return a random line. `'all'` will return a list of all the available styles. Returns ------- Union[np.ndarray, List[str]] A numpy array of the line. If `style` is `'all'`, then a list (of strings) of all the available styles is returned. NOTE: It is indeed request an `"inverted linear"` or `"inverted decreasing-linear"` line. They're just there for completeness and readability. """ def _sawtooth(sig_len: int) -> np.ndarray: max_cycles = 4 _tmp = sig_len // max_cycles up = np.tile(np.arange(-100, 100, 200 / _tmp), max_cycles * 2) return np.abs(up)[:sig_len] def _sharp_hill(sig_len: int) -> np.ndarray: return np.concatenate( [ np.arange(1, sig_len // 4 + 1) * 100 / sig_len, np.arange(sig_len // 4, sig_len // 4 * 3 + 1)[::-1] * 100 / sig_len, np.arange(sig_len // 4 * 3, sig_len + 1) * 100 / sig_len, ] ) def _four_bit_sine(sig_len: int) -> np.ndarray: return np.concatenate( [ np.arange(1, sig_len // 4 + 1) * 100 / sig_len, np.arange(sig_len // 4, sig_len // 4 * 3 + 1)[::-1] * 100 / sig_len, np.arange(sig_len // 4 * 3, sig_len + 1) * 100 / sig_len, ] ) def _sine(sig_len: int) -> np.ndarray: return np.sin(np.arange(1, sig_len + 1) * np.pi / (sig_len / 2)) * 50 + 50 def _cosine(sig_len: int) -> np.ndarray: return np.cos(np.arange(1, sig_len + 1) * np.pi / (sig_len / 2)) * 50 + 50 def _linear(sig_len: int) -> np.ndarray: return np.arange(1, sig_len + 1) * 100 / sig_len def _decreasing_linear(sig_len: int) -> np.ndarray: return _linear(sig_len)[::-1] if not isinstance(sig_len, int): raise TypeError("`sig_len` must be an integer.") elif sig_len < 1: raise ValueError("`sig_len` must be greater than 0.") if not isinstance(style, str): raise TypeError("`style` must be a string.") style: str = "-".join(style.strip().lower().split()) lines: Dict[str, np.ndarray] = { "linear": _linear, "decreasing-linear": _decreasing_linear, "sharp-hill": _sharp_hill, "four-bit-sine": _four_bit_sine, "sine": _sine, "cosine": _cosine, "sawtooth": _sawtooth, } if "inv" in style: style = "-".join([s for s in style.split("-") if "inv" not in s]) if style in lines: return lines[style](sig_len)[:sig_len] elif style == "any": r_choice: str = np.random.choice(list(lines.keys())) return lines[r_choice](sig_len)[:sig_len] elif style == "all": # return the list of choices opns: List[str] = list(lines.keys()) inv_opns: List[str] = [f"inverted-{opn}" for opn in opns] return opns + inv_opns else: raise ValueError(f"Invalid style: {style}. Use one of: {list(lines.keys())}.")
[docs]def make_test_df( cids: Optional[List[str]] = ["AUD", "CAD", "GBP"], xcats: Optional[List[str]] = ["XR", "CRY"], tickers: Optional[List[str]] = None, metrics: List[str] = ["value"], start: str = "2010-01-01", end: str = "2020-12-31", style: str = "any", ) -> QuantamentalDataFrame: """ Generates a test dataframe with pre-defined values. These values are meant to be used for testing purposes only. The functions generates a standard quantamental dataframe with where the value column is populated with pre-defined values. These values are simple lines, or waves that are easy to identify and differentiate in a plot. Parameters ---------- cids : List[str] A list of strings for cids. xcats : List[str] A list of strings for xcats. tickers : List[str] A list of strings for tickers. If provided, `cids` and `xcats` will be ignored. metrics : List[str] A list of strings for metrics. start : str An ISO-formatted date string. end : str An ISO-formatted date string. style : str A string that specifies the type of line to generate. Current choices are: 'linear', 'decreasing-linear', 'sharp-hill', 'four-bit-sine', 'sine', 'cosine', 'sawtooth', 'any'. See `macrosynergy.management.simulate.simulate_quantamental_data.generate_lines()`. """ ## Check the inputs if isinstance(cids, str): cids = [cids] if isinstance(xcats, str): xcats = [xcats] if isinstance(tickers, str): tickers = [tickers] if isinstance(metrics, str): metrics = [metrics] if not isinstance(metrics, list): raise TypeError("`metrics` must be a list of strings.") if "all" in metrics: metrics = ["value", "grading", "eop_lag", "mop_lag"] if (cids is None) != (xcats is None): raise ValueError("Please provide both `cids` and `xcats` or neither.") if tickers is None or len(tickers) == 0: if cids is None: raise ValueError("Please provide a list of tickers or `cids` & `xcats`.") if tickers is not None: cids = None xcats = None for varx, namex in zip( [cids, xcats, metrics, tickers], ["cids", "xcats", "metrics", "tickers"] ): if varx is not None: if not isinstance(varx, list): raise TypeError(f"`{namex}` must be a list.") if len(varx) == 0: raise ValueError(f"`{namex}` cannot be empty.") if not all(isinstance(x, str) for x in varx): raise TypeError(f"All elements in `{namex}` must be strings.") for varx, namex in zip([start, end], ["start", "end"]): if not is_valid_iso_date(varx): raise ValueError(f"`{namex}` must be a valid ISO date string.") ## Generate the dataframe dates: pd.DatetimeIndex = pd.bdate_range(start, end) all_tickers: List[str] = tickers if tickers is not None else [] if cids is not None: all_tickers += [f"{cid}_{xcat}" for cid in cids for xcat in xcats] all_tickers = sorted(set(all_tickers)) df_list: List[pd.DataFrame] = [] for ticker in all_tickers: df_add: pd.DataFrame = pd.DataFrame( index=dates, columns=["real_date", "cid", "xcat", *metrics] ) df_add["cid"] = get_cid(ticker) df_add["xcat"] = get_xcat(ticker) df_add["real_date"] = dates for metric in metrics: df_add[metric] = generate_lines(len(dates), style=style) df_list.append(df_add) return pd.concat(df_list).reset_index(drop=True)
[docs]def simulate_returns_and_signals( # n_cids: int = 4, cids=["AUD", "CAD", "GBP", "USD"], xcat="EQ", return_suffix: str = "XR", signal_suffix: str = "_CSIG_STRAT", years: int = 20, sigma_eta: float = 0.01, sigma_0: float = 0.1, start: str = None, end: str = None, ): """Simulate returns and signals Equations for return and signal generation: 1. r(t+1,i) = sigma(t+1,i)*(alpha(t+1,i) + beta(t+1,i)*rb(t+1) + epsilon(t+1,i)) epsilon(t+1,i) ~ N(0, 1) 2. ln(sigma(t+1,i)) = ln(sigma(t,i)) + eta(t+1,i), eta(t+1,i) ~ N(0, sigma_eta^2) 3. alpha(t+1,i) = signal(t,i) + eta_alpha(t+1,i), eta_alpha(t+1,i) ~ N(0, sigma_alpha^2) 4. beta(t+1,i) = beta(t,i) + eta_beta(t+1,i), eta_beta(t+1,i) ~ N(0, sigma_beta^2) 5. rb(t+1) = mu + eta_rb(t+1), eta_rb(t+1) ~ N(0, sigma_rb^2) 6. signal(t, i) = ... mean zero, but persistence.... """ n_cids = len(cids) periods = 252 * years assert (periods > 0) and (n_cids > 0) def simulate_volatility( periods: int = 252 * 20, sigma_eta: float = 0.01, sigma_0: float = 0.1 ): sigma = np.empty(shape=(periods + 1)) sigma[0] = sigma_0 # Daily volatility (10 percent ASD) eta_sigma = np.random.normal(0, sigma_eta, periods) for i, e in enumerate(eta_sigma): sigma[i + 1] = np.exp(np.log(sigma[i]) + e) return sigma[1:] dates = pd.bdate_range( end=pd.Timestamp.today() + pd.offsets.BDay(n=0), periods=periods ) # Generate volatility # print("Generate volatility (shared???)") volatility = np.empty(shape=(periods, n_cids)) for nn in range(n_cids): volatility[:, nn] = simulate_volatility( periods=periods, sigma_eta=sigma_eta, sigma_0=sigma_0 ) # Generate signals: persistent? rho_signal = 0.9 mean_signal = 0 signals = np.empty(shape=(periods + 1, n_cids)) signals[0, :] = mean_signal for t in range(periods): signals[t + 1, :] = ( (1 - rho_signal) * mean_signal + rho_signal * signals[t, :] + np.random.normal(0, 0.01, n_cids) ) # signals = np.random.randn(periods, n_cids) # Unit variance, zero mean signals = signals[1:, :] # Generate alpha # TODO alpha needs to be a function of lagged signal and not necessarily continous? # TODO signal and alpha can't be concurrent! # TODO signal proxy/captures a slow moving trend in the alpha (risk-premium) for i in range(int(periods / years)): signals[i * years : i * years + years, :] = signals[i * years, :] alpha = signals + np.random.randn(periods, n_cids) # Unit variance, zero mean # Generate benchmark return rb = 0.4 / 252 + np.random.randn(periods, 1) # 4% annual returns, unit variance # Generate beta beta = np.empty(shape=(1, n_cids, periods + 1)) beta[:, :, 0] = 0.6 # Initial beta value for i in range(periods): beta[:, :, i + 1] = beta[:, :, i] + 0.005 * np.random.randn(1, n_cids) beta = beta[:, :, 1:] # print("Final values of beta") # print(pd.Series(beta[0, :, -1]).describe()) # TODO get with kron-product? rb_factor = np.array([rb[tt] * beta[0, :, tt] for tt in range(periods)]) # Calculate returns rtn = volatility * (alpha + rb_factor + np.random.randn(periods, 1)) # TODO test simulated returns matches random walk hypothesis on the face of it if not ((bool(start) or bool(end)) and not (bool(start) and bool(end))): raise ValueError("Either `start` or `end` must be provided, but not both.") dtx = pd.Timestamp(start) if start else pd.Timestamp(end) dtx = pd.Timestamp(start) if start else pd.Timestamp(end) + pd.offsets.BDay(0) if start: dates = pd.bdate_range(start=dtx, periods=periods) else: dates = pd.bdate_range(end=dtx, periods=periods) df_rtn = pd.DataFrame(index=dates, data=rtn) df_signals = pd.DataFrame(index=dates, data=signals) # TODO change dates to be previous month... # TODO stack into quantamental dataframe # return df_rtn, df_signals xr_tickers = [f"{cid}_{xcat}{return_suffix}" for cid in cids] csig_tickers = [f"{cid}_{xcat}_{signal_suffix}" for cid in cids] dfR = pd.concat([df_rtn, df_signals], axis=1) dfR.columns = xr_tickers + csig_tickers dfR.index.name = "real_date" return ticker_df_to_qdf(dfR)
if __name__ == "__main__": ser_ar = simulate_ar(100, mean=0, sd_mult=1, ar_coef=0.75) cids = ["AUD", "CAD", "GBP"] xcats = ["XR", "CRY"] df_cids = pd.DataFrame( index=cids, columns=["earliest", "latest", "mean_add", "sd_mult"] ) df_cids.loc["AUD",] = ["2010-01-01", "2020-12-31", 0.5, 2] df_cids.loc["CAD",] = ["2011-01-01", "2020-11-30", 0, 1] df_cids.loc["GBP",] = ["2011-01-01", "2020-11-30", -0.2, 0.5] df_xcats = pd.DataFrame( index=xcats, columns=["earliest", "latest", "mean_add", "sd_mult", "ar_coef", "back_coef"], ) df_xcats.loc["XR",] = ["2010-01-01", "2020-12-31", 0, 1, 0, 0.3] df_xcats.loc["CRY",] = ["2011-01-01", "2020-10-30", 1, 2, 0.9, 0.5] dfd = make_qdf(df_cids, df_xcats, back_ar=0.75)