"""
Classes and functions for analyzing and visualizing the relations of two panel categories.
"""
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from typing import List, Union, Tuple
from scipy import stats
import statsmodels.api as sm
import warnings
from macrosynergy.management.simulate import make_qdf
from macrosynergy.management.utils import categories_df
from macrosynergy.management.utils import apply_slip as apply_slip_util
from macrosynergy.management.types import QuantamentalDataFrame
class CategoryRelations(object):
    """
    Class for analyzing and visualizing the relations of two panel categories.

    Parameters
    ----------
    df : ~pandas.DataFrame
        standardized DataFrame with the necessary columns: 'cid', 'xcat', 'real_date'
        and at least one column with values of interest.
    xcats : List[str]
        exactly two extended categories to be analyzed. If there is a hypothesized
        explanatory-dependent relation, the first category is the explanatory variable
        and the second category the explained variable.
    cids : List[str]
        cross-sections for which the category relation is being analyzed. Default is
        None, in which case all cross-sections in the DataFrame are used.
    val : str
        name of column that contains the values of interest. Default is 'value'.
    start : str
        earliest date in ISO format. Default is None in which case the earliest date
        in the DataFrame will be used.
    end : str
        latest date in ISO format. Default is None in which case the latest date in
        the DataFrame will be used.
    blacklist : dict
        cross-sections with date ranges that should be excluded from the analysis.
    years : int
        number of years over which data are aggregated. Supersedes the 'freq'
        parameter and does not allow lags. Default is None, meaning no multi-year
        aggregation. Note: for single year labelled plots, better use freq='A' for
        cleaner labels.
    freq : str
        letter denoting frequency at which the series are to be sampled. This must be
        one of 'D', 'W', 'M', 'Q', 'A' (case-insensitive). Default is 'M'.
    lag : int
        lag (delay of arrival) of first (explanatory) category in periods as set by
        freq. Default is 0. Importantly, for analyses with explanatory and dependent
        categories, the first category takes the role of the explanatory and a
        positive lag means that the explanatory values will be deferred into the
        future, i.e. relate to future values of the explained variable.
    fwin : int
        forward moving average window of second category. Default is 1, i.e. no
        average. Importantly, for analysis with explanatory and dependent categories,
        the second takes the role of the dependent and a forward window means that
        the dependent values average forward into the future.
    xcat_aggs : List[str]
        exactly two aggregation methods. Default is 'mean' for both.
    xcat1_chg : str
        time series changes are applied to the first category. Default is None.
        Options are 'diff' (first difference) and 'pch' (percentage change). The
        changes are calculated over the number of periods determined by `n_periods`.
    n_periods : int
        number of periods over which changes of the first category are calculated.
        Default is 1.
    xcat_trims : List[float]
        two-element list with maximum absolute values for the two respective
        categories. Observations with higher values will be trimmed, i.e. removed
        from the analysis (not winsorized!). Default is None for both. Trimming is
        applied after all other transformations.
    slip : int
        number of periods to 'slip' the explanatory variable, i.e. the first
        category. Here, slip mimics the late arrival of the data, or the time it
        takes to act on the data. Default is 0.
    """

    def __init__(
        self,
        df: pd.DataFrame,
        xcats: List[str],
        cids: List[str] = None,
        val: str = "value",
        start: str = None,
        end: str = None,
        blacklist: dict = None,
        years: int = None,
        freq: str = "M",
        lag: int = 0,
        fwin: int = 1,
        xcat_aggs: List[str] = ["mean", "mean"],
        xcat1_chg: str = None,
        n_periods: int = 1,
        xcat_trims: List[float] = [None, None],
        slip: int = 0,
    ):
        """Validate the inputs and build the transformed two-category panel."""
        if not isinstance(freq, str):
            raise TypeError("freq must be a string.")
        freq = freq.upper()
        if freq not in ["D", "W", "M", "Q", "A"]:
            raise ValueError("freq must be one of 'D', 'W', 'M', 'Q', 'A'.")

        if not isinstance(val, str):
            raise TypeError("val must be a string.")
        if not {"cid", "xcat", "real_date", val}.issubset(set(df.columns)):
            raise ValueError(
                f"`df` must have columns 'cid', 'xcat', 'real_date' and `{val}`."
            )
        df = QuantamentalDataFrame(df)

        if not isinstance(xcats, (list, tuple)):
            raise TypeError("`xcats` must be a list or a tuple.")
        elif not len(xcats) == 2:
            raise ValueError("`xcats` must have exactly two elements.")

        if not isinstance(slip, int):
            raise TypeError("`slip` must be a non-negative integer.")
        elif slip < 0:
            raise ValueError("`slip` must be a non-negative integer.")

        if not isinstance(xcat_aggs, (list, tuple)):
            raise TypeError("xcat_aggs must be a list or a tuple.")

        # The signature defaults are shared list objects; store private copies so
        # that mutating an instance attribute can never leak into other instances.
        xcat_aggs = list(xcat_aggs)
        xcat_trims = [None, None] if xcat_trims is None else list(xcat_trims)

        # The documented default for `cids` is "all cross-sections in the frame".
        if cids is None:
            cids = df["cid"].unique().tolist()

        self.xcats: List[str] = xcats
        self.cids: List[str] = cids
        self.val: str = val
        self.freq: str = freq
        self.lag: int = lag
        self.years: int = years
        self.aggs: List[str] = xcat_aggs
        self.xcat1_chg: str = xcat1_chg
        self.n_periods: int = n_periods
        self.xcat_trims: List[float] = xcat_trims
        self.slip: int = slip

        # copy DF to avoid side-effects
        df: pd.DataFrame = df.copy()
        df.loc[:, "real_date"] = pd.to_datetime(df["real_date"], format="%Y-%m-%d")

        if self.slip != 0:
            metrics_found: List[str] = list(
                set(df.columns) - set(["cid", "xcat", "real_date"])
            )
            # here, the slip is applied to the first xcat (explanatory variable)
            df = self.apply_slip(
                df=df,
                slip=self.slip,
                cids=self.cids,
                xcats=[self.xcats[0]],
                metrics=metrics_found,
            )

        # Capture warnings from intersection_cids: if the two categories do not
        # share any cross-section, the warnings become part of the error message.
        with warnings.catch_warnings(record=True) as caught:
            warnings.simplefilter("always")
            shared_cids = CategoryRelations.intersection_cids(df, xcats, cids)
        warnings_list = [str(item.message) for item in caught]

        if len(shared_cids) == 0:
            error_message = "The two categories have no shared cross-sections."
            if len(warnings_list) > 0:
                error_message += "\nPossible reason(s) for error: "
                error_message += "\n".join(warnings_list)
            error_message += "\nPlease check input parameters."
            raise ValueError(error_message)

        # Will potentially contain NaN values if the two categories are defined
        # over different time-periods.
        df = categories_df(
            df,
            xcats,
            shared_cids,
            val=val,
            start=start,
            end=end,
            freq=self.freq,
            blacklist=blacklist,
            years=years,
            lag=lag,
            fwin=fwin,
            xcat_aggs=xcat_aggs,
        )

        if xcat1_chg is not None:
            xcat1_error = (
                "Change applied to the explanatory variable must either be "
                "first-differencing, 'diff', or percentage change, 'pch'."
            )
            assert xcat1_chg in ["diff", "pch"], xcat1_error
            n_periods_error = f"<int> expected and not {type(n_periods)}."
            assert isinstance(n_periods, int), n_periods_error

            df = CategoryRelations.time_series(
                df,
                change=xcat1_chg,
                n_periods=n_periods,
                shared_cids=shared_cids,
                expln_var=xcats[0],
            )

        if any([xt is not None for xt in self.xcat_trims]):
            xcat_trim_error = (
                "Two values expected corresponding to the number " "of categories."
            )
            assert len(xcat_trims) == len(xcats), xcat_trim_error
            types = [
                isinstance(elem, (float, int)) and elem >= 0.0 for elem in xcat_trims
            ]
            # `any` (not `all`) so that one of the two trims may remain None.
            assert any(types), "Expected two floating point values."
            df = CategoryRelations.outlier_trim(df, xcats, xcat_trims)

        # Remaining NaN rows are dropped even if neither transformation above ran.
        self.df = df.dropna(axis=0, how="any")

    @classmethod
    def intersection_cids(cls, df, xcats, cids):
        """
        Returns common cross-sections across both categories and specified parameter.

        Parameters
        ----------
        df : ~pandas.DataFrame
            standardised DataFrame.
        xcats : List[str]
            exactly two extended categories to be checked on.
        cids : List[str]
            cross-sections for which the category relation is being analyzed. If
            None, no restriction is applied and every cross-section available for
            both categories is returned.

        Returns
        -------
        List[str]
            alphabetically sorted list of the cross-sections that are available for
            both categories and, if given, contained in `cids`.

        Warns
        -----
        UserWarning
            for each category that lacks some of the requested cross-sections. The
            same message is also printed.
        """
        set_1 = set(df[df["xcat"] == xcats[0]]["cid"])
        set_2 = set(df[df["xcat"] == xcats[1]]["cid"])
        common = set_1.intersection(set_2)

        if cids is None:
            # Sorted so that downstream row order is reproducible between runs.
            return sorted(common)

        requested = set(cids)
        for xcat, available in zip(xcats[:2], (set_1, set_2)):
            missing = sorted(requested.difference(available))
            if len(missing) > 0:
                print(f"{xcat} misses: {missing}.")
                warnings.warn(f"{xcat} misses: {missing}.", UserWarning)

        return sorted(common.intersection(requested))

    @staticmethod
    def apply_slip(
        df: pd.DataFrame,
        slip: int,
        cids: List[str],
        xcats: List[str],
        metrics: List[str],
    ) -> pd.DataFrame:
        """
        Thin wrapper around the imported `apply_slip` utility.

        All arguments are forwarded unchanged and `raise_error=False` is passed to
        the utility.
        """
        return apply_slip_util(
            df=df, slip=slip, cids=cids, xcats=xcats, metrics=metrics, raise_error=False
        )

    @classmethod
    def time_series(
        cls,
        df: pd.DataFrame,
        change: str,
        n_periods: int,
        shared_cids: List[str],
        expln_var: str,
    ):
        """
        Apply time-series changes to the explanatory variable. Calculates first
        differences or percentage changes of the time series.

        Parameters
        ----------
        df : ~pandas.DataFrame
            multi-indexed DataFrame hosting the two categories. The first index level
            must be the cross-section, as each cross-section is selected with
            `df.loc[cid]`.
        change : str
            type of change to be applied. Can be 'diff' for first-differencing or
            'pch' for percentage change.
        n_periods : int
            number of base periods in df over which the change is applied.
        shared_cids : List[str]
            cross-sections to process; each must be present in the index of `df`.
        expln_var : str
            name of the column that is replaced by its difference or percentage
            change. All other columns are left as they are.

        Returns
        -------
        ~pandas.DataFrame
            concatenation of the per-cross-section frames with all rows containing
            NaN removed. The cross-section is re-attached as an appended index level
            named 'cid', i.e. it follows the remaining original level(s).

        Raises
        ------
        ValueError
            if `change` is neither 'diff' nor 'pch'.
        """
        if change not in ["diff", "pch"]:
            raise ValueError("change must be 'diff' or 'pch'.")

        frames = []
        for cid in shared_cids:
            # Changes are computed per cross-section so that they never span the
            # boundary between two cross-sections.
            cid_df: pd.DataFrame = df.loc[cid].copy()
            if change == "diff":
                cid_df[expln_var] = cid_df[expln_var].diff(periods=n_periods)
            else:
                cid_df[expln_var] = cid_df[expln_var].pct_change(periods=n_periods)
            cid_df["cid"] = cid
            frames.append(cid_df.set_index("cid", append=True))

        return pd.concat(frames).dropna(axis=0, how="any")

    @classmethod
    def outlier_trim(cls, df: pd.DataFrame, xcats: List[str], xcat_trims: List[float]):
        """
        Trim outliers from the dataset.

        Parameters
        ----------
        df : ~pandas.DataFrame
            multi-indexed DataFrame hosting the two categories. The transformations,
            to each series, have already been applied. The frame is not modified.
        xcats : List[str]
            explanatory and dependent variable.
        xcat_trims : List[float]
            two-element list with maximum absolute values for the two respective
            categories. None means that the respective category is not trimmed.

        Returns
        -------
        ~pandas.DataFrame
            new DataFrame without the rows in which a trimmed category has an
            absolute value at or above its threshold, and without any row containing
            NaN.
        """
        keep = pd.Series(True, index=df.index)
        for xcat, limit in dict(zip(xcats, xcat_trims)).items():
            # if the trim value is None, then leave the series as is
            if limit is not None:
                # NaN compares False and is therefore dropped as well.
                keep &= df[xcat].abs() < limit
        return df[keep].dropna(axis=0, how="any")

    def corr_prob_calc(
        self, df_probability: Union[pd.DataFrame, List[pd.DataFrame]], prob_est: str
    ):
        """
        Compute the correlation coefficient and probability statistics.

        Parameters
        ----------
        df_probability : List[~pandas.DataFrame] or ~pandas.DataFrame
            DataFrame(s) containing the dependent and explanatory variables as
            columns named after `self.xcats`.
        prob_est : str
            type of estimator for probability of significant relation. 'kendall'
            takes the p-value of Kendall's tau and 'map' the p-value of a mixed
            linear model grouped by `real_date`; any other value uses the p-value of
            the Pearson correlation.

        Returns
        -------
        List[List[float]]
            one `[correlation, 1 - p-value]` pair per DataFrame, both rounded to
            three decimals. The correlation is always the Pearson coefficient.
        """
        if isinstance(df_probability, pd.DataFrame):
            df_probability = [df_probability]

        cpl = []
        for df_i in df_probability:
            feat = df_i[self.xcats[0]].to_numpy()
            targ = df_i[self.xcats[1]].to_numpy()
            coeff, pval = stats.pearsonr(feat, targ)

            if prob_est == "kendall":
                _, pval = stats.kendalltau(feat, targ)
            elif prob_est == "map":
                X = sm.add_constant(df_i.loc[:, self.xcats[0]])
                y = df_i.loc[:, self.xcats[1]]
                groups = df_i.reset_index().real_date
                # random effects estimation
                re_fit = sm.MixedLM(y, X, groups).fit(reml=False)
                # p-value of the slope, read from the summary's coefficient table
                pval = float(re_fit.summary().tables[1].iloc[1, 3])

            cpl.append([np.round(coeff, 3), np.round(1 - pval, 3)])
        return cpl

    def corr_probability(
        self,
        df_probability: Union[pd.DataFrame, List[pd.DataFrame]],
        prob_est: str,
        time_period: str = "",
        coef_box_loc: str = "upper left",
        ax: "plt.Axes" = None,
    ):
        """
        Add the computed correlation coefficient and probability to a Matplotlib table.

        Parameters
        ----------
        df_probability : List[~pandas.DataFrame] or ~pandas.DataFrame
            DataFrame(s) containing the dependent and explanatory variables. A list
            of two DataFrames produces a two-row table with one colour per row.
        prob_est : str
            type of estimator for probability of significant relation.
        time_period : str
            text appended to both column labels to clarify which time-period the
            statistics are computed for. The default is an empty string.
        coef_box_loc : str
            location of the table on the graph. The default is the upper left
            corner.
        ax : plt.Axes
            Matplotlib Axes object to draw on. If None (default), `plt.table` is
            used, i.e. the table is added to the current axes.

        Returns
        -------
        matplotlib.table.Table
            the table that has been added to the plot.
        """
        time_period_error = f"<str> expected - received {type(time_period)}."
        assert isinstance(time_period, str), time_period_error

        cpl = self.corr_prob_calc(df_probability=df_probability, prob_est=prob_est)
        fields = [
            f"Correlation\n coefficient {time_period}",
            f"Probability\n of significance {time_period}",
        ]

        if isinstance(df_probability, list) and len(df_probability) == 2:
            cell_colours = [
                ["lightsteelblue", "lightsteelblue"],
                ["lightsalmon", "lightsalmon"],
            ]
        else:
            cell_colours = None

        make_table = plt.table if ax is None else ax.table
        return make_table(
            cellText=cpl,
            cellColours=cell_colours,
            colLabels=fields,
            cellLoc="center",
            loc=coef_box_loc,
            zorder=10,
        )

    def _add_coef_box(
        self,
        df_probability,
        prob_est,
        coef_box,
        coef_box_size,
        auto_font_size,
        font_size,
        ax,
    ):
        """Draw the statistics table on `ax` and apply the requested scale and font."""
        data_table = self.corr_probability(
            df_probability=df_probability,
            time_period="",
            coef_box_loc=coef_box,
            prob_est=prob_est,
            ax=ax,
        )
        data_table.scale(coef_box_size[0], coef_box_size[1])
        data_table.auto_set_font_size(auto_font_size)
        data_table.set_fontsize(font_size)
        return data_table

    def annotate_facet(self, data, **kws):
        """Annotate the current facet with the Pearson correlation of its data."""
        x = data[self.xcats[0]].to_numpy()
        y = data[self.xcats[1]].to_numpy()
        coeff, _ = stats.pearsonr(x, y)
        cpl = np.round(coeff, 3)
        fields = "Correlation coefficient: "
        ax = plt.gca()
        ax.text(0.04, 0.1, f"{fields} {cpl}", fontsize=10, transform=ax.transAxes)

    def reg_scatter(
        self,
        title: str = None,
        title_fontsize: int = 14,
        labels: bool = False,
        size: Tuple[float, float] = None,
        xlab: str = None,
        ylab: str = None,
        label_fontsize: int = 12,
        tick_fontsize: int = 12,
        coef_box: str = None,
        coef_box_size: Tuple[float, float] = (0.4, 2.5),
        coef_box_font_size: int = 0,
        prob_est: str = "pool",
        fit_reg: bool = True,
        reg_ci: int = 95,
        reg_order: int = 1,
        reg_robust: bool = False,
        separator: Union[str, int] = None,
        title_adj: float = 1,
        single_chart: bool = False,
        single_scatter: bool = False,
        ncol: int = None,
        ax: "plt.Axes" = None,
        remove_zero_predictor: bool = False,
    ):
        """
        Display scatter-plot and regression line.

        Parameters
        ----------
        title : str
            title of plot. If None (default) an informative title is applied.
        title_fontsize : int
            font size of the title. Default is 14.
        labels : bool
            assign a cross-section/period label to each dot. Only used without a
            separator and for frequencies 'A', 'Q' and 'M'. Default is False.
        size : Tuple[float, float]
            width and height of the figure. Default is (12, 8), or (3, 3) per facet
            if `separator` is 'cids'.
        xlab : str
            x-axis label. Default is no label.
        ylab : str
            y-axis label. Default is no label.
        label_fontsize : int
            font size of the axis labels. Default is 12.
        tick_fontsize : int
            font size of the tick labels. Default is 12.
        coef_box : str
            two-purpose parameter. Firstly, if the parameter equals None, the
            correlation coefficient and probability statistics will not be included
            in the graphic. Secondly, if the statistics are to be included, pass in
            the desired location on the graph which, in addition, will act as a
            pseudo-boolean parameter. The options are standard, i.e. 'upper left',
            'lower right' and so forth. Default is None, i.e. the statistics are not
            displayed.
        coef_box_size : Tuple[float, float]
            horizontal and vertical scaling factors applied to the statistics table.
            Default is (0.4, 2.5).
        coef_box_font_size : int
            font size of the statistics table. Default is 0, in which case
            Matplotlib's automatic font sizing is switched on.
        prob_est : str
            type of estimator for probability of significant relation.

            - "pool" (default)
                which means that all observation are treated as independent and
                calculates Pearson's correlation coefficient.
            - "map"
                denoting Macrosynergy panel test. This is based on a panel regression
                with period-specific random effects and greatly mitigates the issue
                of pseudo-replication if panel features and targets are correlated
                across time. See also
                https://research.macrosynergy.com/testing-macro-trading-factors/
            - "kendall"
                which calculates the Kendall rank correlation coefficient. It is a
                non-parametric statistic used to measure the strength and direction
                of association between two ranked variables.
        fit_reg : bool
            if True (default) adds a regression line.
        reg_ci : int
            size of the confidence interval for the regression estimate. Default is
            95. Can be None.
        reg_order : int
            order of the regression equation. Default is 1 (linear).
        reg_robust : bool
            if True, outliers are de-weighted, which is computationally expensive.
            Default is False.
        separator : Union[str, int]
            allows categorizing the scatter analysis by cross-section or integer. In
            the former case the argument is set to "cids" and in the latter case the
            argument is set to a year [2010, for instance] which will subsequently
            split the time-period into the sample before (not including) that year
            and from (including) that year.
        title_adj : float
            parameter that sets top of figure to accommodate title. Default is 1.
        single_chart : bool
            boolean parameter determining whether the x- and y- labels are only
            written on a single graph of the Facet Grid (useful if there are numerous
            charts, and the labels are excessively long). The default is False, and
            the names of the axis will be displayed on each grid if not conflicting
            with the label for each variable.
        single_scatter : bool
            only used with `separator="cids"`. If True, all cross-sections are drawn
            in one chart, colour-coded by cross-section, with a single global
            regression line. Default is False, i.e. one facet per cross-section.
        ncol : int
            number of columns in the facet grid. Default is None, in which case the
            number of columns is determined by the number of cross-sections.
        ax : plt.Axes
            Matplotlib Axes object. If None (default), new figure and axes objects
            will be created. If an Axes object is passed, the plot will be drawn on
            the Axes, and plt.show() will not be called. The facet grid
            (`separator="cids"` without `single_scatter`) always creates its own
            figure.
        remove_zero_predictor : bool, default=False
            remove observations where the explanatory variable is zero. Applied
            without a separator and with a year separator.

        Raises
        ------
        ValueError
            if `separator` is not None, 'cids' or an integer, or if
            `coef_box_font_size` is not a non-negative integer.
        TypeError
            if `ax` is not a Matplotlib Axes or `size` is not a tuple of two numbers.
        """
        # Validate first so that an unusable separator fails before anything is
        # drawn, instead of silently producing an empty plot.
        if not (
            separator is None or isinstance(separator, int) or separator == "cids"
        ):
            raise ValueError(
                "Separator must be either a valid year <int> or 'cids' <str>."
            )

        coef_box_loc_error = (
            "The parameter expects a string used to delimit the "
            "location of the box: 'upper left', 'lower right' etc."
        )
        if coef_box is not None:
            assert isinstance(coef_box, str), coef_box_loc_error

        assert prob_est in [
            "pool",
            "map",
            "kendall",
        ], "prob_est must be 'pool', 'kendall' or 'map'"

        if self.cids is not None and len(self.cids) == 1 and prob_est == "map":
            warnings.warn(
                "The 'map' estimator is not applicable to a single cross-section. "
                "Using 'pool' instead.",
                UserWarning,
            )
            prob_est = "pool"

        sns.set_theme(style="whitegrid")
        dfx = self.df.copy()

        if title is None and (self.years is None):
            dates = (
                self.df.index.get_level_values("real_date")
                .to_series()
                .dt.strftime("%Y-%m-%d")
            )
            title = (
                f"{self.xcats[0]} and {self.xcats[1]} "
                f"from {dates.min()} to {dates.max()}"
            )
        elif title is None:
            title = f"{self.xcats[0]} and {self.xcats[1]}"

        if ax is not None and not isinstance(ax, plt.Axes):
            raise TypeError("ax must be a matplotlib Axes object.")
        # Only show the figure if it is created here; a caller who passes an Axes
        # controls when the figure is displayed.
        show_plot = ax is None

        if not (isinstance(coef_box_font_size, int) and coef_box_font_size >= 0):
            raise ValueError("coef_box_font_size must be a non-negative integer.")
        # A font size of 0 requests Matplotlib's automatic table font sizing.
        set_font_size = coef_box_font_size == 0
        if set_font_size:
            coef_box_font_size = 12

        if size is None:
            size = (3, 3) if separator == "cids" else (12, 8)
        elif (
            not isinstance(size, tuple)
            or len(size) != 2
            or not all(isinstance(i, (int, float)) for i in size)
        ):
            raise TypeError("size must be a tuple of ints/floats.")

        if isinstance(separator, int):
            # The scatter plot is split across two time-periods where the divisor
            # is the received year.
            year_error = "Separation by years does not work with year groups."
            assert self.years is None, year_error

            if ax is None:
                fig, ax = plt.subplots(figsize=size)

            if "real_date" not in dfx.index.names:
                raise ValueError("`real_date` expected in index names.")

            if remove_zero_predictor:
                dfx = dfx[dfx.loc[:, self.xcats[0]] != 0]

            rdt_index = list(dfx.index.names).index("real_date")
            index_years = dfx.index.get_level_values(rdt_index).year
            years_in_df = list(index_years.unique())

            assert separator in years_in_df, "Separator year is not in the range."
            error_sep = "Separator year must not be the first in the range."
            assert separator > np.min(years_in_df), error_sep

            label_set1 = f"before {separator}"
            label_set2 = f"from {separator}"
            dfx1 = dfx[index_years < separator]
            dfx2 = dfx[index_years >= separator]

            for sub_df, sub_label in ((dfx1, label_set1), (dfx2, label_set2)):
                sns.regplot(
                    data=sub_df,
                    x=self.xcats[0],
                    y=self.xcats[1],
                    ci=reg_ci,
                    order=reg_order,
                    robust=reg_robust,
                    fit_reg=fit_reg,
                    scatter_kws={"s": 30, "alpha": 0.5},
                    label=sub_label,
                    line_kws={"lw": 1},
                    ax=ax,
                )

            if coef_box is not None:
                self._add_coef_box(
                    df_probability=[dfx1, dfx2],
                    prob_est=prob_est,
                    coef_box=coef_box,
                    coef_box_size=coef_box_size,
                    auto_font_size=set_font_size,
                    font_size=coef_box_font_size,
                    ax=ax,
                )

            ax.legend(loc="upper right")
            ax.set_title(title, fontsize=title_fontsize)
            if xlab is not None:
                ax.set_xlabel(xlab, fontsize=label_fontsize)
            if ylab is not None:
                ax.set_ylabel(ylab, fontsize=label_fontsize)

        elif separator == "cids" and not single_scatter:
            assert isinstance(single_chart, bool)

            dfx_copy = dfx.reset_index().rename(columns={"level_0": "cid"})
            n_cids = len(dfx_copy["cid"].unique())

            error_cids = (
                "There must be more than one cross-section to use "
                "separator = 'cids'."
            )
            assert n_cids > 1, error_cids

            # "Wrap" the column variable at this width, so that the column facets
            # span multiple rows. Used to determine the number of grids on each row.
            dict_coln = {2: 2, 5: 3, 8: 4, 30: 5}
            keys_ar = np.array(list(dict_coln.keys()))
            key = keys_ar[keys_ar <= n_cids][-1]
            if ncol is None:
                ncol = dict_coln[key]
            if ncol > n_cids:
                ncol = n_cids

            # `size` is interpreted per facet: its height in inches and the
            # width/height aspect ratio.
            facet_height = size[1]
            facet_aspect = size[0] / size[1]

            fg = sns.FacetGrid(
                data=dfx_copy,
                col="cid",
                col_wrap=ncol,
                height=facet_height,
                aspect=facet_aspect,
            )
            fg.map(
                sns.regplot,
                self.xcats[0],
                self.xcats[1],
                ci=reg_ci,
                order=reg_order,
                robust=reg_robust,
                fit_reg=fit_reg,
                scatter_kws={"s": 15, "alpha": 0.5, "color": "lightgray"},
                line_kws={"lw": 1},
            )

            if coef_box is not None:
                fg.map_dataframe(self.annotate_facet)

            fg.set_titles(col_template="{col_name}")
            fg.fig.suptitle(title, y=title_adj, fontsize=title_fontsize)

            if not single_chart:
                if xlab is not None:
                    fg.set_xlabels(xlab, clear_inner=True)
                if ylab is not None:
                    fg.set_ylabels(ylab)
            else:
                error = "Label expected for the respective axis."
                assert xlab is not None, error
                assert ylab is not None, error

                number_of_graphs = len(fg.axes)
                no_columns = fg._ncol
                remainder = int(number_of_graphs % no_columns)

                for i in range(number_of_graphs):
                    fg.axes[i].set_xlabel("")
                    fg.axes[i].set_ylabel("")

                # Only one facet carries the axis labels.
                labelled = no_columns - 1 if remainder == 0 else -remainder
                fg.axes[labelled].set_xlabel(xlab)
                fg.axes[labelled].set_ylabel(ylab)

        elif separator == "cids" and single_scatter:
            assert isinstance(single_chart, bool)

            if coef_box == "upper right":
                # Since otherwise this overlaps with cid legend
                coef_box = "upper left"

            dfx_copy = dfx.reset_index()
            cids = dfx_copy["cid"].unique()
            n_cids = len(cids)

            error_cids = (
                "There must be more than one cross-section to use "
                "separator = 'cids'."
            )
            assert n_cids > 1, error_cids

            if ax is None:
                fig, ax = plt.subplots(figsize=size)

            # Perform a single global regression; the scatter points are drawn
            # separately below so that they can be coloured by cross-section.
            sns.regplot(
                data=dfx_copy,
                x=self.xcats[0],
                y=self.xcats[1],
                ci=reg_ci,
                order=reg_order,
                robust=reg_robust,
                fit_reg=fit_reg,
                scatter=False,
                line_kws={"lw": 1, "color": "black"},
                ax=ax,
            )

            for cid in cids:
                dfx_i = dfx_copy[dfx_copy["cid"] == cid]
                ax.scatter(
                    dfx_i[self.xcats[0]],
                    dfx_i[self.xcats[1]],
                    label=f"{cid}",
                    s=30,
                    alpha=0.5,
                )

            if coef_box is not None:
                self._add_coef_box(
                    df_probability=dfx_copy,
                    prob_est=prob_est,
                    coef_box=coef_box,
                    coef_box_size=coef_box_size,
                    auto_font_size=set_font_size,
                    font_size=coef_box_font_size,
                    ax=ax,
                )

            ax.legend(loc="upper right", title="Cids")
            ax.set_title(title, fontsize=title_fontsize)
            if xlab is not None:
                ax.set_xlabel(xlab, fontsize=label_fontsize)
            if ylab is not None:
                ax.set_ylabel(ylab, fontsize=label_fontsize)

        else:
            # separator is None: one pooled scatter plot.
            if ax is None:
                fig, ax = plt.subplots(figsize=size)

            if remove_zero_predictor:
                dfx = dfx[dfx.loc[:, self.xcats[0]] != 0]

            sns.regplot(
                data=dfx,
                x=self.xcats[0],
                y=self.xcats[1],
                ci=reg_ci,
                order=reg_order,
                robust=reg_robust,
                fit_reg=fit_reg,
                scatter_kws={"s": 30, "alpha": 0.5, "color": "lightgray"},
                line_kws={"lw": 1},
                ax=ax,
            )

            if coef_box is not None:
                self._add_coef_box(
                    df_probability=dfx,
                    prob_est=prob_est,
                    coef_box=coef_box,
                    coef_box_size=coef_box_size,
                    auto_font_size=set_font_size,
                    font_size=coef_box_font_size,
                    ax=ax,
                )

            if labels:
                error_freq = "Labels only available for monthly or lower frequencies."
                assert self.freq in ["A", "Q", "M"], error_freq

                df_labs = dfx.dropna().index.to_frame(index=False)
                if "cid" not in df_labs.columns:
                    df_labs = df_labs.rename(columns={0: "cid"})

                if self.years is not None:
                    ser_labs = (
                        df_labs["cid"].astype("object") + " " + df_labs["real_date"]
                    )
                else:
                    ser_labs = df_labs["cid"].astype("object") + " "
                    ser_labs += df_labs["real_date"].dt.year.astype("string")
                    if self.freq == "Q":
                        ser_labs += "Q" + df_labs["real_date"].dt.quarter.astype(
                            "string"
                        )
                    elif self.freq == "M":
                        ser_labs += "-" + df_labs["real_date"].dt.month.astype("string")

                # Iterate positionally over plain arrays: indexing a multi-indexed
                # Series with an integer relies on a deprecated pandas fallback.
                x_values = dfx[self.xcats[0]].to_numpy()
                y_values = dfx[self.xcats[1]].to_numpy()
                for x_i, y_i, lab in zip(x_values, y_values, ser_labs):
                    ax.text(
                        x=x_i,
                        y=y_i,
                        s=lab,
                        fontdict=dict(color="black", size=8),
                    )

            ax.set_title(title, fontsize=title_fontsize)
            if xlab is not None:
                ax.set_xlabel(xlab, fontsize=label_fontsize)
            if ylab is not None:
                ax.set_ylabel(ylab, fontsize=label_fontsize)

        if isinstance(ax, plt.Axes):
            ax.tick_params(axis="both", labelsize=tick_fontsize)

        plt.tight_layout()
        if show_plot:
            plt.show()

    def ols_table(self, type="pool"):
        """
        Print statsmodels regression summaries.

        The first column of the stored panel is regressed (with a constant) on the
        second column.

        Parameters
        ----------
        type : str
            type of linear regression summary to print. Default is 'pool'.
            Alternative is 're' for period-specific random effects, estimated as a
            mixed linear model grouped by `real_date`.
        """
        assert type in ["pool", "re"], "Type must be either 'pool' or 're'."

        clean = self.df.dropna()
        x, y = clean.iloc[:, 0], clean.iloc[:, 1]
        x_fit = sm.add_constant(x)

        if type == "pool":
            fit_results = sm.OLS(y, x_fit).fit()
        else:
            # Groups are taken from the same cleaned frame as x and y so that all
            # three always have the same length.
            groups = clean.reset_index().real_date
            fit_results = sm.MixedLM(y, x_fit, groups).fit(reml=False)

        print(fit_results.summary())
