# Source code for macrosynergy.panel.make_zn_scores

"""
Module for calculating z-scores for a panel around a neutral level ("zn scores").
"""

import numpy as np
import pandas as pd
from typing import List, Optional, Union
from numbers import Number
from macrosynergy.management.simulate import make_qdf
from macrosynergy.management.utils import (
    drop_nan_series,
    reduce_df,
    _map_to_business_day_frequency,
    forward_fill_wide_df,
)
from macrosynergy.management.types import QuantamentalDataFrame


def make_zn_scores(
    df: pd.DataFrame,
    xcat: Union[str, List[str]] = None,
    cids: List[str] = None,
    start: str = None,
    end: str = None,
    blacklist: dict = None,
    sequential: bool = True,
    min_obs: int = 261,
    iis: bool = True,
    neutral: Union[str, Number] = "zero",
    est_freq: str = "D",
    thresh: float = None,
    upfront_thresh: float = None,
    pan_weight: float = 1,
    postfix: str = "ZN",
    ffill: int = 0,
    unscore: bool = False,
) -> pd.DataFrame:
    """
    Computes z-scores for a panel around a neutral level ("zn scores").

    Parameters
    ----------
    df : ~pandas.Dataframe
        standardized JPMaQS DataFrame with the necessary columns: 'cid', 'xcat',
        'real_date' and 'value'.
    xcat : str or List[str]
        extended category, or list of categories, to be scored. Each category of a
        list is scored separately and the results are returned together in one
        standardized DataFrame. Duplicated list entries are scored once.
    cids : List[str]
        cross sections to be scored; default is all that are available for the
        category.
    start : str
        earliest date in ISO format. Default is None, i.e. the earliest date in df.
    end : str
        latest date in ISO format. Default is None, i.e. the latest date in df.
    blacklist : dict
        cross-sections with date ranges to be excluded from the calculation. No
        zn-scores are produced for these periods and they do not contribute to the
        scoring of other periods either.
    sequential : bool
        if True (default) the score parameters (neutral level and mean absolute
        deviation) are estimated sequentially, using only information that was
        available at each point in time.
    min_obs : int
        minimum number of observations required to calculate zn-scores. Default is
        261. Only applicable if `sequential` is True; otherwise the neutral level
        and the mean absolute deviation are computed in-sample on the full sample.
    iis : bool
        if True (default) zn-scores for the initial period defined by `min_obs` are
        calculated in-sample so that no history is lost. Irrelevant if `sequential`
        is False.
    neutral : str, Number
        method to determine the neutral level. Default is 'zero'. Alternatives are
        'mean', 'median' or a number.
    est_freq : str
        frequency at which means and mean absolute deviations are re-estimated.
        Options are daily, weekly, monthly and quarterly: "D", "W", "M", "Q".
        Default is daily. Re-estimation is performed at period end.
    thresh : float
        maximum absolute score the function is allowed to produce; scores beyond it
        are winsorized to the threshold. The minimum threshold is 1 mean absolute
        deviation.
    upfront_thresh : float
        threshold, in the native units of the input data, at which the original
        values are capped (positive side) or floored (negative side) before
        scoring, to stop large outliers from compromising subsequent operations.
        Default is None.
    pan_weight : float
        weight of the panel (versus the individual cross section) in the score
        parameters, i.e. the neutral level and the mean absolute deviation. Default
        is 1 (parameters based on the whole panel); the lowest possible value is 0
        (parameters specific to each cross section).
    postfix : str
        string appended to the category name in the output; default is "ZN".
    ffill : int, default 0
        number of periods by which trailing NaN values of the input are forward
        filled. With 0 no forward fill is performed.
    unscore : bool, default False
        if True, `thresh` is applied in z-score space but the winsorized values are
        converted back to the original scale before being returned.

    Returns
    -------
    ~pandas.Dataframe
        standardized DataFrame with the zn-scores of the chosen category (or
        categories): 'cid', 'xcat', 'real_date' and 'value'.


    .. note::

        The blacklist argument is a dictionary with cross-sections as keys and
        tuples of start and end dates of the blacklist periods in ISO formats as
        values. If one cross section has multiple blacklist periods, numbers are
        added to the keys (i.e. TRY_1, TRY_2, etc.)
    """
    # Any additional metric columns are discarded up front.
    df = QuantamentalDataFrame(df[["cid", "xcat", "real_date", "value"]])

    if xcat is None:
        raise ValueError("The `xcat` parameter must be provided.")

    if isinstance(xcat, str):
        xcats = [xcat]
    else:
        is_str_list = isinstance(xcat, list) and all(
            isinstance(c, str) for c in xcat
        )
        if not is_str_list:
            raise TypeError(
                "The `xcat` parameter must be a string or a list of strings."
            )
        if not xcat:
            raise ValueError("The `xcat` parameter must not be empty.")
        # De-duplicate while keeping the order in which categories were requested.
        xcats = list(dict.fromkeys(xcat))

    # Settings shared by every category; only `xcat` varies between the calls.
    shared_kwargs = dict(
        cids=cids,
        start=start,
        end=end,
        blacklist=blacklist,
        sequential=sequential,
        min_obs=min_obs,
        iis=iis,
        neutral=neutral,
        est_freq=est_freq,
        thresh=thresh,
        upfront_thresh=upfront_thresh,
        pan_weight=pan_weight,
        postfix=postfix,
        ffill=ffill,
        unscore=unscore,
    )

    scored = []
    for category in xcats:
        scored.append(_make_zn_scores_for_xcat(df=df, xcat=category, **shared_kwargs))

    # A single category is returned exactly as produced by the worker.
    if len(scored) == 1:
        return scored[0]

    stacked = pd.concat(scored, axis=0, ignore_index=True)
    stacked = stacked.sort_values(by=["cid", "xcat", "real_date"])
    return QuantamentalDataFrame.from_long_df(
        df=stacked,
        categorical=df.InitializedAsCategorical,
    )
