# Source code for macrosynergy.panel.adjust_weights

"""
Implementation of adjust_weights.
"""

import numpy as np
import pandas as pd
from typing import List, Tuple, Callable, Dict, Any, Optional
import warnings
from numbers import Number
from macrosynergy.management.utils import reduce_df, get_cid
from macrosynergy.management.simulate import make_test_df
from macrosynergy.management.types import QuantamentalDataFrame
from macrosynergy.compat import PD_NEW_MAP

# Names accepted for the `method` argument; `check_types` rejects any other value.
AVAILABLE_METHODS: List[str] = ["generic", "lincomb"]


def check_missing_cids_xcats(weights, adj_zns, cids, r_xcats, r_cids):
    """
    Verify that the requested categories and cross-sections are available.

    Parameters
    ----------
    weights
        Name of the requested weights category.
    adj_zns
        Name of the requested adjustment category.
    cids
        Requested cross-sections.
    r_xcats
        Categories that are available.
    r_cids
        Cross-sections that are available.

    Raises
    ------
    ValueError
        If `weights` or `adj_zns` is not contained in `r_xcats`, or if any
        entry of `cids` is not contained in `r_cids`. Categories are checked
        first.
    """
    absent_xcats = list({weights, adj_zns} - set(r_xcats))
    if absent_xcats:
        raise ValueError(f"Missing xcats: {absent_xcats}")

    absent_cids = list(set(cids) - set(r_cids))
    if absent_cids:
        raise ValueError(f"Missing cids: {absent_cids}")
def check_types(
    weights: str,
    adj_zns: str,
    method: str,
    adj_func: Callable,
    params: Dict[str, Any],
    cids: List[str],
    start: Optional[str] = None,
    end: Optional[str] = None,
):
    """
    Type checking for the input variables of adjust_weights.

    Parameters
    ----------
    weights : str
        Name of the weights category.
    adj_zns : str
        Name of the adjustment category.
    method : str
        Name of the adjustment method; must be one of `AVAILABLE_METHODS`.
    adj_func : Callable
        Adjustment function, or None. Required when `method` is "generic".
    params : Dict[str, Any]
        Parameter dictionary.
    cids : List[str]
        None, or a non-empty list of strings.
    start : str, optional
        Start date, or None.
    end : str, optional
        End date, or None.

    Raises
    ------
    TypeError
        If any argument has the wrong type, or if `cids` is an empty list or
        contains non-string entries.
    ValueError
        If `method` is not an available method, or if `method` is "generic"
        and `adj_func` is None.
    """
    none_type = type(None)
    expected_types = [
        (weights, "weights", str),
        (adj_zns, "adj_zns", str),
        (method, "method", str),
        (adj_func, "adj_func", (Callable, none_type)),
        # Label matches the real argument name so the error points at `params`.
        (params, "params", dict),
        (cids, "cids", (list, none_type)),
        (start, "start", (str, none_type)),
        (end, "end", (str, none_type)),
    ]
    for _var, _name, _type in expected_types:
        if not isinstance(_var, _type):
            raise TypeError(f"{_name} must be a {_type}, not {type(_var)}")

    if cids is not None and (
        len(cids) == 0 or not all(isinstance(cid, str) for cid in cids)
    ):
        raise TypeError("`cids` must be a None(default) or a non-empty list of strings")

    if method not in AVAILABLE_METHODS:
        raise ValueError(
            f"Method {method} not available. Available methods: {AVAILABLE_METHODS}"
        )

    if method == "generic" and adj_func is None:
        raise ValueError("`adj_func` must be provided when method='generic'")