if __name__ == "__main__":
    # Demonstration: simulate a small panel and draw the available chart types.
    cids = ["AUD", "CAD", "GBP", "NZD", "USD"]
    xcats = ["XR", "CRY", "GROWTH", "INFL"]

    # Simulation settings per cross-section; "BRL" is not part of `cids` and is
    # therefore appended as an additional row.
    cid_settings = {
        "AUD": ["2000-01-01", "2020-12-31", 0.1, 1],
        "CAD": ["2001-01-01", "2020-11-30", 0, 1],
        "BRL": ["2001-01-01", "2020-11-30", -0.1, 2],
        "GBP": ["2002-01-01", "2020-11-30", 0, 2],
        "NZD": ["2002-01-01", "2020-09-30", -0.1, 2],
        "USD": ["2003-01-01", "2020-12-31", -0.1, 2],
    }
    df_cids = pd.DataFrame(
        index=cids, columns=["earliest", "latest", "mean_add", "sd_mult"]
    )
    for cid_name, cid_row in cid_settings.items():
        df_cids.loc[cid_name] = cid_row

    # Simulation settings per category.
    xcat_settings = {
        "XR": ["2000-01-01", "2020-12-31", 0.1, 1, 0, 0.3],
        "CRY": ["2000-01-01", "2020-10-30", 1, 2, 0.95, 1],
        "GROWTH": ["2001-01-01", "2020-10-30", 1, 2, 0.9, 1],
        "INFL": ["2001-01-01", "2020-10-30", 1, 2, 0.8, 0.5],
    }
    cols = ["earliest", "latest", "mean_add", "sd_mult", "ar_coef", "back_coef"]
    df_xcats = pd.DataFrame(index=xcats, columns=cols)
    for xcat_name, xcat_row in xcat_settings.items():
        df_xcats.loc[xcat_name] = xcat_row

    dfd = make_qdf(df_cids, df_xcats, back_ar=0.75)
    dfd["grading"] = np.ones(dfd.shape[0])
    black = {"AUD": ["2000-01-01", "2003-12-31"], "GBP": ["2018-01-01", "2100-01-01"]}

    # Remove all AUD GROWTH and all NZD INFL observations.
    is_aud_growth = (dfd["xcat"] == "GROWTH") & (dfd["cid"] == "AUD")
    is_nzd_infl = (dfd["xcat"] == "INFL") & (dfd["cid"] == "NZD")
    dfdx = dfd[~(is_aud_growth | is_nzd_infl)].copy()

    dfdx["ERA"] = "before 2007"
    dfdx.loc[dfdx["real_date"].dt.year > 2007, "ERA"] = "from 2010"

    def modify_cry_values(group):
        """Zero all but every 20th value of a "CRY" group; return other groups as is."""
        if group.name[1] == "CRY":
            every_20th = np.arange(len(group)) % 20 == 0
            group.loc[~every_20th, "value"] = 0
        return group

    dfdx = dfdx.groupby(["cid", "xcat"], group_keys=False).apply(modify_cry_values)

    cidx = ["AUD", "CAD", "GBP", "USD", "PRY"]

    # Arguments shared by all CategoryRelations instances below.
    shared_args = dict(
        xcats=["CRY", "XR"],
        cids=cidx,
        xcat_aggs=["mean", "sum"],
        start="2001-01-01",
        blacklist=black,
    )
    # Arguments shared by the pooled "Carry and Return" charts.
    carry_labels = dict(
        labels=False,
        xlab="Carry",
        ylab="Return",
        coef_box="lower left",
    )

    # Daily frequency, zero predictors removed.
    cr = CategoryRelations(dfdx, freq="D", lag=1, years=None, **shared_args)
    cr.reg_scatter(
        separator=None,
        title="Carry and Return",
        prob_est="map",
        remove_zero_predictor=True,
        title_fontsize=14,
        **carry_labels,
    )

    # years parameter
    cr = CategoryRelations(dfdx, freq="M", years=5, lag=0, **shared_args)
    cr.reg_scatter(
        separator=None,
        title="Carry and Return, 5-year periods",
        prob_est="map",
        **carry_labels,
    )

    # Monthly frequency; pass xcat1_chg="diff" to analyse changes instead.
    cr = CategoryRelations(dfdx, freq="M", lag=1, years=None, **shared_args)

    # Split by year.
    cr.reg_scatter(
        separator=2010,
        title="Carry and Return",
        ncol=5,
        remove_zero_predictor=True,
        **carry_labels,
    )

    # One facet per cross-section.
    cr.reg_scatter(
        labels=False,
        separator="cids",
        title="Composite macro trend pressure indicator and subsequent IRS fixed receiver returns for USD and EUR, since 2000",
        xlab="Composite macro trend pressure indicator",
        ylab="Next month's return on 2-year IRS return, vol-targeted position, %",
        coef_box="lower left",
        ncol=2,
    )

    # Passing Axes object for a subplot
    fig, ax = plt.subplots(1, 2, figsize=(12, 8))
    for sub_ax in ax:
        cr.reg_scatter(
            separator=None,
            title="Carry and Return",
            prob_est="kendall",
            ax=sub_ax,
            **carry_labels,
        )
    plt.show()

    for table_type in ("pool", "re"):
        cr.ols_table(type=table_type)