def _make_zn_scores_for_xcat( df: pd.DataFrame, xcat: str, cids: List[str] = None, start: str = None, end: str = None, blacklist: dict = None, sequential: bool = True, min_obs: int = 261, iis: bool = True, neutral: Union[str, Number] = "zero", est_freq: str = "D", thresh: float = None, upfront_thresh: float = None, pan_weight: float = 1, postfix: str = "ZN", ffill: int = 0, unscore: bool = False, ) -> pd.DataFrame: # --- Assertions err: str = ( "The `neutral` parameter must be a number or a string with value," " either 'mean', 'median' or 'zero'." ) if not isinstance(neutral, Number): if not isinstance(neutral, str): raise TypeError(err) elif neutral not in ["mean", "median", "zero"]: raise ValueError(err) if thresh is not None: err: str = "The `thresh` parameter must a numerical value >= 1.0." if not isinstance(thresh, Number): raise TypeError(err) elif thresh < 1.0: raise ValueError(err) if upfront_thresh is not None: err = "The `upfront_thresh` parameter must be a positive numerical value." if not isinstance(upfront_thresh, Number): raise TypeError(err) elif upfront_thresh <= 0: raise ValueError(err) if not isinstance(iis, bool): raise TypeError("Parameter `iis` must be a boolean.") err = ( "The `pan_weight` parameter must be a numerical value between 0 and 1 " "(inclusive)." ) if not isinstance(pan_weight, Number): raise TypeError(err) elif not (0 <= pan_weight <= 1): raise ValueError(err) error_min = "Minimum observations must be a non-negative Integer value." if not isinstance(min_obs, int): raise TypeError(error_min) if min_obs < 0: raise ValueError(error_min) est_freq = _map_to_business_day_frequency( freq=est_freq, valid_freqs=["D", "W", "M", "Q"] ) # --- Prepare re-estimation dates and time-series DataFrame. # Remove any additional metrics defined in the DataFrame. if cids is not None: missing_cids = set(cids).difference(set(df["cid"])) if missing_cids: raise ValueError( f"The following cids are not available in the DataFrame: " f"{missing_cids}." 
) if xcat not in df["xcat"].unique(): raise ValueError(f"The xcat {xcat} is not available in the DataFrame.") df = reduce_df( df, xcats=[xcat], cids=cids, start=start, end=end, blacklist=blacklist ) if df.isna().values.any(): df = drop_nan_series(df=df, raise_warning=True) s_date = min(df["real_date"]) e_date = max(df["real_date"]) dates_iter = pd.date_range(start=s_date, end=e_date, freq=est_freq) dfw = df.pivot(index="real_date", columns="cid", values="value") cross_sections = dfw.columns if ffill > 0: # Forward fill the trailing NaN values in the input DataFrame. dfw = forward_fill_wide_df( dfw, blacklist, n=ffill ) if upfront_thresh is not None: dfw = dfw.clip(lower=-upfront_thresh, upper=upfront_thresh) # --- The actual scoring. dfw_zns_pan = dfw * 0 dfw_zns_css = dfw * 0 if dfw.shape[0] < min_obs and pan_weight < 1 and pan_weight > 0: raise ValueError( f"The DataFrame has less than {min_obs} observations. " "Please adjust the `min_obs` parameter." ) dfx_pan, df_mabs_pan, df_neutral_pan = None, None, None if pan_weight > 0: df_neutral_pan = expanding_stat( dfw, dates_iter, stat=neutral, sequential=sequential, min_obs=min_obs, iis=iis, ) dfx_pan = dfw.sub(df_neutral_pan["value"], axis=0) df_mabs_pan = expanding_stat( dfx_pan.abs(), dates_iter, stat="mean", sequential=sequential, min_obs=min_obs, iis=iis, ) dfw_zns_pan = dfx_pan.div(df_mabs_pan["value"], axis="rows") cid_dfx, cid_mabs, cid_neutral = {}, {}, {} if pan_weight < 1: for cid in cross_sections: dfi = dfw[cid] df_neutral = expanding_stat( dfi.to_frame(name=cid), dates_iter, stat=neutral, sequential=sequential, min_obs=min_obs, iis=iis, ) dfx = dfi - df_neutral["value"] df_mabs = expanding_stat( dfx.abs().to_frame(name=cid), dates_iter, stat="mean", sequential=sequential, min_obs=min_obs, iis=iis, ) dfx = pd.DataFrame(data=dfx.to_numpy(), index=dfx.index, columns=["value"]) dfx = dfx.rename_axis("cid", axis=1) zns_css_df = dfx / df_mabs dfw_zns_css.loc[:, cid] = zns_css_df["value"] cid_dfx[cid] = dfx 
cid_mabs[cid] = df_mabs["value"] cid_neutral[cid] = df_neutral["value"] dfw_zns = (dfw_zns_pan * pan_weight) + (dfw_zns_css * (1 - pan_weight)) dfw_zns = dfw_zns.dropna(axis=0, how="all") if thresh is not None: dfw_zns.clip(lower=-thresh, upper=thresh, inplace=True) if unscore: dfw_zns = _unscore_dfw_zns( dfw_zns, dfw_zns_pan, dfw_zns_css, df_mabs_pan, df_neutral_pan, cid_mabs, cid_neutral, cross_sections, pan_weight, ) # --- Reformatting of output into standardised DataFrame. df_out = dfw_zns.stack().to_frame("value").reset_index() df_out = QuantamentalDataFrame.from_long_df( df=df_out, xcat=xcat + postfix, categorical=df.InitializedAsCategorical, ) return df_out
def expanding_stat(
    df: pd.DataFrame,
    dates_iter: pd.DatetimeIndex,
    stat: Union[str, Number] = "mean",
    sequential: bool = True,
    min_obs: int = 261,
    iis: bool = True,
) -> pd.DataFrame:
    """
    Compute specified statistic based on an expanding sample.

    Parameters
    ----------
    df : ~pandas.Dataframe
        Daily-frequency time series DataFrame.
    dates_iter : ~pandas.DatetimeIndex
        controls the frequency of the neutral & mean absolute deviation
        calculations.
    stat : str, Number
        statistical method to be applied. This is typically 'mean', or 'median'.
        'zero' or a number produce a constant series of that value.
    sequential : bool
        if True (default) the statistic is estimated sequentially. If this set to
        false a single value is calculated per time series, based on the full
        sample.
    min_obs : int
        minimum required observations for calculation of the statistic in days.
        Only used for sequential estimation of a non-constant statistic.
    iis : bool
        if set to True, the values of the initial interval determined by min_obs
        will be estimated in-sample, based on the full initial sample.

    Returns
    -------
    ~pandas.DataFrame
        Time series dataframe of the chosen statistic across all columns

    Raises
    ------
    IndexError
        if `df` has no observed rows, or if sequential estimation is requested and
        there are not more than `min_obs` rows with at least one observation.
    KeyError
        if the sequential mean is requested for dates in `dates_iter` that have no
        corresponding estimate.
    """
    df_out = pd.DataFrame(np.nan, index=df.index, columns=["value"])

    # An adjustment for individual series' first realised value is not required given
    # the returned DataFrame will be subtracted from the original DataFrame. The
    # original DataFrame implicitly hosts this information through NaN values such
    # that the arithmetic operation displaces any falsified values by NaN.
    observed = df.dropna(axis=0, how="all")
    first_observation = observed.index[0]

    if stat == "zero":
        df_out["value"] = 0
    elif isinstance(stat, Number):
        df_out["value"] = stat
    elif not sequential:
        # The entire series is treated as in-sample. Will automatically handle NaN
        # values.
        df_out["value"] = df.stack().apply(stat)
    else:
        # `min_obs` is only meaningful on this path, so the look-up of the first
        # estimation date is deferred to here: constant and full-sample statistics
        # must not fail merely because the sample is shorter than `min_obs`.
        try:
            first_estimation = observed.index[min_obs]
        except IndexError as e:
            raise IndexError(
                f"Sequential estimation requires more than {min_obs} dates with "
                f"observations, but only {len(observed)} are available."
            ) from e

        dates = dates_iter[dates_iter >= first_estimation]
        if stat == "mean":
            sample = df.loc[first_observation:]
            expanding_count = _get_expanding_count(sample, min_periods=min_obs + 1)
            # Cumulative sum of all observed values divided by the cumulative
            # number of observations, i.e. the expanding panel mean.
            df_mean = (
                sample.sum(axis=1).expanding(min_periods=min_obs + 1).sum()
                / expanding_count
            )
            try:
                df_mean = df_mean.dropna().loc[dates]
            except KeyError as e:
                err_str = 'Some dates in "dates_iter" have no corresponding data.'
                raise KeyError(err_str) from e
            df_mean.name = "value"
            df_out.update(df_mean)
        else:
            for date in dates:
                df_out.loc[date, "value"] = (
                    df.loc[first_observation:date].stack().apply(stat)
                )

        # Carry each estimate forward until the next re-estimation date.
        df_out = df_out.ffill()

        if iis:
            # Adjust for individual cross-sections' series commencing at different
            # dates: only the span between first observation and first estimate is
            # back-filled with the first in-sample estimate.
            obs_index = np.where(df.index == first_observation)[0][0]
            est_index = np.where(df.index == first_estimation)[0][0]
            initial_span = int(est_index - obs_index)
            if initial_span > 0:
                df_out = df_out.bfill(limit=initial_span)

    df_out.columns.name = "cid"
    return df_out