def lincomb_backend(
    df_adj_zns_wide: pd.DataFrame,
    df_weights_wide: pd.DataFrame,
    coeff_new: float,
    min_score: Optional[float] = None,
) -> pd.DataFrame:
    """
    Blend existing weights with score-implied weights via a linear combination.

    Parameters
    ----------
    df_adj_zns_wide : pd.DataFrame
        Adjustment scores in wide format.
    df_weights_wide : pd.DataFrame
        Existing weights in wide format, with the same set of columns and the
        same set of index values as `df_adj_zns_wide`.
    coeff_new : float
        Share (between 0 and 1) given to the score-implied weights. 1 uses
        only the new weights, 0 uses only the old weights.
    min_score : float, optional
        Floor subtracted from the scores before they are turned into weights.
        When None, a warning is issued and the smallest value found in
        `df_adj_zns_wide` is used.

    Returns
    -------
    pd.DataFrame
        Blended weights, each row divided by its own row sum.

    Raises
    ------
    ValueError
        If `coeff_new` is not a number between 0 and 1 (inclusive).
    """
    assert set(df_weights_wide.columns) == set(df_adj_zns_wide.columns)
    assert set(df_weights_wide.index) == set(df_adj_zns_wide.index)

    if min_score is None:
        warnings.warn(
            "`min_score` not provided. Defaulting to minimum value from `df_adj_zns_wide`."
        )
        min_score = df_adj_zns_wide.min().min()

    coeff_is_valid = isinstance(coeff_new, Number) and 0 <= coeff_new <= 1
    if not coeff_is_valid:
        raise ValueError(
            "Parameter `coeff_new` must be provided as a floating point number between 0 and 1."
        )

    # With `i` the cross-section and `t` the date:
    #   basis[i, t]   = max(score[i, t] - min_score, 0)
    #   new_w[i, t]   = basis[i, t] / sum_i(basis[i, t])
    #   blended[i, t] = (1 - coeff_new) * old_w[i, t] + coeff_new * new_w[i, t]
    #   output[i, t]  = blended[i, t] / sum_i(blended[i, t])
    shifted_scores = df_adj_zns_wide - min_score
    basis = shifted_scores.mask(shifted_scores < 0, 0)
    score_weights = basis.div(basis.sum(axis="columns"), axis="index")

    blended = (1 - coeff_new) * df_weights_wide + coeff_new * score_weights
    return blended.div(blended.sum(axis="columns"), axis="index")
def generic_weights_backend(
    df_weights_wide: pd.DataFrame,
    df_adj_zns_wide: pd.DataFrame,
    adj_func: Callable,
    params: Dict[str, Any] = {},
) -> pd.DataFrame:
    """
    Backend for the "generic" method of adjust_weights.

    `adj_func` is applied element-wise to the adjustment scores (with
    `params` forwarded as keyword arguments) and the transformed scores are
    multiplied by the weights. Both inputs are expected in wide format with
    the same set of columns and the same set of index values.

    Parameters
    ----------
    df_weights_wide : pd.DataFrame
        DataFrame with weights in wide format.
    df_adj_zns_wide : pd.DataFrame
        DataFrame with adjustment scores in wide format.
    adj_func : Callable
        Function applied to every adjustment score.
    params : Dict[str, Any], optional
        Keyword arguments forwarded to `adj_func`. Default is {}.

    Returns
    -------
    pd.DataFrame
        DataFrame with the adjusted (not normalized) weights.
    """
    assert set(df_weights_wide.columns) == set(df_adj_zns_wide.columns)
    assert set(df_weights_wide.index) == set(df_adj_zns_wide.index)

    # PD_NEW_MAP selects between `DataFrame.map` and `DataFrame.applymap`.
    elementwise = df_adj_zns_wide.map if PD_NEW_MAP else df_adj_zns_wide.applymap
    return df_weights_wide * elementwise(adj_func, **params)
