Source code for macrosynergy.panel.category_relations

"""
Classes and functions for analyzing and visualizing the relations of two panel categories.
"""

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from typing import List, Union, Tuple
from scipy import stats
import statsmodels.api as sm
import warnings

from macrosynergy.management.simulate import make_qdf
from macrosynergy.management.utils import categories_df
from macrosynergy.management.utils import apply_slip as apply_slip_util
from macrosynergy.management.types import QuantamentalDataFrame


[docs]class CategoryRelations(object): """ Class for analyzing and visualizing the relations of multiple panel categories. Parameters ---------- df : ~pandas.DataFrame standardized DataFrame with the necessary columns: 'cid', 'xcat', 'real_date' and at least one column with values of interest. xcats : List[str] exactly two extended categories to be analyzed. If there is a hypothesized explanatory-dependent relation, the first category is the explanatory variable and the second category the explained variable. cids : List[str] cross-sections for which the category relations is being analyzed. Default is all in the DataFrame. start : str earliest date in ISO format. Default is None in which case the earliest date in the DataFrame will be used. end : str latest date in ISO format. Default is None in which case the latest date in the DataFrame will be used. blacklist : dict cross-sections with date ranges that should be excluded from the analysis. years : int number of years over which data are aggregated. Supersedes the 'freq' parameter and does not allow lags, Default is None, meaning no multi-year aggregation. Note: for single year labelled plots, better use freq='A' for cleaner labels. val : str name of column that contains the values of interest. Default is 'value'. freq : str letter denoting frequency at which the series are to be sampled. This must be one of 'D', 'W', 'M', 'Q', 'A'. Default is 'M'. lag : int lag (delay of arrival) of first (explanatory) category in periods as set by freq. Default is 0. Importantly, for analyses with explanatory and dependent categories, the first category takes the role of the explanatory and a positive lag means that the explanatory values will be deferred into the future, i.e. relate to future values of the explained variable. xcat_aggs : List[str] Exactly two aggregation methods. Default is 'mean' for both. xcat1_chg : str time series changes are applied to the first category. Default is None. Options are 'diff' (first difference) and 'pch' (percentage change). The changes are calculated over the number of periods determined by `n_periods`. n_periods : int number of periods over which changes of the first category have been calculated. Default is 1. fwin : int forward moving average window of second category. Default is 1, i.e no average. Importantly, for analysis with explanatory and dependent categories, the second takes the role of the dependent and a forward window means that the dependent values average forward into the future. xcat_trims : List[float] two-element list with maximum absolute values for the two respective categories. Observations with higher values will be trimmed, i.e. removed from the analysis (not winsorized!). Default is None for both. Trimming is applied after all other transformations. slip : int number of periods to 'slip' the explanatory variable, i.e. the first category. Here, slip mimics the late arrival of the data, or the time it takes to act on the data. Default is 0. """ def __init__( self, df: pd.DataFrame, xcats: List[str], cids: List[str] = None, val: str = "value", start: str = None, end: str = None, blacklist: dict = None, years: int = None, freq: str = "M", lag: int = 0, fwin: int = 1, xcat_aggs: List[str] = ["mean", "mean"], xcat1_chg: str = None, n_periods: int = 1, xcat_trims: List[float] = [None, None], slip: int = 0, ): """Initializes CategoryRelations""" if not isinstance(freq, str): raise TypeError("freq must be a string.") self.xcats: List[str] = xcats self.cids: List[str] = cids self.val: str = val self.freq: str = freq.upper() self.lag: int = lag self.years: int = years self.aggs: List[str] = xcat_aggs self.xcat1_chg: str = xcat1_chg self.n_periods: int = n_periods self.xcat_trims: List[float] = xcat_trims self.slip: int = slip if self.freq not in ["D", "W", "M", "Q", "A"]: raise ValueError("freq must be one of 'D', 'W', 'M', 'Q', 'A'.") if not isinstance(val, str): raise TypeError("val must be a string.") if not {"cid", "xcat", "real_date", val}.issubset(set(df.columns)): raise ValueError( f"`df` must have columns 'cid', 'xcat', 'real_date' and `{val}`." ) df = QuantamentalDataFrame(df) if not isinstance(xcats, (list, tuple)): raise TypeError("`xcats` must be a list or a tuple.") elif not len(xcats) == 2: raise ValueError("`xcats` must have exactly two elements.") if not isinstance(slip, int): raise TypeError("`slip` must be a non-negative integer.") elif slip < 0: raise ValueError("`slip` must be a non-negative integer.") if not isinstance(xcat_aggs, (list, tuple)): raise TypeError("xcat_aggs must be a list or a tuple.") # copy DF to avoid side-effects df: pd.DataFrame = df.copy() # Select the cross-sections available for both categories. df.loc[:, "real_date"] = pd.to_datetime(df["real_date"], format="%Y-%m-%d") if self.slip != 0: metrics_found: List[str] = list( set(df.columns) - set(["cid", "xcat", "real_date"]) ) # here, the slip is applied to the the first xcat (explanatory variable) df = self.apply_slip( df=df, slip=self.slip, cids=self.cids, xcats=[self.xcats[0]], metrics=metrics_found, ) # capture warning from intersection_cids, in case the two categories do not # share any cross-sections. warnings_list = [] with warnings.catch_warnings(record=True) as w: warnings.simplefilter("always") shared_cids = CategoryRelations.intersection_cids(df, xcats, cids) for warning in w: warnings_list.append(str(warning.message)) # if shared_cids is empty, then the analysis is not possible. # The warning from intersection_cids now becomes an error. if len(shared_cids) == 0: error_message = "The two categories have no shared cross-sections." if len(warnings_list) > 0: error_message += f"\nPossible reason(s) for error: " error_message += "\n".join(warnings_list) error_message += "\nPlease check input parameters." raise ValueError(error_message) # Will potentially contain NaN values if the two categories are defined over # time-periods. df = categories_df( df, xcats, shared_cids, val=val, start=start, end=end, freq=self.freq, blacklist=blacklist, years=years, lag=lag, fwin=fwin, xcat_aggs=xcat_aggs, ) if xcat1_chg is not None: xcat1_error = ( "Change applied to the explanatory variable must either be " "first-differencing, 'diff', or percentage change, 'pch'." ) assert xcat1_chg in ["diff", "pch"], xcat1_error n_periods_error = f"<int> expected and not {type(n_periods)}." assert isinstance(n_periods, int), n_periods_error df = CategoryRelations.time_series( df, change=xcat1_chg, n_periods=n_periods, shared_cids=shared_cids, expln_var=xcats[0], ) if any([xt is not None for xt in self.xcat_trims]): xcat_trim_error = ( "Two values expected corresponding to the number " "of categories." ) assert len(xcat_trims) == len(xcats), xcat_trim_error types = [ isinstance(elem, (float, int)) and elem >= 0.0 for elem in xcat_trims ] assert any(types), "Expected two floating point values." df = CategoryRelations.outlier_trim(df, xcats, xcat_trims) # NaN values will not be handled if both of the above conditions are not # satisfied. self.df = df.dropna(axis=0, how="any")
[docs] @classmethod def intersection_cids(cls, df, xcats, cids): """ Returns common cross-sections across both categories and specified parameter. Parameters ---------- df : ~pandas.DataFrame standardised DataFrame. xcats : List[str] exactly two extended categories to be checked on. cids : List[str] cross-sections for which the category relation is being analyzed. Returns ------- List[str] usable: List of the common cross-sections across the two categories. """ set_1 = set(df[df["xcat"] == xcats[0]]["cid"]) set_2 = set(df[df["xcat"] == xcats[1]]["cid"]) miss_1 = list(set(cids).difference(set_1)) miss_2 = list(set(cids).difference(set_2)) if len(miss_1) > 0: print(f"{xcats[0]} misses: {sorted(miss_1)}.") warnings.warn(f"{xcats[0]} misses: {sorted(miss_1)}.", UserWarning) if len(miss_2) > 0: print(f"{xcats[1]} misses: {sorted(miss_2)}.") warnings.warn(f"{xcats[1]} misses: {sorted(miss_2)}.", UserWarning) usable = list(set_1.intersection(set_2).intersection(set(cids))) return usable
[docs] @staticmethod def apply_slip( df: pd.DataFrame, slip: int, cids: List[str], xcats: List[str], metrics: List[str], ) -> pd.DataFrame: """ Calls the utility function apply_slip_util defined in df_utils. """ return apply_slip_util( df=df, slip=slip, cids=cids, xcats=xcats, metrics=metrics, raise_error=False )
[docs] @classmethod def time_series( cls, df: pd.DataFrame, change: str, n_periods: int, shared_cids: List[str], expln_var: str, ): """ Apply time-series changes to the explanatory variable. Calculates first differences or percentage changes of the time series. Parameters ---------- df : ~pandas.DataFrame multi-indexed DataFrame hosting the two categories: first column represents the explanatory variable; second column hosts the dependent variable. The DataFrame's index is the real-date and cross-section. change : str type of change to be applied. Can be 'diff' for first-differencing or 'pch' for percentage change. n_periods : int number of base periods in df over which the change is applied. shared_cids : List[str] shared cross-sections across the two categories and the received list. expln_var : str only the explanatory variable's data series will be changed from the raw value series to a difference or percentage change value. Returns ------- ~pandas.DataFrame returns the same multi-indexed DataFrame but with an adjusted series inline with the 'change' parameter. """ if change not in ["diff", "pch"]: raise ValueError("change must be 'diff' or 'pch'.") df_lists = [] for c in shared_cids: temp_df: pd.DataFrame = df.loc[c].copy() if change == "diff": temp_df[expln_var] = temp_df[expln_var].diff(periods=n_periods) elif change == "pch": temp_df[expln_var] = temp_df[expln_var].pct_change(periods=n_periods) temp_df["cid"] = c temp_df = temp_df.set_index("cid", append=True) df_lists.append(temp_df) df_ = pd.concat(df_lists) df_ = df_.dropna(axis=0, how="any") return df_
[docs] @classmethod def outlier_trim(cls, df: pd.DataFrame, xcats: List[str], xcat_trims: List[float]): """ Trim outliers from the dataset. Parameters ---------- df : ~pandas.DataFrame multi-indexed DataFrame hosting the two categories. The transformations, to each series, have already been applied. xcats : List[str] explanatory and dependent variable. xcat_trims : List[float] two-element list with maximum absolute values for the two respective categories. Observations with higher values will be trimmed, i.e. removed from the analysis (not winsorized!). Returns ------- ~pandas.DataFrame returns the same multi-indexed DataFrame. .. note:: Outliers are classified as any datapoint whose absolute value exceeds the predefined value specified in the field self.xcat_trims. The values will be set to NaN, and subsequently excluded from any regression modelling or correlation coefficients. """ xcat_dict = dict(zip(xcats, xcat_trims)) for k, v in xcat_dict.items(): # if the trim value is None, then leave the series as is if v is not None: df[k] = np.where(np.abs(df[k]) < v, df[k], np.nan) df = df.dropna(axis=0, how="any") return df
[docs] def corr_prob_calc( self, df_probability: Union[pd.DataFrame, List[pd.DataFrame]], prob_est: str ): """ Compute the correlation coefficient and probability statistics. Parameters ---------- df_probability : List[~pandas.DataFrame] or ~pandas.DataFrame pandas DataFrame containing the dependent and explanatory variables. prob_est : str type of estimator for probability of significant relation. Returns ------- List[tuple(float, float)] .. note:: The method is able to handle multiple DataFrames, and will return the corresponding number of statistics held inside a List. """ if isinstance(df_probability, pd.DataFrame): df_probability = [df_probability] cpl = [] for i, df_i in enumerate(df_probability): feat = df_i[self.xcats[0]].to_numpy() targ = df_i[self.xcats[1]].to_numpy() coeff, pval = stats.pearsonr(feat, targ) if prob_est == "kendall": _, pval = stats.kendalltau(feat, targ) if prob_est == "map": X = df_i.loc[:, self.xcats[0]] X = sm.add_constant(X) y = df_i.loc[:, self.xcats[1]] groups = df_i.reset_index().real_date re = sm.MixedLM(y, X, groups).fit(reml=False) # random effects est pval = float(re.summary().tables[1].iloc[1, 3]) row = [np.round(coeff, 3), np.round(1 - pval, 3)] cpl.append(row) return cpl
[docs] def corr_probability( self, df_probability: Union[pd.DataFrame, List[pd.DataFrame]], prob_est: str, time_period: str = "", coef_box_loc: str = "upper left", ax: plt.Axes = None, ): """ Add the computed correlation coefficient and probability to a Matplotlib table. Parameters ---------- df_probability : List[~pandas.DataFrame] or ~pandas.DataFrame pandas DataFrame containing the dependent and explanatory variables. Able to handle multiple DataFrames representing different time-periods of the original series. prob_est : str type of estimator for probability of significant relation. time_period : str indicator used to clarify which time-period the statistics are computed for. For example, before 2010 and after 2010: the two periods experience very different macroeconomic conditions. The default is an empty string. coef_box_loc : str location on the graph of the aforementioned box. The default is in the upper left corner. prob_bool : bool boolean parameter which determines whether the probability value is included in the table. The default is True. ax : plt.Axes Matplotlib Axes object. If None (default), new axes will be created. """ time_period_error = f"<str> expected - received {type(time_period)}." assert isinstance(time_period, str), time_period_error cpl = self.corr_prob_calc(df_probability=df_probability, prob_est=prob_est) fields = [ f"Correlation\n coefficient {time_period}", f"Probability\n of significance {time_period}", ] if isinstance(df_probability, list) and len(df_probability) == 2: row_headers = ["Before 2010", "After 2010"] cellC = [ ["lightsteelblue", "lightsteelblue"], ["lightsalmon", "lightsalmon"], ] else: row_headers = None cellC = None if ax is None: data_table = plt.table( cellText=cpl, cellColours=cellC, colLabels=fields, cellLoc="center", loc=coef_box_loc, zorder=10, ) else: data_table = ax.table( cellText=cpl, cellColours=cellC, colLabels=fields, cellLoc="center", loc=coef_box_loc, zorder=10, ) return data_table
[docs] def annotate_facet(self, data, **kws): """Annotate each graph within the facet grid.""" x = data[self.xcats[0]].to_numpy() y = data[self.xcats[1]].to_numpy() coeff, pval = stats.pearsonr(x, y) cpl = np.round(coeff, 3) fields = "Correlation coefficient: " ax = plt.gca() ax.text(0.04, 0.1, f"{fields} {cpl}", fontsize=10, transform=ax.transAxes)
[docs] def reg_scatter( self, title: str = None, title_fontsize: int = 14, labels: bool = False, size: Tuple[float] = None, xlab: str = None, ylab: str = None, label_fontsize: int = 12, tick_fontsize: int = 12, coef_box: str = None, coef_box_size: Tuple[float] = (0.4, 2.5), coef_box_font_size: int = 0, prob_est: str = "pool", fit_reg: bool = True, reg_ci: int = 95, reg_order: int = 1, reg_robust: bool = False, separator: Union[str, int] = None, title_adj: float = 1, single_chart: bool = False, single_scatter: bool = False, ncol: int = None, ax: plt.Axes = None, remove_zero_predictor: bool = False, ): """ Display scatter-plot and regression line. Parameters ---------- title : str title of plot. If None (default) an informative title is applied. title_fontsize : int font size of the title. Default is None. labels : bool assign a cross-section/period label to each dot. Default is False. size : Tuple[float] width and height of the figure xlab : str x-axis label. Default is no label. ylab : str y-axis label. Default is no label. fit_reg : bool if True (default) adds a regression line. reg_ci : int size of the confidence interval for the regression estimate. Default is 95. Can be None. reg_order : int order of the regression equation. Default is 1 (linear). reg_robust : bool if this will de-weight outliers, which is computationally expensive. Default is False. coef_box : str two-purpose parameter. Firstly, if the parameter equals None, the correlation coefficient and probability statistics will not be included in the graphic. Secondly, if the statistics are to be included, pass in the desired location on the graph which, in addition, will act as a pseudo-boolean parameter. The options are standard, i.e. 'upper left', 'lower right' and so forth. Default is None, i.e the statistics are not displayed. prob_est : str type of estimator for probability of significant relation. - "pool" (default) which means that all observation are treated as independent and calculates Pearson's correlation coefficient. - "map" denoting Macrosynergy panel test. This is based on a panel regression with period-specific random effects and greatly mitigates the issue of pseudo-replication if panel features and targets are correlated across time. See also https://research.macrosynergy.com/testing-macro-trading-factors/ - "kendall" which calculates the Kendall rank correlation coefficient. It is a non-parametric statistic used to measure the strength and direction of association between two ranked variables. separator : Union[str, int] allows categorizing the scatter analysis by cross-section or integer. In the former case the argument is set to "cids" and in the latter case the argument is set to a year [2010, for instance] which will subsequently split the time-period into the sample before (not including) that year and from (including) that year. title_adj : float parameter that sets top of figure to accommodate title. Default is 1. single_chart : bool boolean parameter determining whether the x- and y- labels are only written on a single graph of the Facet Grid (useful if there are numerous charts, and the labels are excessively long). The default is False, and the names of the axis will be displayed on each grid if not conflicting with the label for each variable. ncol : int number of columns in the facet grid. Default is None, in which case the number of columns is determined by the number of cross-sections. ax : plt.Axes Matplotlib Axes object. If None (default), new figure and axes objects will be created. If an Axes object is passed, the plot will be drawn on the Axes, and plt.show() will not be called. remove_zero_predictor : bool, default=False Remove zeros from the input series. """ coef_box_loc_error = ( "The parameter expects a string used to delimit the " "location of the box: 'upper left', 'lower right' etc." ) if coef_box is not None: assert isinstance(coef_box, str), coef_box_loc_error assert prob_est in [ "pool", "map", "kendall", ], "prob_est must be 'pool', 'kendall' or 'map'" if len(self.cids) == 1 and prob_est == "map": warnings.warn( "The 'map' estimator is not applicable to a single cross-section. " "Using 'pool' instead.", UserWarning, ) prob_est = "pool" sns.set_theme(style="whitegrid") dfx = self.df.copy() if title is None and (self.years is None): dates = ( self.df.index.get_level_values("real_date") .to_series() .dt.strftime("%Y-%m-%d") ) title = ( f"{self.xcats[0]} and {self.xcats[1]} " f"from {dates.min()} to {dates.max()}" ) elif title is None: title = f"{self.xcats[0]} and {self.xcats[1]}" if ax is not None: if not isinstance(ax, plt.Axes): raise TypeError("ax must be a matplotlib Axes object.") show_plot = False else: show_plot = True set_font_size = False if not (isinstance(coef_box_font_size, int) and coef_box_font_size >= 0): raise ValueError("coef_box_font_size must be a non-negative integer.") if coef_box_font_size == 0: set_font_size = True coef_box_font_size = 12 # If "separator" is type Integer, the scatter plot is split across two # time-periods where the divisor is the received year. if size is None: size = (3, 3) if separator == "cids" else (12, 8) else: if ( not isinstance(size, tuple) or len(size) != 2 or not all(isinstance(i, (int, float)) for i in size) ): raise TypeError("size must be a tuple of ints/floats.") if isinstance(separator, int): year_error = "Separation by years does not work with year groups." assert self.years is None, year_error if ax is None: fig, ax = plt.subplots(figsize=size) if "real_date" not in dfx.index.names: raise ValueError("`real_date` expected in index names.") if remove_zero_predictor: dfx = dfx[dfx.loc[:, self.xcats[0]] != 0] rdt_index = list(dfx.index.names).index("real_date") index_years = dfx.index.get_level_values(rdt_index).year years_in_df = list(index_years.unique()) assert separator in years_in_df, "Separator year is not in the range." error_sep = "Separator year must not be the first in the range." assert separator > np.min(years_in_df), error_sep label_set1 = f"before {separator}" label_set2 = f"from {separator}" dfx1 = dfx[index_years < separator] dfx2 = dfx[index_years >= separator] sns.regplot( data=dfx1, x=self.xcats[0], y=self.xcats[1], ci=reg_ci, order=reg_order, robust=reg_robust, fit_reg=fit_reg, scatter_kws={"s": 30, "alpha": 0.5}, label=label_set1, line_kws={"lw": 1}, ax=ax, ) sns.regplot( data=dfx2, x=self.xcats[0], y=self.xcats[1], ci=reg_ci, order=reg_order, robust=reg_robust, fit_reg=fit_reg, label=label_set2, scatter_kws={"s": 30, "alpha": 0.5}, line_kws={"lw": 1}, ax=ax, ) if coef_box is not None: data_table = self.corr_probability( df_probability=[dfx1, dfx2], time_period="", coef_box_loc=coef_box, prob_est=prob_est, ax=ax, ) x_scale = coef_box_size[0] y_scale = coef_box_size[1] data_table.scale(x_scale, y_scale) data_table.auto_set_font_size(set_font_size) data_table.set_fontsize(coef_box_font_size) ax.legend(loc="upper right") ax.set_title(title, fontsize=title_fontsize) if xlab is not None: ax.set_xlabel(xlab, fontsize=label_fontsize) if ylab is not None: ax.set_ylabel(ylab, fontsize=label_fontsize) elif separator == "cids" and not single_scatter: assert isinstance(single_chart, bool) dfx_copy = dfx.reset_index().rename(columns={"level_0": "cid"}) n_cids = len(dfx_copy["cid"].unique()) error_cids = ( "There must be more than one cross-section to use " "separator = 'cids'." ) assert n_cids > 1, error_cids # "Wrap" the column variable at this width, so that the column facets span # multiple rows. Used to determine the number of grids on each row. dict_coln = {2: 2, 5: 3, 8: 4, 30: 5} keys_ar = np.array(list(dict_coln.keys())) key = keys_ar[keys_ar <= n_cids][-1] if ncol is None: ncol = dict_coln[key] if ncol > n_cids: ncol = n_cids # The DataFrame is already a standardised DataFrame. Three columns: two # categories (dependent & explanatory variable) and the respective # cross-sections. The index will be the date timestamp. facet_height = size[1] # height of each facet in inches facet_aspect = size[0] / size[1] # aspect ratio of each facet fg = sns.FacetGrid( data=dfx_copy, col="cid", col_wrap=ncol, height=facet_height, aspect=facet_aspect, ) fg.map( sns.regplot, self.xcats[0], self.xcats[1], ci=reg_ci, order=reg_order, robust=reg_robust, fit_reg=fit_reg, scatter_kws={"s": 15, "alpha": 0.5, "color": "lightgray"}, line_kws={"lw": 1}, ) if coef_box is not None: fg.map_dataframe(self.annotate_facet) fg.set_titles(col_template="{col_name}") fg.fig.suptitle(title, y=title_adj, fontsize=14) if not single_chart: if xlab is not None: fg.set_xlabels(xlab, clear_inner=True) if ylab is not None: fg.set_ylabels(ylab) else: error = "Label expected for the respective axis." assert xlab is not None, error assert ylab is not None, error number_of_graphs = len(fg.axes) no_columns = fg._ncol remainder = int(number_of_graphs % no_columns) for i in range(number_of_graphs): fg.axes[i].set_xlabel("") fg.axes[i].set_ylabel("") if remainder == 0: fg.axes[no_columns - 1].set_xlabel(xlab) fg.axes[no_columns - 1].set_ylabel(ylab) else: fg.axes[-remainder].set_xlabel(xlab) fg.axes[-remainder].set_ylabel(ylab) elif separator == "cids" and single_scatter: assert isinstance(single_chart, bool) if ( coef_box == "upper right" ): # Since otherwise this overlaps with cid legend coef_box = "upper left" dfx_copy = dfx.reset_index() cids = dfx_copy["cid"].unique() n_cids = len(cids) error_cids = ( "There must be more than one cross-section to use " "separator = 'cids'." ) assert n_cids > 1, error_cids if ax is None: fig, ax = plt.subplots(figsize=size) # Perform a single global regression sns.regplot( data=dfx_copy, x=self.xcats[0], y=self.xcats[1], ci=reg_ci, order=reg_order, robust=reg_robust, fit_reg=fit_reg, scatter=False, # Do not plot scatter points in regplot line_kws={"lw": 1, "color": "black"}, ax=ax, ) # Color code the scatter points by cid for i, cid in enumerate(cids): dfx_i = dfx_copy[dfx_copy["cid"] == cid] ax.scatter( dfx_i[self.xcats[0]], dfx_i[self.xcats[1]], label=f"{cid}", s=30, alpha=0.5, ) if coef_box is not None: data_table = self.corr_probability( df_probability=dfx_copy, time_period="", coef_box_loc=coef_box, prob_est=prob_est, ax=ax, ) x_scale = coef_box_size[0] y_scale = coef_box_size[1] data_table.scale(x_scale, y_scale) data_table.auto_set_font_size(set_font_size) data_table.set_fontsize(coef_box_font_size) ax.legend(loc="upper right", title="Cids") ax.set_title(title, fontsize=title_fontsize) if xlab is not None: ax.set_xlabel(xlab, fontsize=label_fontsize) if ylab is not None: ax.set_ylabel(ylab, fontsize=label_fontsize) elif separator is None: if ax is None: fig, ax = plt.subplots(figsize=size) else: show_plot = False if remove_zero_predictor: dfx = dfx[dfx.loc[:, self.xcats[0]] != 0] sns.regplot( data=dfx, x=self.xcats[0], y=self.xcats[1], ci=reg_ci, order=reg_order, robust=reg_robust, fit_reg=fit_reg, scatter_kws={"s": 30, "alpha": 0.5, "color": "lightgray"}, line_kws={"lw": 1}, ax=ax, ) if coef_box is not None: data_table = self.corr_probability( df_probability=dfx, prob_est=prob_est, coef_box_loc=coef_box, ax=ax, ) x_scale = coef_box_size[0] y_scale = coef_box_size[1] data_table.scale(x_scale, y_scale) data_table.auto_set_font_size(set_font_size) data_table.set_fontsize(coef_box_font_size) if labels: error_freq = "Labels only available for monthly or lower frequencies." assert self.freq in ["A", "Q", "M"], error_freq df_labs = dfx.dropna().index.to_frame(index=False) if "cid" not in df_labs.columns: df_labs = df_labs.rename(columns={0: "cid"}) if self.years is not None: ser_labs = ( df_labs["cid"].astype("object") + " " + df_labs["real_date"] ) else: ser_labs = df_labs["cid"].astype("object") + " " ser_labs += df_labs["real_date"].dt.year.astype("string") if self.freq == "Q": ser_labs += "Q" + df_labs["real_date"].dt.quarter.astype( "string" ) elif self.freq == "M": ser_labs += "-" + df_labs["real_date"].dt.month.astype("string") for i in range(dfx.shape[0]): ax.text( x=dfx[self.xcats[0]][i] + 0, y=dfx[self.xcats[1]][i] + 0, s=ser_labs[i], fontdict=dict(color="black", size=8), ) ax.set_title(title, fontsize=title_fontsize) if xlab is not None: ax.set_xlabel(xlab, fontsize=label_fontsize) if ylab is not None: ax.set_ylabel(ylab, fontsize=label_fontsize) else: ValueError("Separator must be either a valid year <int> or 'cids' <str>.") if isinstance(ax, plt.Axes): ax.tick_params(axis="both", labelsize=tick_fontsize) plt.tight_layout() if show_plot: plt.show()
[docs] def ols_table(self, type="pool"): """ Print statsmodels regression summaries. Parameters ---------- type : str type of linear regression summary to print. Default is 'pool'. Alternative is 're' for period-specific random effects. """ assert type in ["pool", "re"], "Type must be either 'pool' or 're'." x, y = self.df.dropna().iloc[:, 0], self.df.dropna().iloc[:, 1] x_fit = sm.add_constant(x) groups = self.df.reset_index().real_date if type == "pool": fit_results = sm.OLS(y, x_fit).fit() elif type == "re": fit_results = sm.MixedLM(y, x_fit, groups).fit(reml=False) print(fit_results.summary())
if __name__ == "__main__": cids = ["AUD", "CAD", "GBP", "NZD", "USD"] xcats = ["XR", "CRY", "GROWTH", "INFL"] df_cids = pd.DataFrame( index=cids, columns=["earliest", "latest", "mean_add", "sd_mult"] ) df_cids.loc["AUD"] = ["2000-01-01", "2020-12-31", 0.1, 1] df_cids.loc["CAD"] = ["2001-01-01", "2020-11-30", 0, 1] df_cids.loc["BRL"] = ["2001-01-01", "2020-11-30", -0.1, 2] df_cids.loc["GBP"] = ["2002-01-01", "2020-11-30", 0, 2] df_cids.loc["NZD"] = ["2002-01-01", "2020-09-30", -0.1, 2] df_cids.loc["USD"] = ["2003-01-01", "2020-12-31", -0.1, 2] cols = ["earliest", "latest", "mean_add", "sd_mult", "ar_coef", "back_coef"] df_xcats = pd.DataFrame(index=xcats, columns=cols) df_xcats.loc["XR"] = ["2000-01-01", "2020-12-31", 0.1, 1, 0, 0.3] df_xcats.loc["CRY"] = ["2000-01-01", "2020-10-30", 1, 2, 0.95, 1] df_xcats.loc["GROWTH"] = ["2001-01-01", "2020-10-30", 1, 2, 0.9, 1] df_xcats.loc["INFL"] = ["2001-01-01", "2020-10-30", 1, 2, 0.8, 0.5] dfd = make_qdf(df_cids, df_xcats, back_ar=0.75) dfd["grading"] = np.ones(dfd.shape[0]) black = {"AUD": ["2000-01-01", "2003-12-31"], "GBP": ["2018-01-01", "2100-01-01"]} # All AUD GROWTH locations. filt1 = (dfd["xcat"] == "GROWTH") & (dfd["cid"] == "AUD") filt2 = (dfd["xcat"] == "INFL") & (dfd["cid"] == "NZD") # Reduced DataFrame. dfdx = dfd[~(filt1 | filt2)].copy() dfdx["ERA"] = "before 2007" dfdx.loc[dfdx["real_date"].dt.year > 2007, "ERA"] = "from 2010" def modify_cry_values(group): if group.name[1] == "CRY": # Check if xcat is "cry" mask = np.ones(len(group), dtype=bool) mask[np.arange(len(group)) % 20 != 0] = False # Keep only every 20th row group.loc[~mask, "value"] = 0 # Set all other rows to zero return group dfdx = dfdx.groupby(["cid", "xcat"], group_keys=False).apply(modify_cry_values) cidx = ["AUD", "CAD", "GBP", "USD", "PRY"] cr = CategoryRelations( dfdx, xcats=["CRY", "XR"], freq="D", lag=1, cids=cidx, xcat_aggs=["mean", "sum"], start="2001-01-01", blacklist=black, years=None, ) cr.reg_scatter( labels=False, separator=None, title="Carry and Return", xlab="Carry", ylab="Return", coef_box="lower left", prob_est="map", remove_zero_predictor=True, title_fontsize=14, ) # years parameter cr = CategoryRelations( dfdx, xcats=["CRY", "XR"], freq="M", years=5, lag=0, cids=cidx, xcat_aggs=["mean", "sum"], start="2001-01-01", blacklist=black, ) cr.reg_scatter( labels=False, separator=None, title="Carry and Return, 5-year periods", xlab="Carry", ylab="Return", coef_box="lower left", prob_est="map", ) cr = CategoryRelations( dfdx, xcats=["CRY", "XR"], # xcat1_chg="diff", freq="M", lag=1, cids=cidx, xcat_aggs=["mean", "sum"], start="2001-01-01", blacklist=black, years=None, ) cr.reg_scatter( labels=False, separator=2010, title="Carry and Return", xlab="Carry", ylab="Return", coef_box="lower left", ncol=5, remove_zero_predictor=True ) cr.reg_scatter( labels=False, separator="cids", title="Composite macro trend pressure indicator and subsequent IRS fixed receiver returns for USD and EUR, since 2000", xlab="Composite macro trend pressure indicator", ylab="Next month's return on 2-year IRS return, vol-targeted position, %", coef_box="lower left", ncol=2, ) # Passing Axes object for a subplot fig, ax = plt.subplots(1, 2, figsize=(12, 8)) for i in range(2): cr.reg_scatter( labels=False, separator=None, title="Carry and Return", xlab="Carry", ylab="Return", coef_box="lower left", prob_est="kendall", ax=ax[i], ) plt.show() cr.ols_table(type="pool") cr.ols_table(type="re")