def _get_expanding_count(X: pd.DataFrame, min_periods: int = 1): """ Helper method to get the number of non-NaN values in each expanding window. Parameters ---------- X : ~pandas.DataFrame Pandas dataframe of input features. min_periods : int Minimum number of observations in window required to have a value (otherwise result is 0.). Returns ------- ~numpy.ndarray Numpy array of expanding counts. """ return X.expanding(min_periods).count().sum(1).to_numpy() def _unscore_dfw_zns( dfw_zns: pd.DataFrame, dfw_zns_pan: pd.DataFrame, dfw_zns_css: pd.DataFrame, df_mabs_pan: pd.DataFrame, df_neutral_pan: pd.DataFrame, cid_mabs: dict, cid_neutral: dict, cross_sections: list, pan_weight: float, ) -> pd.DataFrame: """ Unscore the weighted panel and cross-sectional components of dfw_zns. Parameters ---------- dfw_zns : pd.DataFrame The combined z-scored DataFrame. dfw_zns_pan : pd.DataFrame The panel component of dfw_zns. dfw_zns_css : pd.DataFrame The cross-sectional component of dfw_zns. df_mabs_pan : pd.DataFrame Mean absolute deviation for the panel component. df_neutral_pan : pd.DataFrame Neutral component for the panel component. cid_mabs : dict Dictionary of mean absolute deviations per cross-section. cid_neutral : dict Dictionary of neutral components per cross-section. cross_sections : list List of cross-section identifiers. pan_weight : float The weight of the panel component, ranging from 0 to 1. Returns ------- pd.DataFrame The unscored DataFrame. 
""" if pan_weight > 0: dfw_zns_pan = (dfw_zns - (dfw_zns_css * (1 - pan_weight))) / pan_weight dfw_unscored_pan = dfw_zns_pan.mul(df_mabs_pan["value"], axis=0).add( df_neutral_pan["value"], axis=0 ) else: dfw_unscored_pan = pd.DataFrame(0, index=dfw_zns.index, columns=dfw_zns.columns) if pan_weight < 1: dfw_zns_css = (dfw_zns - (dfw_zns_pan * pan_weight)) / (1 - pan_weight) dfw_unscored_css = pd.DataFrame(index=dfw_zns.index, columns=dfw_zns.columns) for cid in cross_sections: dfw_unscored_css[cid] = (dfw_zns_css[cid] * cid_mabs[cid]) + cid_neutral[ cid ] else: dfw_unscored_css = pd.DataFrame(0, index=dfw_zns.index, columns=dfw_zns.columns) if pan_weight == 1: dfw_unscored = dfw_unscored_pan elif pan_weight == 0: dfw_unscored = dfw_unscored_css else: dfw_unscored = dfw_unscored_css dfw_zns = dfw_unscored return dfw_unscored if __name__ == "__main__": np.random.seed(1) cids = ["AUD", "CAD", "GBP", "USD", "NZD"] xcats = ["XR", "CRY", "GROWTH", "INFL"] df_cids = pd.DataFrame( index=cids, columns=["earliest", "latest", "mean_add", "sd_mult"] ) df_cids.loc["AUD"] = ["2010-01-01", "2020-12-31", 0.5, 2] df_cids.loc["CAD"] = ["2006-01-01", "2020-12-30", 0, 1] df_cids.loc["GBP"] = ["2008-01-01", "2020-12-29", -0.2, 0.5] df_cids.loc["USD"] = ["2007-01-01", "2020-09-30", -0.2, 0.5] df_cids.loc["NZD"] = ["2002-01-01", "2020-09-30", -0.1, 2] df_xcats = pd.DataFrame( index=xcats, columns=["earliest", "latest", "mean_add", "sd_mult", "ar_coef", "back_coef"], ) df_xcats.loc["XR"] = ["2008-01-01", "2020-12-31", 0, 1, 0, 0.3] df_xcats.loc["CRY"] = ["2011-01-01", "2020-10-30", 1, 2, 0.9, 0.5] df_xcats.loc["GROWTH"] = ["2012-01-01", "2020-10-30", 1, 2, 0.9, 1] df_xcats.loc["INFL"] = ["2013-01-01", "2020-10-30", 1, 2, 0.8, 0.5] # Apply a blacklist period from series' start date. black = {"AUD": ["2010-01-01", "2013-12-31"], "GBP": ["2020-12-31", "2100-01-01"]} dfd = make_qdf(df_cids, df_xcats, back_ar=0.75) dfd["grading"] = np.ones(dfd.shape[0]) # Monthly: panel + cross. 
dfzm = make_zn_scores( dfd.copy(deep=True), xcat="XR", sequential=True, cids=cids, blacklist=black, iis=True, neutral="mean", pan_weight=0.5, min_obs=261, est_freq="D", unscore=True, # thresh=5 ) print(dfzm) # Weekly: panel + cross. dfzw = make_zn_scores( dfd, xcat="XR", sequential=True, cids=cids, blacklist=black, iis=False, neutral="mean", pan_weight=0.5, min_obs=261, est_freq="w", ) # Daily: panel. Neutral and mean absolute deviation will be computed daily. dfzd = make_zn_scores( dfd, xcat="XR", sequential=True, cids=cids, blacklist=black, iis=True, neutral="mean", pan_weight=1.0, min_obs=261, est_freq="d", ) # Daily: cross. dfd["ticker"] = dfd["cid"] + "_" + dfd["xcat"] dfzd = make_zn_scores( dfd, xcat="XR", sequential=True, cids=cids, blacklist=black, iis=True, neutral="mean", pan_weight=0.0, min_obs=261, est_freq="d", ) panel_df = make_zn_scores( dfd, "CRY", cids, start="2010-01-04", blacklist=black, sequential=False, min_obs=0, neutral="mean", iis=True, thresh=None, pan_weight=0.75, postfix="ZN", ) print(panel_df) panel_df_7 = make_zn_scores( dfd, "CRY", cids, start="2010-01-04", blacklist=black, sequential=False, min_obs=0, neutral="zero", iis=True, thresh=None, pan_weight=0.75, postfix="ZN", ) print(panel_df_7) multi_xcat_df = make_zn_scores( dfd, xcat=["XR", "CRY"], cids=cids, start="2010-01-04", sequential=False, min_obs=0, neutral="mean", iis=True, thresh=None, pan_weight=0.5, postfix="ZN", ) print(multi_xcat_df)