def split_weights_adj_zns(
    df: QuantamentalDataFrame, weights: str, adj_zns: str
) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """
    Separate the weights and the adjustment scores into two wide DataFrames.

    Parameters
    ----------
    df : QuantamentalDataFrame
        DataFrame containing the weights and adjustment factors.
    weights : str
        Name of the xcat to be used as weights.
    adj_zns : str
        Name of the z-n score xcat to be used as adjustment factors.

    Returns
    -------
    Tuple[pd.DataFrame, pd.DataFrame]
        The weights frame and the adjustment frame (in that order), both
        reindexed to the union of their dates and with one column per cid.

    Raises
    ------
    ValueError
        If any weight is negative, or if a cross-section is present for one
        category but not for the other.
    """

    def _blank_rows(frame: pd.DataFrame) -> pd.Series:
        # A row is "blank" when it is entirely NaN or holds nothing but
        # zeros once NaNs are treated as zero.
        entirely_nan = frame.isna().all(axis="columns")
        entirely_zero = (frame.fillna(0) == 0).all(axis="columns")
        return entirely_nan | entirely_zero

    wide_weights = QuantamentalDataFrame(df.loc[df["xcat"] == weights]).to_wide()
    wide_zns = QuantamentalDataFrame(df.loc[df["xcat"] == adj_zns]).to_wide()

    # Negative weights cannot be tolerated; report the offending entries.
    negative_cells = wide_weights[~wide_weights.isna()].lt(0)
    if any(negative_cells.any()):
        na_frame = QuantamentalDataFrame.from_wide(
            wide_weights[negative_cells.any(axis="columns")]
        )
        na_frame = na_frame[na_frame["value"] < 0]
        raise ValueError(
            f"Negative weights found in the dataframe. Please check the following data:\n{na_frame}"
        )

    # Put both frames on the same dates and label columns by cid only.
    all_dates = wide_weights.index.union(wide_zns.index)
    wide_weights = wide_weights.reindex(all_dates)
    wide_zns = wide_zns.reindex(all_dates)
    wide_weights.columns = get_cid(wide_weights.columns)
    wide_zns.columns = get_cid(wide_zns.columns)

    zns_without_weights = set(wide_zns.columns) - set(wide_weights.columns)
    weights_without_zns = set(wide_weights.columns) - set(wide_zns.columns)
    all_missing = [f"{c}_{adj_zns}" for c in zns_without_weights] + [
        f"{c}_{weights}" for c in weights_without_zns
    ]
    if all_missing:
        raise ValueError(f"Missing tickers: {all_missing}")

    blank_zns_dates = wide_zns.index[_blank_rows(wide_zns)]
    blank_weights_dates = wide_weights.index[_blank_rows(wide_weights)]

    # Dates with usable weights but no usable scores get a score of 1.
    missing_zns_dates = sorted(set(blank_zns_dates) - set(blank_weights_dates))
    if len(missing_zns_dates) > 0:
        estr = "Missing ZNs data (will be filled with 1 to preserve weights):"
        warnings.warn(f"{estr} {missing_zns_dates}")
        wide_zns.loc[missing_zns_dates] = 1

    return wide_weights, wide_zns
def normalize_weights(
    out_weights: pd.DataFrame, normalize_to_pct: bool = False
) -> pd.DataFrame:
    """
    Normalize output weights by dividing each row by the sum of the row.

    Function exists to allow easy modification of normalization method.

    Parameters
    ----------
    out_weights : pd.DataFrame
        DataFrame with weights in wide format. (one column per cid)
    normalize_to_pct : bool, optional
        If True, the normalized weights are multiplied by 100. Default is
        False.

    Returns
    -------
    pd.DataFrame
        DataFrame with normalized weights. Every row either sums to 1 (100
        when `normalize_to_pct` is True) or consists entirely of NaN.

    Raises
    ------
    Exception
        If, after the division, any row neither sums to 1 nor is entirely
        NaN (for example a row whose entries are non-zero but sum to zero).
    """
    out_weights = out_weights.div(out_weights.sum(axis="columns"), axis="index")

    row_sums = out_weights.sum(axis="columns").to_numpy(dtype=float)
    sums_to_one = np.isclose(row_sums, 1)
    all_nan = out_weights.isnull().all(axis="columns").to_numpy(dtype=bool)

    # The check is applied per row: an all-NaN row is tolerated, but it must
    # not hide another row whose normalization failed.
    if not (sums_to_one | all_nan).all():
        raise Exception("Normalization failed; weights do not sum to 1")

    if normalize_to_pct:
        out_weights = out_weights * 100

    return out_weights
def adjust_weights(
    df: QuantamentalDataFrame,
    weights_xcat: str,
    adj_zns_xcat: str,
    method: str = "generic",
    adj_func: Callable = None,
    params: Dict[str, Any] = {},
    cids: List[str] = None,
    start: Optional[str] = None,
    end: Optional[str] = None,
    blacklist: Dict[str, Any] = None,
    normalize: bool = True,
    normalize_to_pct: bool = False,
    adj_name: str = "ADJWGT",
):
    """
    Adjust the weights in one category using the scores in another category.

    Parameters
    ----------
    df : QuantamentalDataFrame
        QuantamentalDataFrame with weights and adjustment categories for all
        cross-sections.
    weights_xcat : str
        Name of the category containing the weights.
    adj_zns_xcat : str
        Name of the category containing the adjustment factors.
    method : str
        One of the available methods for adjusting weights. Default is
        "generic". See notes for available methods.
    adj_func : Callable, optional
        Function used for the adjustment when method is "generic". Default is
        None.
    params : Dict[str, Any], optional
        Parameters for the chosen method. Default is {}.
    cids : List[str], optional
        List of cross-sections to adjust. If None, all cross-sections that
        remain after reducing the DataFrame are used. Default is None.
    start : str, optional
        Start date for the adjustment as YYYY-MM-DD. Default is None.
    end : str, optional
        End date for the adjustment as YYYY-MM-DD. Default is None.
    blacklist : Dict[str, Any], optional
        Blacklist dictionary passed to the reduce_df function. Default is None.
        See :meth:`macrosynergy.management.utils.df_utils.reduce_df` for more
        details.
    normalize : bool, optional
        If True, the resulting weights are normalized to sum to one for each
        date across the cross-sections. Default is True.
    normalize_to_pct : bool, optional
        If True, the normalized weights are scaled to 100%. Default is False.
        This only applies if `normalize` is True.
    adj_name : str, optional
        Name of the resulting xcat. Default is "ADJWGT".

    Returns
    -------
    QuantamentalDataFrame
        DataFrame with the adjusted weights; rows with missing values are
        dropped.

    Raises
    ------
    TypeError
        If `df` is not a QuantamentalDataFrame (further type checks are
        delegated to `check_types`).
    ValueError
        If the adjusted weights contain no data at all.

    Notes
    -----
    Available methods:

    - "generic": `adj_func` is applied element-wise to the adjustment scores,
      with `params` forwarded as keyword arguments, and the result is
      multiplied by the weights. See
      macrosynergy.panel.adjust_weights.generic_weights_backend.
    - "lincomb": Linear combination of the existing weights and weights
      implied by the adjustment scores. The entries `coeff_new` (coefficient
      for the new weights) and `min_score` (minimum score for the adjustment
      factors) are read from the `params` dictionary. See
      macrosynergy.panel.adjust_weights.lincomb_backend for more details.

    Examples
    --------
    >>> df = make_test_df(xcats=["weights", "adj_zns"], cids=["cid1", "cid2", "cid3"])
    """
    if not isinstance(df, QuantamentalDataFrame):
        raise TypeError("df must be a QuantamentalDataFrame")

    df = QuantamentalDataFrame(df)
    # Remember the input flavour so the output can be built the same way.
    result_as_categorical: bool = df.InitializedAsCategorical

    check_types(
        weights=weights_xcat,
        adj_zns=adj_zns_xcat,
        method=method,
        adj_func=adj_func,
        params=params,
        cids=cids,
        start=start,
        end=end,
    )

    df, found_xcats, found_cids = reduce_df(
        df,
        cids=cids,
        xcats=[weights_xcat, adj_zns_xcat],
        start=start,
        end=end,
        blacklist=blacklist,
        intersect=True,
        out_all=True,
    )
    if cids is None:
        cids = df["cid"].unique().tolist()

    check_missing_cids_xcats(
        weights_xcat, adj_zns_xcat, cids, found_xcats, found_cids
    )

    weights_wide, zns_wide = split_weights_adj_zns(df, weights_xcat, adj_zns_xcat)

    # No need to normalize the weights before applying the adjustment.
    if method == "generic":
        adjusted = generic_weights_backend(
            df_weights_wide=weights_wide,
            df_adj_zns_wide=zns_wide,
            adj_func=adj_func,
            params=params,
        )
    elif method == "lincomb":
        adjusted = lincomb_backend(
            df_adj_zns_wide=zns_wide,
            df_weights_wide=weights_wide,
            coeff_new=params.get("coeff_new"),
            min_score=params.get("min_score"),
        )
    else:
        # this condition is covered in a check above
        raise ValueError(f"Method {method} not available.")  # pragma: no cover

    empty_dates = adjusted.index[adjusted.isnull().all(axis="columns")]
    if empty_dates.size > 0:
        err = "The following dates have no data after applying the adjustment, and will be dropped:"
        warnings.warn(f"{err} {empty_dates}")
        adjusted = adjusted.dropna(how="all", axis="rows")

    if normalize:
        adjusted = normalize_weights(adjusted, normalize_to_pct)

    if adjusted.isna().all().all():
        raise ValueError(
            "The resulting DataFrame is empty. Please check the input data,"
            " the method function, and it's parameters."
        )

    # Columns hold cids; appending the xcat name turns them into tickers.
    adjusted.columns = adjusted.columns + f"_{adj_name}"
    qdf = QuantamentalDataFrame.from_wide(adjusted, categorical=result_as_categorical)
    return qdf.dropna(how="any", axis=0).reset_index(drop=True)
if __name__ == "__main__":
    # Demo: build a test panel with the two relevant categories plus some
    # unrelated tickers, then run both adjustment methods.
    df = make_test_df(xcats=["weights", "adj_zns"], cids=["cid1", "cid2", "cid3"])
    dfb = make_test_df(xcats=["some_xcat", "other_xcat"], cids=["cid1", "cid2", "cid4"])

    # Optional perturbations for manual experiments:
    # nan_mask = np.random.rand(len(df)) < 0.01
    # df.loc[nan_mask, "value"] = np.nan
    # nan_mask = np.random.rand(len(df)) < 0.1
    # df.loc[nan_mask, "value"] *= -1

    df = pd.concat([df, dfb], axis=0)

    # Using the lincomb method
    lincomb_params = {"min_score": None, "coeff_new": 0.5}
    df_res = adjust_weights(
        df=df,
        weights_xcat="weights",
        adj_zns_xcat="adj_zns",
        method="lincomb",
        params=lincomb_params,
    )
    daily_totals = df_res.groupby("real_date")["value"].sum()
    assert np.allclose(daily_totals, 1)

    # Using the generic method
    def sigmoid(x, amplitude=1.0, steepness=1.0, midpoint=0.0):
        """Sigmoid function with parameters for amplitude, steepness, and midpoint."""
        return amplitude / (1 + np.exp(-steepness * (x - midpoint)))

    params = {"amplitude": 1, "steepness": 4, "midpoint": 1}
    df_res = adjust_weights(
        df=df,
        weights_xcat="weights",
        adj_zns_xcat="adj_zns",
        method="generic",
        adj_func=sigmoid,
        params=params,
    )
    daily_totals = df_res.groupby("real_date")["value"].sum()
    assert np.allclose(daily_totals, 1)

    print(df_res)