Source code for macrosynergy.management.utils.df_utils

"""
Utility functions for working with DataFrames.
"""

from macrosynergy.management.types import QuantamentalDataFrame
from macrosynergy.management.constants import FREQUENCY_MAP, FFILL_LIMITS, DAYS_PER_FREQ

import logging
import warnings
from typing import Iterable, List, Optional, Union, Dict
import re
from numbers import Number

import numpy as np
import pandas as pd
import datetime
import macrosynergy.management.constants as ms_constants
from macrosynergy.management.utils.core import (
    get_cid,
    get_xcat,
    _map_to_business_day_frequency,
    is_valid_iso_date,
)
from macrosynergy.compat import RESAMPLE_NUMERIC_ONLY, PD_OLD_RESAMPLE
import functools

# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)

# Column order used when sorting the rows of a DataFrame in
# `standardise_dataframe` and `update_df`.
IDX_COLS_SORT_ORDER = ["cid", "xcat", "real_date"]


def is_categorical_qdf(df: pd.DataFrame) -> bool:
    """
    Check whether both the `cid` and `xcat` columns of a QuantamentalDataFrame
    use the pandas "category" dtype.

    Parameters
    ----------
    df : pd.DataFrame
        The DataFrame to be checked.

    Raises
    ------
    TypeError
        If `df` is not a QuantamentalDataFrame.

    Returns
    -------
    bool
        True if both `cid` and `xcat` are categorical, False otherwise.
    """
    if not isinstance(df, QuantamentalDataFrame):
        raise TypeError("Argument `df` must be a QuantamentalDataFrame.")

    dtype_names = [df[key_col].dtype.name for key_col in ("cid", "xcat")]
    return set(dtype_names) == {"category"}
def standardise_dataframe(df: pd.DataFrame) -> QuantamentalDataFrame:
    """
    Applies the standard JPMaQS Quantamental DataFrame format to a DataFrame.

    The index columns are coerced to their standard dtypes, duplicate
    (cid, xcat, real_date) rows are dropped (keeping the last), rows are sorted
    by cid, xcat and real_date, and columns are ordered as index columns,
    JPMaQS metrics, then any remaining columns alphabetically. The input
    DataFrame is not modified.

    Parameters
    ----------
    df : pd.DataFrame
        The DataFrame to be standardized. If the index columns are not found
        among the columns, the index is reset and the check is repeated.

    Raises
    ------
    TypeError
        If the input is not a pandas DataFrame.
    ValueError
        If the input DataFrame is not in the correct format.

    Returns
    -------
    pd.DataFrame
        The standardized DataFrame.
    """
    if not isinstance(df, pd.DataFrame):
        raise TypeError("Argument `df` must be a pandas DataFrame.")

    idx_cols: List[str] = QuantamentalDataFrame.IndexCols
    metric_columns: List[str] = ms_constants.JPMAQS_METRICS

    # Defined up-front because more than one validation step below raises it.
    fail_str: str = (
        f"Error : Tried to standardize DataFrame but failed."
        f"DataFrame not in the correct format. Please ensure "
        f"that the DataFrame has the following columns: "
        f"'cid', 'xcat', 'real_date', along with any other "
        "variables you wish to include (e.g. 'value', 'mop_lag', "
        "'eop_lag', 'grading')."
    )

    # Check if the input DF contains the standard columns; if not, they may be
    # stored in the index, so retry after resetting it.
    if not set(df.columns).issuperset(set(idx_cols)):
        try:
            dft: pd.DataFrame = df.reset_index()
        except Exception as exc:
            raise ValueError(fail_str) from exc
        found_cols: list = dft.columns.tolist()
        fail_str += f"\nFound columns: {found_cols}"
        if not set(dft.columns).issuperset(set(idx_cols)):
            raise ValueError(fail_str)
        df = dft.copy()

    # check if there is at least one more column
    if len(df.columns) < 4:
        raise ValueError(fail_str)

    if isinstance(df, QuantamentalDataFrame) and type(df) is QuantamentalDataFrame:
        return QuantamentalDataFrame(df)

    # Convert date and ensure specific columns are strings in one step.
    # 'datetime64[ns]' is the default dtype for datetime columns in pandas.
    # `assign` returns a new frame, so the caller's DataFrame is left untouched.
    df = df.assign(
        real_date=pd.to_datetime(
            df["real_date"],
            format="%Y-%m-%d",
        ).astype("datetime64[ns]"),
        cid=df["cid"].astype("object"),
        xcat=df["xcat"].astype("object"),
    )

    # sort by cid, xcat and real_date to allow viewing stacked timeseries easily
    df = (
        df.drop_duplicates(subset=idx_cols, keep="last")
        .sort_values(by=IDX_COLS_SORT_ORDER)
        .reset_index(drop=True)
    )

    # Sort the 'remaining' columns
    ## No more row-reordering or shape changes after this point
    jpmaqs_metrics = [mtr for mtr in metric_columns if mtr in df.columns]
    non_jpmaqs_metrics = sorted(
        (set(df.columns) - set(jpmaqs_metrics)) - set(idx_cols)
    )
    col_order = idx_cols + jpmaqs_metrics + non_jpmaqs_metrics
    # Explicit copy so the column assignments below do not act on a slice.
    df = df[col_order].copy()

    # for every remaining col, try to convert to float
    for col in jpmaqs_metrics + non_jpmaqs_metrics:
        try:
            df[col] = df[col].astype(float)
        except (ValueError, TypeError):
            # Best effort: columns that cannot be cast keep their dtype.
            continue

    assert isinstance(df, QuantamentalDataFrame), "Failed to standardize DataFrame"
    return df
def drop_nan_series(
    df: pd.DataFrame, column: str = "value", raise_warning: bool = False
) -> QuantamentalDataFrame:
    """
    Drops any series that are entirely NaNs. Raises a user warning if any series
    are dropped and the raise warning flag is set to true.

    Parameters
    ----------
    df : pd.DataFrame
        The dataframe to be cleaned.
    column : str
        The column to be used as the value column, defaults to "value".
    raise_warning : bool
        Whether to raise a warning if any series are dropped.

    Raises
    ------
    TypeError
        If `df` is not a Quantamental DataFrame, or `raise_warning` is not a
        boolean.
    ValueError
        If `column` is not present in the DataFrame.

    Returns
    -------
    pd.DataFrame | QuantamentalDataFrame
        The cleaned DataFrame. If `column` contains no NaNs at all, the input
        is returned unchanged; otherwise the result has a fresh integer index.
    """
    if not isinstance(df, QuantamentalDataFrame):
        raise TypeError("Argument `df` must be a Quantamental DataFrame.")

    if type(df) is QuantamentalDataFrame:
        return df.drop_nan_series(column=column, raise_warning=raise_warning)

    if column not in df.columns:
        raise ValueError(f"Column {column} not present in DataFrame.")

    # Validate before the early return so a bad flag is always reported.
    if not isinstance(raise_warning, bool):
        raise TypeError("Error: The raise_warning argument must be a boolean.")

    if not df[column].isna().any():
        return df

    # One grouped pass: for each (cid, xcat), is there at least one non-NaN?
    has_data: pd.Series = (
        df[column].notna().groupby([df["cid"], df["xcat"]], observed=True).any()
    )
    empty_series = has_data.index[~has_data.to_numpy()]

    if raise_warning:
        for cd, xc in empty_series:
            warnings.warn(
                message=f"The series {cd}_{xc} is populated "
                "with NaNs only, and will be dropped.",
                category=UserWarning,
            )

    # Keep every row whose (cid, xcat) pair is not an all-NaN series.
    row_keys = pd.MultiIndex.from_arrays([df["cid"], df["xcat"]])
    return df[~row_keys.isin(empty_series)].reset_index(drop=True)
def qdf_to_ticker_df(df: pd.DataFrame, value_column: str = "value") -> pd.DataFrame:
    """
    Converts a standardized JPMaQS DataFrame to a wide format DataFrame with each
    column representing a ticker.

    Parameters
    ----------
    df : pd.DataFrame
        A standardised quantamental dataframe.
    value_column : str
        The column to be used as the value column, defaults to "value". If the
        specified column is not present in the DataFrame, a column named "value"
        will be used. If there is no column named "value", the first non-index
        column in the DataFrame will be used instead. A warning is issued
        whenever such a fallback happens.

    Raises
    ------
    TypeError
        If `df` is not a QuantamentalDataFrame or `value_column` is not a string.
    ValueError
        If a fallback is needed but the DataFrame has no non-index column.

    Returns
    -------
    pd.DataFrame
        The converted DataFrame, indexed by `real_date` with one column per
        ticker (`<cid>_<xcat>`).
    """
    if not isinstance(df, QuantamentalDataFrame):
        raise TypeError("Argument `df` must be a QuantamentalDataFrame.")

    if type(df) is QuantamentalDataFrame:
        return df.to_wide(value_column=value_column)

    if not isinstance(value_column, str):
        raise TypeError("Argument `value_column` must be a string.")

    if value_column not in df.columns:
        # Preserve the DataFrame's own column order so the fallback is
        # deterministic ("first column"), unlike iterating over a set.
        candidates: List[str] = [
            col for col in df.columns if col not in QuantamentalDataFrame.IndexCols
        ]
        if not candidates:
            raise ValueError("The DataFrame does not contain any value column.")

        fallback: str = "value" if "value" in candidates else candidates[0]
        warnings.warn(
            f"Value column specified in `value_column` ({value_column}) "
            f"is not present in the DataFrame. Defaulting to {fallback}."
        )
        value_column = fallback

    return (
        df.assign(ticker=df["cid"] + "_" + df["xcat"])
        .pivot(index="real_date", columns="ticker", values=value_column)
        # `pivot` names the column axis "ticker"; clear it so the result has
        # an unnamed column axis.
        .rename_axis(None, axis=1)
    )
def ticker_df_to_qdf(df: pd.DataFrame, metric: str = "value") -> QuantamentalDataFrame:
    """
    Converts a wide format DataFrame (with each column representing a ticker) to
    a standardized JPMaQS DataFrame.

    Parameters
    ----------
    df : pd.DataFrame
        A wide format DataFrame with one column per ticker.
    metric : str
        Name given to the value column of the resulting DataFrame. Defaults to
        "value".

    Raises
    ------
    TypeError
        If `df` is not a pandas DataFrame or `metric` is not a string.

    Returns
    -------
    pd.DataFrame
        The converted DataFrame.
    """
    if not isinstance(df, pd.DataFrame):
        raise TypeError("Argument `df` must be a pandas DataFrame.")

    if not isinstance(metric, str):
        raise TypeError("Argument `metric` must be a string.")

    # pivot to long format. The column axis is named explicitly so the stacked
    # level always becomes a "ticker" column, whether or not the incoming
    # column axis already carried a (possibly different) name.
    df = (
        df.rename_axis(columns="ticker")
        .stack(level=0)
        .reset_index()
        .rename(columns={0: metric})
    )
    df["cid"] = get_cid(df["ticker"])
    df["xcat"] = get_xcat(df["ticker"])
    df = df.drop(columns=["ticker"])

    return standardise_dataframe(df=df)
def concat_single_metric_qdfs(
    df_list: List[QuantamentalDataFrame],
    errors: str = "ignore",
) -> Optional[QuantamentalDataFrame]:
    """
    Combines a list of Quantamental DataFrames into a single DataFrame.

    DataFrames carrying the same metric are concatenated row-wise; the
    per-metric results are then outer-merged on the index columns. The input
    list is not modified.

    Parameters
    ----------
    df_list : List[QuantamentalDataFrame]
        A list of Quantamental DataFrames, each with exactly one metric column.
    errors : str
        The error handling method to use. If 'raise', then invalid items in the
        list will raise an error. If 'ignore', then invalid items will be
        ignored. Default is 'ignore'.

    Raises
    ------
    TypeError
        If `df_list` is not a list, or if `errors` is 'raise' and an element is
        not a Quantamental DataFrame.
    ValueError
        If `errors` is not 'raise' or 'ignore', or if a DataFrame does not have
        exactly one metric column.

    Returns
    -------
    QuantamentalDataFrame or None
        The combined DataFrame, or None if there is no valid DataFrame to
        combine.
    """
    if not isinstance(df_list, list):
        raise TypeError("Argument `df_list` must be a list.")
    if errors not in ["raise", "ignore"]:
        raise ValueError("`errors` must be one of 'raise' or 'ignore'.")

    if errors == "raise":
        if not all(isinstance(df, QuantamentalDataFrame) for df in df_list):
            raise TypeError(
                "All elements in `df_list` must be Quantamental DataFrames."
            )
        valid_dfs = list(df_list)
    else:
        valid_dfs = [df for df in df_list if isinstance(df, QuantamentalDataFrame)]

    if not valid_dfs:
        return None

    # Group the frames by their single metric column, keeping first-seen order.
    index_cols = set(QuantamentalDataFrame.IndexCols)
    by_metric: Dict[str, List[QuantamentalDataFrame]] = {}
    for qdf in valid_dfs:
        metric_cols = set(qdf.columns) - index_cols
        if len(metric_cols) != 1:
            raise ValueError(
                "Each QuantamentalDataFrame must have exactly one metric column."
            )
        by_metric.setdefault(next(iter(metric_cols)), []).append(qdf)

    # Stack same-metric frames, then outer-merge the metrics on the index cols.
    stacked = [
        pd.concat(frames, axis=0, ignore_index=False)
        for frames in by_metric.values()
    ]
    df: pd.DataFrame = functools.reduce(
        lambda left, right: pd.merge(
            left, right, on=["real_date", "cid", "xcat"], how="outer"
        ),
        stacked,
    )

    return standardise_dataframe(df)
def apply_slip(
    df: QuantamentalDataFrame,
    slip: int,
    cids: Optional[List[str]] = None,
    xcats: Optional[List[str]] = None,
    tickers: Optional[List[str]] = None,
    metrics: List[str] = ["value"],
    extend_dates: bool = False,
    raise_error: bool = True,
) -> QuantamentalDataFrame:
    """
    Applies a "slip" to the DataFrame for the given cross-sections and
    categories, on the given metrics. A slip shifts the selected series `slip`
    rows forwards within each ticker (after sorting by date). This is identical
    to a lag, but is measured in days, and must always be applied before any
    resampling.

    Parameters
    ----------
    df : QuantamentalDataFrame
        DataFrame to which the slip is applied. A copy is made; the input is
        not modified.
    slip : int
        Slip to be applied. Must be a non-negative integer.
    cids : List[str]
        List of cross-sections. If None, all cross-sections in `df` are used.
    xcats : List[str]
        List of target categories. If None, all categories in `df` are used.
    tickers : List[str]
        List of tickers (`<cid>_<xcat>`) to slip. Cannot be combined with
        `cids` or `xcats`. At least one of `tickers`, `cids` or `xcats` must be
        provided.
    metrics : List[str]
        List of metrics to which the slip is applied.
    extend_dates : bool
        If True, includes the dates added by the slip in the DataFrame. If
        False, only the input dates are included. Default is False.
    raise_error : bool
        If True, raises an error if any targeted ticker is missing from the
        DataFrame. If False, raises a warning instead.

    Raises
    ------
    TypeError
        If the provided parameters are not of the expected type.
    ValueError
        If the provided parameters are semantically incorrect.

    Returns
    -------
    QuantamentalDataFrame
        DataFrame with the slip applied.
    """
    # NOTE(review): `metrics` has a mutable default; it is only read here.
    if not isinstance(df, QuantamentalDataFrame):
        raise TypeError("Argument `df` must be a QuantamentalDataFrame.")

    df = df.copy()
    if not (isinstance(slip, int) and slip >= 0):
        raise ValueError("Slip must be a non-negative integer.")

    if not any([cids, xcats, tickers]):
        raise ValueError("One of `tickers`, `cids` or `xcats` must be provided.")
    if tickers is not None:
        if cids is not None or xcats is not None:
            raise ValueError("Cannot specify both `tickers` and `cids`/`xcats`.")

    if not isinstance(extend_dates, bool):
        raise TypeError("`extend_dates` must be a boolean.")

    if not isinstance(metrics, list) or (
        isinstance(metrics, list) and not all(isinstance(m, str) for m in metrics)
    ):
        raise TypeError("`metrics` must be a list of strings.")

    missing_metrics = sorted(set(metrics) - set(df.columns))
    if missing_metrics:
        raise ValueError(f"Metrics {missing_metrics} are not present in the DataFrame.")

    # Unspecified dimensions default to everything present in the DataFrame.
    if cids is None:
        cids = df["cid"].unique()
    if xcats is None:
        xcats = df["xcat"].unique()

    if tickers is not None:
        sel_tickers = tickers
    else:
        # Cartesian product of the selected cross-sections and categories.
        sel_tickers: List[str] = [f"{cid}_{xcat}" for cid in cids for xcat in xcats]

    # Temporary helper column; dropped again before returning.
    if is_categorical_qdf(df):
        df = QuantamentalDataFrame(df).add_ticker_column()
    else:
        df["ticker"] = df["cid"] + "_" + df["xcat"]

    err_str = (
        "Tickers targetted for applying slip are not present in the DataFrame.\n"
        "Missing tickers: {tickers}"
    )

    if not set(sel_tickers).issubset(set(df["ticker"].unique())):
        missing_tickers = sorted(list(set(sel_tickers) - set(df["ticker"].unique())))
        _err_str = err_str.format(tickers=missing_tickers)
        if raise_error:
            raise ValueError(_err_str)
        else:
            warnings.warn(_err_str)

    if extend_dates:
        # Append `slip` business days of NaN rows after the last date of every
        # ticker so that shifted values are not pushed off the end.
        found_metrics = set(df.columns) - set(QuantamentalDataFrame.IndexCols)
        found_metrics = list(found_metrics - {"ticker"})
        new_dfs: List[QuantamentalDataFrame] = []
        for (ticker, cid, xcat), idx in df.groupby(
            ["ticker", "cid", "xcat"], observed=True
        ).groups.items():
            last_date = df.loc[idx, "real_date"].max()
            # First element of the range is dropped via `[1:]`.
            new_dts = pd.bdate_range(start=last_date, periods=slip + 1)[1:]
            assert set(new_dts).isdisjoint(set(df.loc[idx, "real_date"].unique()))
            dct = {"real_date": new_dts, "cid": cid, "xcat": xcat, "ticker": ticker}
            dct = {**dct, **{metric: np.nan for metric in found_metrics}}
            new_dfs.append(pd.DataFrame(dct))
        if is_categorical_qdf(df):
            new_df = QuantamentalDataFrame.from_qdf_list(new_dfs)
        else:
            new_df = pd.concat(new_dfs, axis=0, ignore_index=True)

        if is_categorical_qdf(df):
            df = QuantamentalDataFrame(df).update_df(df_add=new_df)
        else:
            df = pd.concat([df, new_df], axis=0, ignore_index=True)

    # Rows must be in date order within each ticker for the shift below.
    df = df.sort_values(by=["cid", "xcat", "real_date"])
    for col in metrics:
        tks_isin = df["ticker"].isin(sel_tickers)
        # The shift is computed for every ticker but only written back for the
        # selected ones.
        df.loc[tks_isin, col] = df.groupby("ticker", observed=True)[col].shift(slip)
    df = df.drop(columns=["ticker"]).reset_index(drop=True)
    assert isinstance(df, QuantamentalDataFrame), "Failed to apply slip."
    return df
def downsample_df_on_real_date(
    df: pd.DataFrame,
    groupby_columns: List[str] = [],
    freq: str = "M",
    agg: str = "mean",
):
    """
    Downsamples JPMaQS DataFrame.

    Parameters
    ----------
    df : pd.Dataframe
        standardized JPMaQS DataFrame with the necessary columns: 'cid', 'xcat',
        'real_date' and at least one column with values of interest.
    groupby_columns : List
        a list of columns used to group the DataFrame before resampling on
        'real_date'.
    freq : str
        target frequency, default is monthly ('M'). The value is passed through
        `_map_to_business_day_frequency` before resampling. Downsampling options
        include weekly ('W'), monthly ('M'), or quarterly ('Q').
    agg : str
        aggregation method. Must be one of "mean" (default), "median", "min",
        "max", "first" or "last" (case-insensitive).

    Raises
    ------
    TypeError
        If `freq` or `agg` is not a string.
    ValueError
        If `groupby_columns` contains columns missing from `df`, or `agg` is not
        one of the supported methods.

    Returns
    -------
    pd.DataFrame
        the downsampled DataFrame.
    """
    # NOTE(review): `groupby_columns` has a mutable default; it is only read.
    if not set(groupby_columns).issubset(df.columns):
        raise ValueError(
            "The columns specified in 'groupby_columns' were not found in the DataFrame."
        )

    if not isinstance(freq, str):
        raise TypeError("`freq` must be a string")
    freq = _map_to_business_day_frequency(freq)

    if not isinstance(agg, str):
        raise TypeError("`agg` must be a string")
    agg = agg.lower()
    if agg not in ["mean", "median", "min", "max", "first", "last"]:
        raise ValueError(
            "`agg` must be one of 'mean', 'median', 'min', 'max', 'first', 'last'"
        )

    # Keep the DataFrame's own column order so the output layout is
    # deterministic (a plain set difference would not be).
    excluded = set(groupby_columns) | {"real_date"}
    non_groupby_columns = list(
        dict.fromkeys(col for col in df.columns if col not in excluded)
    )

    res = (
        df.set_index("real_date")
        .groupby(groupby_columns, observed=True)[non_groupby_columns]
        .resample(freq)
    )
    if PD_OLD_RESAMPLE:  # pragma: no cover
        # resample only if the column is numeric
        res = res.agg(
            {
                col: agg
                for col in non_groupby_columns
                if pd.api.types.is_numeric_dtype(df[col])
            }
        ).reset_index()
        res.columns = res.columns.droplevel(-1)
    else:
        res = res.agg(agg, **RESAMPLE_NUMERIC_ONLY).reset_index()

    return res
def update_df(df: pd.DataFrame, df_add: pd.DataFrame, xcat_replace: bool = False):
    """
    Append a standard DataFrame to a standard base DataFrame with ticker
    replacement on the intersection.

    Parameters
    ----------
    df : pd.DataFrame
        standardised base JPMaQS DataFrame with the following necessary columns:
        'cid', 'xcat', 'real_date' and 'value'.
    df_add : pd.DataFrame
        another standardised JPMaQS DataFrame, with the latest values, to be
        added with the necessary columns: 'cid', 'xcat', 'real_date', and
        'value'. Columns that are present in the base DataFrame but not in the
        appended DataFrame will be populated with NaN values.
    xcat_replace : bool
        all series belonging to the categories in the added DataFrame will be
        replaced, rather than just the added tickers.

    Raises
    ------
    TypeError
        If `df` or `df_add` is not a Quantamental DataFrame.
    ValueError
        If the two DataFrames have no column besides the index columns.

    Returns
    -------
    pd.DataFrame
        standardised DataFrame with the latest values of the modified or newly
        defined tickers added, sorted by cid, xcat and real_date.


    ..note::
        Tickers are combinations of cross-sections and categories.
    """
    # Validate types first: only then is it safe to read `.columns`.
    if not isinstance(df, QuantamentalDataFrame):
        raise TypeError("The base DataFrame must be a Quantamental Dataframe.")
    if not isinstance(df_add, QuantamentalDataFrame):
        raise TypeError("The added DataFrame must be a Quantamental Dataframe.")

    if type(df) is QuantamentalDataFrame:
        return df.update_df(df_add=df_add, xcat_replace=xcat_replace)

    # Consider the other possible metrics that the DataFrame could be defined over
    # NOTE(review): the message speaks of *shared* columns while the check uses
    # the union of both column sets - confirm which is intended.
    error_message = (
        "The two Quantamental DataFrames must share at least "
        "four columns including 'real_date', 'cid', and 'xcat'."
    )
    all_cols_set = set(df.columns).union(set(df_add.columns))
    if not len(all_cols_set - set(QuantamentalDataFrame.IndexCols)):
        raise ValueError(error_message)

    if not xcat_replace:
        df = update_tickers(df, df_add)
    else:
        df = update_categories(df, df_add)

    # sort same as in `standardise_dataframe`
    return df.sort_values(by=IDX_COLS_SORT_ORDER).reset_index(drop=True)
def update_tickers(df: pd.DataFrame, df_add: pd.DataFrame):
    """
    Update an aggregate DataFrame on a ticker level: rows of `df_add` replace
    rows of `df` that share the same 'real_date', 'xcat' and 'cid'.

    Parameters
    ----------
    df : pd.DataFrame
        aggregate DataFrame used to store all tickers.
    df_add : pd.DataFrame
        DataFrame with the latest values.

    Raises
    ------
    TypeError
        If either argument is not a Quantamental DataFrame.

    Returns
    -------
    pd.DataFrame
        The combined DataFrame. If one input is empty, the other is returned
        as-is.
    """
    if not isinstance(df, QuantamentalDataFrame):
        raise TypeError("The base DataFrame must be a Quantamental Dataframe.")
    if not isinstance(df_add, QuantamentalDataFrame):
        raise TypeError("The added DataFrame must be a Quantamental Dataframe.")

    # Nothing to merge when either side is empty.
    if df.empty or df_add.empty:
        return df_add if df.empty else df

    observation_key = ["real_date", "xcat", "cid"]
    stacked = pd.concat([df, df_add], axis=0, ignore_index=True)
    # `df_add` rows come last, so keep="last" lets them win on duplicates.
    deduplicated = stacked.drop_duplicates(subset=observation_key, keep="last")
    return deduplicated.reset_index(drop=True)
def update_categories(df: pd.DataFrame, df_add):
    """
    Update the DataFrame on the category level: every category present in
    `df_add` is removed from `df` in full before `df_add` is appended.

    Parameters
    ----------
    df : pd.DataFrame
        base DataFrame.
    df_add : pd.DataFrame
        appended DataFrame.

    Returns
    -------
    pd.DataFrame
        The combined DataFrame with a fresh integer index.
    """
    base_xcats = set(df["xcat"].unique())
    added_xcats = set(df_add["xcat"].unique())
    shared_xcats = list(base_xcats & added_xcats)

    # Only categories defined in both frames need to be cleared from the base;
    # with no overlap this is a plain append.
    if shared_xcats:
        df = df[~df["xcat"].isin(shared_xcats)]

    return pd.concat([df, df_add], axis=0, ignore_index=True)
def reduce_df(
    df: pd.DataFrame,
    xcats: Union[str, List[str]] = None,
    cids: List[str] = None,
    start: str = None,
    end: str = None,
    blacklist: dict = None,
    out_all: bool = False,
    intersect: bool = False,
):
    """
    Filter DataFrame by xcats and cids and notify about missing xcats and cids.

    Parameters
    ----------
    df : pd.Dataframe
        standardized JPMaQS DataFrame with the necessary columns: 'cid', 'xcat',
        'real_date' and 'value'.
    xcats : Union[str, List[str]]
        extended categories to be filtered on. Default is all in the DataFrame.
    cids : List[str]
        cross sections to be checked on. Default is all in the dataframe.
    start : str
        string representing the earliest date. Default is None.
    end : str
        string representing the latest date. Default is None.
    blacklist : dict
        cross-sections with date ranges that should be excluded from the data
        frame. If one cross-section has several blacklist periods append numbers
        to the cross-section code. Only the first three characters of each key
        are matched against 'cid'.
    out_all : bool
        if True the function returns reduced dataframe and selected/ available
        xcats and cids. Default is False, i.e. only the DataFrame is returned
    intersect : bool
        if True only retains cids that are available for all xcats. Default is
        False.

    Raises
    ------
    TypeError
        If `df` is not a standardised Quantamental DataFrame.

    Returns
    -------
    pd.Dataframe
        reduced DataFrame that also removes duplicates or (for out_all True)
        DataFrame and available and selected xcats and cids.
    """
    if not isinstance(df, QuantamentalDataFrame):
        raise TypeError("Argument `df` must be a standardised Quantamental DataFrame.")

    if type(df) is QuantamentalDataFrame:
        return df.reduce_df(
            cids=cids,
            xcats=xcats,
            start=start,
            end=end,
            blacklist=blacklist,
            out_all=out_all,
            intersect=intersect,
        )

    if xcats is not None:
        if isinstance(xcats, str):
            xcats = [xcats]
        df = df[df["xcat"].isin(xcats)]

    if cids is not None:
        cids = [cids] if isinstance(cids, str) else cids
        df = df[df["cid"].isin(cids)]

    if start:
        df = df[df["real_date"] >= pd.to_datetime(start)]

    if end:
        df = df[df["real_date"] <= pd.to_datetime(end)]

    if blacklist is not None:
        for key, value in blacklist.items():
            # `key[:3]` strips any suffix appended to distinguish several
            # blacklist periods of the same cross-section.
            df = df[
                ~(
                    (df["cid"] == key[:3])
                    & (df["real_date"] >= pd.to_datetime(value[0]))
                    & (df["real_date"] <= pd.to_datetime(value[1]))
                )
            ]

    # Report only the categories that actually survived the filters.
    if xcats is None:
        xcats = sorted(df["xcat"].unique())
    else:
        xcats_in_df = set(df["xcat"].unique())
        xcats = [xcat for xcat in xcats if xcat in xcats_in_df]

    if intersect:
        cid_sets = [
            set(df.loc[df["xcat"] == xcat, "cid"].unique()) for xcat in xcats
        ]
        # With no categories left there is nothing to intersect; calling
        # `set.intersection()` without arguments would raise a TypeError.
        cids_in_df = set.intersection(*cid_sets) if cid_sets else set()
    else:
        cids_in_df = set(df["cid"].unique())

    if cids is None:
        cids = sorted(cids_in_df)
    else:
        cids = [cid for cid in cids if cid in cids_in_df]

    df = df[df["cid"].isin(cids)]

    if out_all:
        return df.drop_duplicates(), xcats, sorted(cids)
    else:
        return df.drop_duplicates()
def reduce_df_by_ticker(
    df: pd.DataFrame,
    ticks: List[str] = None,
    start: str = None,
    end: str = None,
    blacklist: dict = None,
):
    """
    Filter a dataframe by tickers, date range and blacklist.

    Parameters
    ----------
    df : pd.Dataframe
        standardized dataframe with the following columns: 'cid', 'xcat',
        'real_date'.
    ticks : List[str]
        tickers (cross sections + base categories). Default is all tickers in
        the dataframe.
    start : str
        string in ISO 8601 representing earliest date. Default is None.
    end : str
        string ISO 8601 representing the latest date. Default is None.
    blacklist : dict
        cross sections with date ranges that should be excluded from the
        dataframe. If one cross section has several blacklist periods append
        numbers to the cross section code.

    Returns
    -------
    pd.Dataframe
        reduced dataframe that also removes duplicates. For inputs that are not
        exactly of type `QuantamentalDataFrame`, the result carries an
        additional 'ticker' column.
    """
    if type(df) is QuantamentalDataFrame:
        return df.reduce_df_by_ticker(
            tickers=ticks,
            start=start,
            end=end,
            blacklist=blacklist,
        )

    dfx = df.copy()

    if start is not None:
        dfx = dfx[dfx["real_date"] >= pd.to_datetime(start)]

    if end is not None:
        dfx = dfx[dfx["real_date"] <= pd.to_datetime(end)]

    # Blacklisting by cross-section.
    if blacklist is not None:
        period_masks = [
            (dfx["cid"] == code[:3])
            & (dfx["real_date"] >= pd.to_datetime(period[0]))
            & (dfx["real_date"] <= pd.to_datetime(period[1]))
            for code, period in blacklist.items()
        ]
        if period_masks:
            # A row is excluded when it falls in any blacklisted period.
            excluded = pd.concat(period_masks, axis=1).any(axis=1)
            dfx = dfx[~excluded]

    dfx["ticker"] = dfx["cid"] + "_" + dfx["xcat"]
    available_ticks = dfx["ticker"].unique()

    if ticks is None:
        selected_ticks = sorted(available_ticks)
    else:
        selected_ticks = [tick for tick in ticks if tick in available_ticks]

    return dfx[dfx["ticker"].isin(selected_ticks)].drop_duplicates()
def categories_df_aggregation_helper(dfx: pd.DataFrame, xcat_agg: str):
    """
    Helper method to down-sample a category by aggregating its numeric columns
    over the 'custom_date' buckets, per category and cross-section.

    Parameters
    ----------
    dfx : pd.DataFrame
        standardised DataFrame defined exclusively on a single category, with
        an additional 'custom_date' column.
    xcat_agg : str
        associated aggregation method for the respective category.

    Returns
    -------
    pd.DataFrame
        aggregated DataFrame in which 'custom_date' has been renamed to
        'real_date' (any original 'real_date' column is discarded).
    """
    bucket_keys = ["xcat", "cid", "custom_date"]
    aggregated = (
        dfx.groupby(bucket_keys, observed=True)
        .aggregate(xcat_agg, numeric_only=True)
        .reset_index()
    )
    # The bucket label takes over the role of the date column.
    aggregated = aggregated.drop(columns=["real_date"], errors="ignore")
    return aggregated.rename(columns={"custom_date": "real_date"})
def _categories_df_explanatory_df( dfw: pd.DataFrame, explanatory_xcats: List[str], agg_method: str, sum_condition: bool, lag: int, ): """ Produces the explanatory column(s) for the custom DataFrame. Parameters ---------- dfw : pd.DataFrame group-by DataFrame which has been down-sampled. The respective aggregation method will be applied. explanatory_xcats : List[str] list of explanatory category(s). agg_meth : str aggregation method used for all explanatory variables. sum_condition : dict required boolean to negate erroneous zeros if the aggregate method used, for the explanatory variable, is sum. lag : int lag of explanatory category(s). Applied uniformly to each category. """ dfw_explanatory = pd.DataFrame() for xcat in explanatory_xcats: if not sum_condition: explanatory_col = dfw[xcat].agg(agg_method).astype(dtype=np.float32) else: explanatory_col = dfw[xcat].sum(min_count=1) if lag > 0: explanatory_col: pd.Series explanatory_col = explanatory_col.groupby( level=0, observed=True, ).shift(lag) dfw_explanatory[xcat] = explanatory_col dfw_explanatory.index.names = ["cid", "real_date"] return dfw_explanatory
def categories_df(
    df: pd.DataFrame,
    xcats: List[str],
    cids: List[str] = None,
    val: str = "value",
    start: str = None,
    end: str = None,
    blacklist: dict = None,
    years: int = None,
    freq: str = "M",
    lag: int = 0,
    fwin: int = 1,
    xcat_aggs: List[str] = ["mean", "mean"],
):
    """
    In principle, create custom two-categories DataFrame with appropriate
    frequency and, if applicable, lags.

    Parameters
    ----------
    df : pd.Dataframe
        standardized JPMaQS DataFrame with the following necessary columns:
        'cid', 'xcat', 'real_date' and at least one column with values of
        interest.
    xcats : List[str]
        extended categories involved in the custom DataFrame. The last category
        in the list represents the dependent variable, and the (n - 1) preceding
        categories will be the explanatory variables(s).
    cids : List[str]
        cross-sections to be included. Default is all in the DataFrame.
    val : str
        name of column that contains the values of interest. Default is
        'value'. Must be one of 'value', 'grading', 'mop_lag', 'eop_lag'.
    start : str
        earliest date in ISO 8601 format. Default is None, i.e. earliest date in
        DataFrame is used. Required if `years` is set.
    end : str
        latest date in ISO 8601 format. Default is None, i.e. latest date in
        DataFrame is used.
    blacklist : dict
        cross-sections with date ranges that should be excluded from the
        DataFrame. If one cross section has several blacklist periods append
        numbers to the cross section code.
    years : int
        number of years over which data are aggregated. Supersedes the "freq"
        parameter and does not allow lags. Requires exactly two categories.
        Default is None, i.e. no multi-year aggregation.
    freq : str
        letter denoting frequency at which the series are to be sampled. This
        must be one of 'D', 'W', 'M', 'Q', 'A'. Default is 'M'. Will always be
        the last business day of the respective frequency.
    lag : int
        lag (delay of arrival) of explanatory category(s) in periods as set by
        freq. Default is 0.
    fwin : int
        forward moving average window applied to the dependent (last) category.
        Default is 1, i.e no average. Note: This parameter is used mainly for
        target returns as dependent variable.
    xcat_aggs : List[str]
        exactly two aggregation methods. Default is 'mean' for both. The same
        aggregation method, the first method in the parameter, will be used for
        all explanatory variables.

    Raises
    ------
    AssertionError
        If an argument fails one of the type or consistency checks.
    ValueError
        If fewer than two categories or no cross-section remain after
        filtering.

    Returns
    -------
    pd.DataFrame
        custom DataFrame with category columns, indexed by ('cid',
        'real_date'). Rows in which all categories are NaN are dropped. N.B.:
        The number of explanatory categories that can be included is not
        restricted and will be appended column-wise to the returned DataFrame.
        The order of the DataFrame's columns will reflect the order of the
        categories list.
    """
    # NOTE(review): `xcat_aggs` has a mutable default; it is only read here.
    freq = _map_to_business_day_frequency(freq)

    # NOTE(review): input validation relies on `assert`, which is skipped when
    # Python runs with -O.
    assert isinstance(xcats, list), f"<list> expected and not {type(xcats)}."
    assert all([isinstance(c, str) for c in xcats]), "List of categories expected."
    aggs_error = "List of strings, outlining the aggregation methods, expected."
    assert isinstance(xcat_aggs, list), aggs_error
    assert all([isinstance(a, str) for a in xcat_aggs]), aggs_error
    aggs_len = (
        "Only two aggregation methods required. The first will be used for all "
        "explanatory category(s)."
    )
    assert len(xcat_aggs) == 2, aggs_len
    assert not (years is not None) & (
        lag != 0
    ), "Lags cannot be applied to year groups."
    if years is not None:
        assert isinstance(start, str), "Year aggregation requires a start date."

        no_xcats = (
            "If the data is aggregated over a multi-year timeframe, only two "
            "categories are permitted."
        )
        assert len(xcats) == 2, no_xcats

    # Remember the request so that anything lost in filtering can be reported.
    input_xcats = xcats
    input_cids = cids
    df, xcats, cids = reduce_df(df, xcats, cids, start, end, blacklist, out_all=True)

    if len(xcats) < 2:
        raise ValueError("The DataFrame must contain at least two categories. ")
    elif set(xcats) != set(input_xcats):
        missing_xcats = list(set(input_xcats) - set(xcats))
        warnings.warn(
            f"The following categories are missing from the DataFrame: {missing_xcats}"
        )

    if len(cids) < 1:
        raise ValueError(
            "The DataFrame must contain at least one valid cross section. "
        )
    elif input_cids and set(cids) != set(input_cids):
        missing_cids = list(set(input_cids) - set(cids))
        warnings.warn(
            f"The following cross sections are missing from the DataFrame: {missing_cids}"
        )

    metric = ["value", "grading", "mop_lag", "eop_lag"]
    val_error = (
        "The column of interest must be one of the defined JPMaQS metrics, "
        f"{metric}, but received {val}."
    )
    assert val in metric, val_error

    avbl_cols = list(df.columns)
    assert val in avbl_cols, (
        f"The passed column name, {val}, must be present in the "
        f"received DataFrame. DataFrame contains {avbl_cols}."
    )

    # Reduce the columns in the DataFrame to the necessary columns:
    # ['cid', 'xcat', 'real_date'] + [val] (name of column that contains the
    # values of interest: "value", "grading", "mop_lag", "eop_lag").
    col_names = ["cid", "xcat", "real_date", val]

    df_output = []
    if years is None:
        # Wide layout: one column per category, indexed by (cid, real_date).
        dfw = df.pivot(index=("cid", "real_date"), columns="xcat", values=val)

        dep = xcats[-1]
        # The possibility of multiple explanatory variables.
        explanatory_xcats = xcats[:-1]

        # Bucket by cross-section and by period of the requested frequency.
        dfw = dfw.groupby(
            [
                pd.Grouper(level="cid"),
                pd.Grouper(level="real_date", freq=freq),
            ],
            observed=True,
        )

        dfw_explanatory = _categories_df_explanatory_df(
            dfw=dfw,
            explanatory_xcats=explanatory_xcats,
            agg_method=xcat_aggs[0],
            sum_condition=(xcat_aggs[0] == "sum"),
            lag=lag,
        )

        # Handles for falsified zeros. Following the frequency conversion, if the
        # aggregation method is set to "sum", time periods that exclusively contain NaN
        # values will incorrectly be summed to the value zero which is misleading for
        # analysis.
        if not (xcat_aggs[-1] == "sum"):
            dep_col = dfw[dep].agg(xcat_aggs[1]).astype(dtype=np.float32)
        else:
            dep_col = dfw[dep].sum(min_count=1)

        if fwin > 1:
            # Rolling mean shifted back by (fwin - 1) periods, so each row
            # holds the average over itself and the following periods.
            # NOTE(review): the rolling window is not grouped by 'cid', so it
            # looks like it can span two cross-sections - confirm.
            s = 1 - fwin
            dep_col = dep_col.rolling(window=fwin).mean().shift(s)

        dfw_explanatory[dep] = dep_col
        # Order such that the return category is the right-most column - will reflect the
        # order of the categories list.
        dfc = dfw_explanatory[explanatory_xcats + [dep]]

    else:
        start_year = pd.to_datetime(start).year
        end_year = df["real_date"].max().year + 1

        grouping = int((end_year - start_year) / years)
        remainder = (end_year - start_year) % years

        # Map a label such as "2000 - 2004" to the list of years it covers.
        year_groups = {}

        group_start_year = start_year
        for group in range(grouping):
            value = [i for i in range(group_start_year, group_start_year + years)]
            key = f"{group_start_year} - {group_start_year + (years - 1)}"
            year_groups[key] = value

            group_start_year += years

        # Final, possibly shorter, group running up to the latest data.
        v = [i for i in range(group_start_year, group_start_year + (remainder + 1))]
        year_groups[f"{group_start_year} - now"] = v
        list_y_groups = list(year_groups.keys())

        # NOTE(review): `year % start_year` presumably stands in for
        # `year - start_year`; the two agree only while
        # start_year <= year < 2 * start_year - confirm.
        translate_ = lambda year: list_y_groups[int((year % start_year) / years)]
        df["real_date"] = pd.to_datetime(df["real_date"], errors="coerce")
        df["custom_date"] = df["real_date"].dt.year.apply(translate_)

        # Aggregate each of the two categories with its own method.
        dfx_list = [df[df["xcat"] == xcats[0]], df[df["xcat"] == xcats[1]]]
        df_agg = list(map(categories_df_aggregation_helper, dfx_list, xcat_aggs))
        df_output.extend([d[col_names] for d in df_agg])

        dfc = pd.concat(df_output)
        dfc = dfc.pivot(index=("cid", "real_date"), columns="xcat", values=val)

    if dfc.index.dtypes["cid"].name == "category":
        # in case the incoming DF has a categorical index it, the index needs to be
        # converted to object type to avoid issues downstream
        new_outer_index = dfc.index.levels[0].astype("object")
        new_index = pd.MultiIndex(
            levels=[new_outer_index, dfc.index.levels[1]],
            codes=dfc.index.codes,
            names=dfc.index.names,
        )
        dfc.index = new_index

    # Adjusted to account for multiple signals requested. If the DataFrame is
    # two-dimensional, signal & a return, NaN values will be handled inside other
    # functionality, as categories_df() is simply a support function. If the parameter
    # how is set to "any", a potential unnecessary loss of data on certain categories
    # could arise.
    return dfc.dropna(axis=0, how="all")
def estimate_release_frequency(
    timeseries: Optional[pd.Series] = None,
    df_wide: Optional[pd.DataFrame] = None,
    atol: Optional[float] = None,
    rtol: Optional[float] = None,
) -> Union[Optional[str], Dict[str, Optional[str]]]:
    """
    Estimates the release frequency of a timeseries from the spacing between the
    dates on which a new value first appears.

    NaNs are dropped, values are optionally rounded as specified by the tolerance
    parameters, repeated values are dropped (keeping the first occurrence), and the
    remaining dates are passed to `_determine_freq`.

    Parameters
    ----------
    timeseries : pd.Series, optional
        The timeseries to be used to estimate the release frequency. Must have a
        `pd.DatetimeIndex`. Only one of `timeseries` or `df_wide` must be passed.
    df_wide : pd.DataFrame, optional
        The wide DataFrame to be used to estimate the release frequency. This mode
        processes each column of the DataFrame as a timeseries. Its index must be
        named `'real_date'`. Only one of `timeseries` or `df_wide` must be passed.
    atol : float, optional
        The absolute tolerance for the difference between two values. Values are
        rounded to the decimal place implied by `atol`. If `None`, no rounding is
        applied.
    rtol : float, optional
        The relative tolerance for the difference between two values, between 0 and
        1. Values are rounded to the decimal place implied by
        `rtol * max(abs(timeseries))`. If `None`, no rounding is applied.

    Returns
    -------
    str or dict
        The estimated release frequency. If `df_wide` is passed, a dictionary with
        the column names as keys and the estimated frequencies as values is
        returned.

    Raises
    ------
    TypeError
        If `df_wide` is not a DataFrame, if a tolerance is not a non-negative
        number, or if `timeseries` is not a non-empty Series with a datetime index.
    ValueError
        If both `timeseries` and `df_wide` are passed, if `df_wide` is empty or its
        index is not named `'real_date'`, if both tolerances are passed, or if
        `rtol` is outside of [0, 1].
    """
    if df_wide is not None:
        if timeseries is not None:
            raise ValueError("Only one of `timeseries` or `df_wide` must be passed.")
        if not isinstance(df_wide, pd.DataFrame):
            raise TypeError("Argument `df_wide` must be a pandas DataFrame.")
        if df_wide.empty or df_wide.index.name != "real_date":
            raise ValueError(
                "Argument `df_wide` must be a non-empty pandas DataFrame with a datetime index `'real_date'`."
            )
        # Each column is treated as an independent timeseries.
        return {
            col: estimate_release_frequency(
                timeseries=df_wide[col], atol=atol, rtol=rtol
            )
            for col in df_wide.columns
        }

    if bool(atol) and bool(rtol):
        raise ValueError("Only one of `atol` or `rtol` must be passed.")

    if atol:
        if not isinstance(atol, Number) or atol <= 0:
            raise TypeError("Argument `atol` must be a positive float.")
    elif rtol:
        if not isinstance(rtol, Number):
            raise TypeError("Argument `rtol` must be a float.")
        if not (0 <= rtol <= 1):
            raise ValueError("Argument `rtol` must be a float between 0 and 1.")

    # Catches falsy values that are not None (e.g. an empty string), which the
    # truthiness-based checks above let through.
    for _arg, _name in zip([atol, rtol], ["atol", "rtol"]):
        if _arg is not None:
            if not isinstance(_arg, Number) or _arg < 0:
                raise TypeError(
                    f"Argument `{_name}` must be a float greater than 0 or None."
                )

    _ts_error = (
        "Argument `timeseries` must be a non-empty pandas Series with "
        "a datetime index `'real_date'`."
    )
    # Validate the container before any Series method is called on it, so that a
    # missing or wrongly-typed argument yields the documented TypeError.
    if not isinstance(timeseries, pd.Series) or not isinstance(
        timeseries.index, pd.DatetimeIndex
    ):
        raise TypeError(_ts_error)

    if rtol or atol:
        _scale = timeseries.abs().max() * rtol if rtol else atol
        # A zero or non-finite scale (e.g. an all-zero or all-NaN series) has no
        # meaningful decimal place; skip rounding rather than fail in log10/int.
        if np.isfinite(_scale) and _scale > 0:
            _dp = -int(np.floor(np.log10(_scale)))
            timeseries = timeseries.round(_dp)

    timeseries = timeseries.dropna().drop_duplicates(keep="first")
    if timeseries.empty:
        raise TypeError(_ts_error)

    return _determine_freq(timeseries.index.tolist())
def _determine_freq(dates: List[str]) -> str: """ Backend function to determine the frequency of a timeseries from the dates in the timeseries. Parameters ---------- dates : List[str] A list of dates in the timeseries. Returns ------- str The estimated frequency of the timeseries. One of 'D', 'W', 'M', 'Q', 'A'. """ dates: pd.DatetimeIndex = pd.to_datetime(sorted(dates)) deltas = dates.to_series().diff().dt.days[1:] frequencies = { "D": 1, "W": 7, "M": 30, "Q": 91, "A": 365, } closest_freq = deltas.map( lambda x: min(frequencies, key=lambda freq: abs(x - frequencies[freq])) ) return closest_freq.value_counts().idxmax()
def years_btwn_dates(start_date: pd.Timestamp, end_date: pd.Timestamp) -> int:
    """
    Returns the number of years between two dates.

    Only the calendar-year components are compared, so the result is the
    difference `end_date.year - start_date.year` (negative if `end_date` lies in an
    earlier year).
    """
    first_year, last_year = start_date.year, end_date.year
    return last_year - first_year
def quarters_btwn_dates(start_date: pd.Timestamp, end_date: pd.Timestamp) -> int:
    """
    Returns the number of quarters between two dates.

    Computed from the year and quarter components only: four quarters per year of
    difference plus the difference in quarter numbers.
    """
    year_gap = end_date.year - start_date.year
    quarter_gap = end_date.quarter - start_date.quarter
    return year_gap * 4 + quarter_gap
def months_btwn_dates(start_date: pd.Timestamp, end_date: pd.Timestamp) -> int:
    """
    Returns the number of months between two dates.

    Computed from the year and month components only: twelve months per year of
    difference plus the difference in month numbers.
    """
    year_gap = end_date.year - start_date.year
    month_gap = end_date.month - start_date.month
    return year_gap * 12 + month_gap
def weeks_btwn_dates(start_date: pd.Timestamp, end_date: pd.Timestamp) -> int:
    """
    Returns the number of business weeks between two dates.

    The count is anchored on the next Monday after `start_date`: the number of
    whole 7-day spans (floored) between that Monday and `end_date`, plus one.
    """
    monday_after_start = start_date + pd.offsets.Week(weekday=0)
    whole_weeks = (end_date - monday_after_start).days // 7
    return whole_weeks + 1
def _get_edge_dates(
    dates: Optional[Union[pd.DatetimeIndex, pd.Series, Iterable[pd.Timestamp]]] = None,
    start_date: Optional[Union[str, pd.Timestamp]] = None,
    end_date: Optional[Union[str, pd.Timestamp]] = None,
    freq: str = "M",
    direction: str = "end",
) -> pd.Series:
    """
    Backend for `get_eops` and `get_sops`: returns the first or last date of each
    period at the given frequency.

    Dates are taken either from `dates` or from the business-day range between
    `start_date` and `end_date`. Each date is assigned a period number relative to
    the earliest date, and a date is an edge when the adjacent row (next row for
    "end", previous row for "start") belongs to a different period. The dates are
    processed in the order given; they are not sorted here.

    Parameters
    ----------
    dates : pd.DatetimeIndex | pd.Series | Iterable[pd.Timestamp], optional
        The dates to pick edge dates from. Mutually exclusive with `start_date` /
        `end_date`.
    start_date : str | pd.Timestamp, optional
        Start of the business-day range. Must be passed together with `end_date`.
        If later than `end_date`, the two are swapped.
    end_date : str | pd.Timestamp, optional
        End of the business-day range. Must be passed together with `start_date`.
    freq : str
        The frequency string. Must be one of "D", "W", "M", "Q", "A".
    direction : str
        Either "start" or "end".

    Returns
    -------
    pd.Series
        The edge dates. For the daily frequency, a `pd.DatetimeIndex` of the
        business days between the earliest and latest input date that are present
        in the input is returned instead.

    Raises
    ------
    TypeError
        If `dates` is not iterable, contains entries that cannot be converted to
        timestamps, or if `start_date` / `end_date` are of the wrong type.
    ValueError
        If the date arguments are inconsistent or missing, if a date string is not
        a valid ISO date, or if the frequency is not supported.
    """
    assert direction in ["start", "end"], "Direction must be either 'start' or 'end'."
    freq = _map_to_business_day_frequency(freq)

    if bool(start_date) != bool(end_date):
        raise ValueError(
            "Both `start_date` and `end_date` must be passed when using "
            "dates as a start and end date."
        )

    if dates is not None:
        if not isinstance(dates, (pd.DatetimeIndex, pd.Series, Iterable)):
            raise TypeError(
                "Dates must be a pandas DatetimeIndex, Series, or a generic iterable."
            )
        # A DataFrame is iterable too; only its first column is used.
        if isinstance(dates, pd.DataFrame):
            dates = dates.iloc[:, 0]
        if isinstance(dates, (pd.DatetimeIndex, pd.Series)):
            dates = dates.tolist()
        dates = list(dates)
        for ix, dt in enumerate(dates):
            try:
                dates[ix] = pd.to_datetime(dt)
            except Exception as e:
                raise TypeError(
                    f"Error converting date at index {ix} to a pandas Timestamp: {e}"
                ) from e

    if bool(start_date) and bool(dates):
        raise ValueError(
            "Only one of `dates` or `start_date` and `end_date` must be passed."
        )

    if bool(start_date):
        assert bool(end_date)
        for date, dname in zip([start_date, end_date], ["start_date", "end_date"]):
            if not isinstance(date, (str, pd.Timestamp)):
                raise TypeError(f"{dname} must be a string or a pandas Timestamp.")
            if isinstance(date, str):
                if not is_valid_iso_date(date):
                    raise ValueError(
                        "Both `start_date` and `end_date` must be valid ISO dates when passed as "
                        "strings."
                    )

        if pd.Timestamp(start_date) > pd.Timestamp(end_date):
            start_date, end_date = end_date, start_date

    # Without any date input `pd.bdate_range` would fail with an unrelated
    # message about its own parameters; fail early with a clear one instead.
    if dates is None and not bool(start_date):
        raise ValueError(
            "Either `dates` or both `start_date` and `end_date` must be passed."
        )

    dts: pd.DataFrame = pd.DataFrame(
        (
            dates
            if (dates is not None)
            else pd.bdate_range(start=start_date, end=end_date)
        ),
        columns=["real_date"],
    ).apply(pd.to_datetime, axis=1)
    min_date: pd.Timestamp = dts["real_date"].min()

    # Daily frequency: every business day present in the input is its own period.
    if freq == _map_to_business_day_frequency("D"):
        max_date = dts["real_date"].max()
        dtx = pd.bdate_range(start=min_date, end=max_date)
        return dtx[dtx.isin(dts["real_date"])]

    if freq == _map_to_business_day_frequency("M"):
        func = months_btwn_dates
    elif freq == _map_to_business_day_frequency("W"):
        func = weeks_btwn_dates
    elif freq == _map_to_business_day_frequency("Q"):
        func = quarters_btwn_dates
    elif freq == _map_to_business_day_frequency("A"):
        func = years_btwn_dates
    else:
        raise ValueError("Frequency parameter must be one of D, M, W, Q, or A.")

    # Each date is passed as the first argument and `min_date` as the second; only
    # changes in the resulting period number matter, not its sign.
    dts["period"] = dts["real_date"].apply(func, args=(min_date,))

    # Compare each row with the next row ("end") or the previous row ("start").
    # The shifted-in NaN at the boundary always differs, so the outermost date is
    # always selected.
    dx = -1 if direction == "end" else 1
    t_indices: pd.Series = dts["period"].shift(dx) != dts["period"]
    t_dates: pd.Series = dts["real_date"].loc[t_indices].reset_index(drop=True)
    return t_dates
def get_eops(
    dates: Optional[Union[pd.DatetimeIndex, pd.Series, Iterable[pd.Timestamp]]] = None,
    start_date: Optional[Union[str, pd.Timestamp]] = None,
    end_date: Optional[Union[str, pd.Timestamp]] = None,
    freq: str = "M",
) -> pd.Series:
    """
    Returns a series of end-of-period dates for a given frequency. Dates can be
    passed as a series, index, a generic iterable or as a start and end date.

    This is a thin wrapper that delegates to `_get_edge_dates` with
    `direction="end"`.

    Parameters
    ----------
    freq : str
        The frequency string. Must be one of "D", "W", "M", "Q", "A".
    dates : pd.DatetimeIndex | pd.Series | Iterable[pd.Timestamp]
        The dates to be used to generate the end-of-period dates. Can be passed as
        a series, index, a generic iterable or as a start and end date.
    start_date : str | pd.Timestamp
        The start date. Must be passed if dates is not passed.
    end_date : str | pd.Timestamp
        The end date. Must be passed if dates is not passed.

    Returns
    -------
    pd.Series
        The end-of-period dates, as produced by `_get_edge_dates`.
    """
    edge_kwargs = dict(
        dates=dates,
        start_date=start_date,
        end_date=end_date,
        freq=freq,
    )
    return _get_edge_dates(direction="end", **edge_kwargs)
def merge_categories(
    df: pd.DataFrame,
    xcats: Optional[List[str]] = None,
    new_xcat: Optional[str] = None,
    cids: Optional[List[str]] = None,
    hierarchy: Optional[List[str]] = None,
    backfill: bool = False,
    start: Optional[str] = None,
):
    """
    Merges categories into a new category, given a list of categories to be merged.
    The merging is done in a preferred order, i.e. the first category in the list
    will be the preferred value for each real_date and if the first category does
    not have a value for a given real_date, the next category in the list will be
    used, etc...

    Parameters
    ----------
    df : pd.DataFrame
        standardized JPMaQS DataFrame with the columns 'cid', 'xcat', 'real_date'
        and 'value'. Must be a `QuantamentalDataFrame` instance.
    xcats : List[str]
        extended categories to be merged, in preferred order. Alias for
        `hierarchy`; provide one or the other.
    new_xcat : str
        name of the new category to be created.
    cids : List[str], optional
        cross sections to be included. Default is all in the DataFrame.
    hierarchy : List[str], optional
        alias for `xcats`. Provided for parity with the previous `extend_history`
        API.
    backfill : bool, optional
        If True, the new xcat is backfilled with its first valid value to the date
        specified by `start`. Default is False.
    start : str, optional
        ISO date. If `backfill` is True, the first valid value is propagated back to
        this date. If `backfill` is False and `start` is provided, the output is
        trimmed to dates >= start.

    Returns
    -------
    pd.DataFrame
        DataFrame with the merged category.

    Raises
    ------
    TypeError
        If `df` is not a (Quantamental) DataFrame, or if `xcats`, `new_xcat`,
        `cids`, `backfill` or `start` are of the wrong type.
    ValueError
        If both `xcats` and `hierarchy` are passed, if neither is passed, if
        `new_xcat` is missing, if `backfill` is True without `start`, or if the
        requested categories or cross sections are not present in `df`.
    """
    # ---- Argument validation -------------------------------------------------
    if not isinstance(df, pd.DataFrame):
        raise TypeError("The DataFrame must be a pandas DataFrame.")
    if hierarchy is not None and xcats is not None:
        raise ValueError("Provide either `xcats` or `hierarchy`, not both.")
    # `hierarchy` is only an alias; from here on only `xcats` is used.
    if hierarchy is not None:
        xcats = hierarchy
    if xcats is None:
        raise ValueError("`xcats` (or `hierarchy`) must be provided.")
    if not isinstance(xcats, list):
        raise TypeError("The categories must be a list of strings.")
    if not all(isinstance(xcat, str) for xcat in xcats):
        raise TypeError("The categories must be a list of strings.")
    if new_xcat is None:
        raise ValueError("`new_xcat` must be provided.")
    if not isinstance(new_xcat, str):
        raise TypeError("The new category must be a string.")
    if not isinstance(backfill, bool):
        raise TypeError("`backfill` must be a boolean.")
    if start is not None and not isinstance(start, str):
        raise TypeError("`start` must be a string.")
    if backfill and start is None:
        raise ValueError("`start` must be provided if `backfill` is True.")

    # A plain pandas DataFrame passes the first check above but is rejected here.
    if not isinstance(df, QuantamentalDataFrame):
        raise TypeError("The DataFrame must be a Quantamental DataFrame.")

    qdf = QuantamentalDataFrame(df)
    # Remembered so that the result is built with the same `categorical` setting.
    result_as_categorical = qdf.InitializedAsCategorical

    if not set(xcats).issubset(qdf["xcat"].unique()):
        raise ValueError("The categories must be present in the DataFrame.")
    if cids is None:
        cids = list(qdf["cid"].unique())
    if not isinstance(cids, list):
        raise TypeError("The cross sections must be a list of strings.")
    if not all(isinstance(cid, str) for cid in cids):
        raise TypeError("The cross sections must be a list of strings.")
    if not set(cids).issubset(qdf["cid"].unique()):
        raise ValueError("The cross sections must be present in the DataFrame.")

    start_ts = pd.to_datetime(start) if start is not None else None

    # ---- Merge per cross section ---------------------------------------------
    merged_results = []
    for cid in cids:
        cid_df = qdf[qdf["cid"] == cid]
        merged_series = pd.DataFrame()
        for category in xcats:
            cat_df = cid_df[cid_df["xcat"] == category].sort_values("real_date")
            if merged_series.empty:
                # First category with any rows for this cid becomes the base.
                merged_series = cat_df.copy()
            else:
                # Lower-priority categories only contribute dates that are not
                # already present. Matching is on `real_date` alone, so a date
                # held by a higher-priority row is skipped whatever its value.
                inferior_values = cat_df[
                    ~cat_df["real_date"].isin(merged_series["real_date"])
                ]
                merged_series = pd.concat([merged_series, inferior_values])

        merged_series = merged_series.sort_values("real_date")
        merged_series["xcat"] = new_xcat
        merged_series["cid"] = cid

        if backfill:
            valid = merged_series.dropna(subset=["value"])
            if not valid.empty:
                first_valid_date = valid["real_date"].min()
                first_valid_value = valid.loc[
                    valid["real_date"] == first_valid_date, "value"
                ].iloc[0]
                # Backfill only when the series begins after the requested start.
                if first_valid_date > start_ts:
                    # Business days from `start` up to the day before the first
                    # valid observation, all carrying the first valid value.
                    backfilled_data = pd.DataFrame(
                        {
                            "real_date": pd.bdate_range(
                                start=start_ts,
                                end=first_valid_date - pd.Timedelta(days=1),
                            ),
                            "value": first_valid_value,
                            "cid": cid,
                            "xcat": new_xcat,
                        }
                    )
                    # Drop any rows before the first valid observation; they are
                    # replaced by the backfilled rows.
                    merged_series = merged_series[
                        merged_series["real_date"] >= first_valid_date
                    ]
                    merged_series = pd.concat([backfilled_data, merged_series])
        elif start_ts is not None:
            # No backfill requested: `start` only trims the output.
            merged_series = merged_series[merged_series["real_date"] >= start_ts]

        merged_results.append(merged_series)

    result_df = pd.concat(merged_results, ignore_index=True).sort_values(
        by=IDX_COLS_SORT_ORDER
    )
    return QuantamentalDataFrame(result_df, categorical=result_as_categorical)
def get_sops(
    dates: Optional[Union[pd.DatetimeIndex, pd.Series, Iterable[pd.Timestamp]]] = None,
    start_date: Optional[Union[str, pd.Timestamp]] = None,
    end_date: Optional[Union[str, pd.Timestamp]] = None,
    freq: str = "M",
) -> pd.Series:
    """
    Returns a series of start-of-period dates for a given frequency. Dates can be
    passed as a series, index, a generic iterable or as a start and end date.

    This is a thin wrapper that delegates to `_get_edge_dates` with
    `direction="start"`.

    Parameters
    ----------
    freq : str
        The frequency string. Must be one of "D", "W", "M", "Q", "A".
    dates : pd.DatetimeIndex | pd.Series | Iterable[pd.Timestamp]
        The dates to be used to generate the start-of-period dates. Can be passed
        as a series, index, a generic iterable or as a start and end date.
    start_date : str | pd.Timestamp
        The start date. Must be passed if dates is not passed.
    end_date : str | pd.Timestamp
        The end date. Must be passed if dates is not passed.

    Returns
    -------
    pd.Series
        The start-of-period dates, as produced by `_get_edge_dates`.
    """
    edge_kwargs = dict(
        dates=dates,
        start_date=start_date,
        end_date=end_date,
        freq=freq,
    )
    return _get_edge_dates(direction="start", **edge_kwargs)
def concat_categorical(df1: pd.DataFrame, df2: pd.DataFrame) -> pd.DataFrame:
    """
    Concatenate two DataFrames with categorical columns. The dtypes of the second
    DataFrame are cast to the dtypes of the first. The columns of the DataFrames
    must be identical.

    Neither input is modified: all casts are applied to copies.

    Parameters
    ----------
    df1 : pd.DataFrame
        The first DataFrame. Its dtypes determine which columns are categorical.
    df2 : pd.DataFrame
        The second DataFrame.

    Returns
    -------
    pd.DataFrame
        The concatenated DataFrame with the same columns as the input and a fresh
        integer index. If one of the DataFrames is empty, the other one is returned
        (with `df2` cast to the dtypes of `df1`).

    Raises
    ------
    TypeError
        If either argument is not a pandas DataFrame.
    ValueError
        If the two DataFrames do not have the same set of columns.
    """
    if not isinstance(df1, pd.DataFrame) or not isinstance(df2, pd.DataFrame):
        raise TypeError("Both DataFrames must be pandas DataFrames.")

    if not (set(df1.columns) == set(df2.columns)):
        raise ValueError("The columns of the two DataFrames must be identical.")

    # Target dtypes for `df2`: categorical wherever `df1` is categorical, and the
    # exact dtype of `df1` everywhere else.
    categorical_cols = list(df1.select_dtypes(include="category").columns)
    target_dtypes = {col: "category" for col in categorical_cols}
    target_dtypes.update(df1.select_dtypes(exclude="category").dtypes.to_dict())

    # `astype` returns a new object, so the caller's `df2` keeps its dtypes.
    right = df2.astype(target_dtypes)

    if df1.empty:
        return right.reset_index(drop=True)
    if right.empty:
        return df1.reset_index(drop=True)

    # Work on a copy so that the categories of the caller's `df1` are untouched.
    left = df1.copy()
    for col in categorical_cols:
        # Both sides must share one set of categories, otherwise `pd.concat`
        # falls back to a non-categorical dtype for the column.
        combined_categories = left[col].cat.categories.union(
            right[col].cat.categories
        )
        left[col] = pd.Categorical(left[col], categories=combined_categories)
        right[col] = pd.Categorical(right[col], categories=combined_categories)

    return pd.concat([left, right], axis=0, ignore_index=True)
def _insert_as_categorical(df, column_name, category_name, column_idx): """ Inserts a column into a dataframe as a categorical column. """ df.insert( column_idx, column_name, pd.Categorical( [category_name] * df.shape[0], categories=[category_name], ), ) return df
def forward_fill_wide_df(df, blacklist=None, n=1):
    """
    Forward fills NaN values in a wide DataFrame using the last valid value in each
    column. It will not forward fill gaps in the data, only the next `n` periods
    after the last valid value.

    Parameters
    ----------
    df : pd.DataFrame
        The DataFrame to be forward filled in `wide` format, where each column
        represents a cross-section and the index are dates. It is modified in
        place.
    blacklist : dict, optional
        A dictionary where keys are column names and values are lists of two
        elements, representing the start and end dates of periods to be excluded
        from filling.
    n : int, optional
        The number of periods to fill forward. Default is 1, meaning only the next
        period after the last valid value is filled.

    Returns
    -------
    pd.DataFrame
        The same DataFrame object that was passed in, after filling.

    Raises
    ------
    TypeError
        If `blacklist` is not a dictionary or `df` is not a pandas DataFrame.
    ValueError
        If `n` is not an integer.
    """
    excluded_periods = {} if blacklist is None else blacklist
    if not isinstance(excluded_periods, dict):
        raise TypeError("blacklist argument must be a dictionary.")
    if not isinstance(df, pd.DataFrame):
        raise TypeError("df must be a pandas DataFrame.")
    if not isinstance(n, int):
        raise ValueError("Parameter 'n' must be an integer.")

    for col in df.columns:
        column = df[col]

        # Columns without any valid observation have nothing to carry forward.
        anchor_label = column.last_valid_index()
        if anchor_label is None:
            continue
        anchor_pos = column.index.get_loc(anchor_label)

        # Positions directly after the last valid value, capped at the end of the
        # column.
        first_fill = anchor_pos + 1
        stop_fill = min(anchor_pos + n + 1, len(column))
        if first_fill >= stop_fill:
            continue

        fillable = pd.Series(False, index=column.index)
        fillable.iloc[first_fill:stop_fill] = True

        # Dates inside a blacklisted window (bounds inclusive) are never filled.
        window = excluded_periods.get(col)
        if window:
            window_start = pd.to_datetime(window[0])
            window_end = pd.to_datetime(window[1])
            fillable &= ~column.index.to_series().between(window_start, window_end)

        fillable &= column.isna()
        df.loc[fillable, col] = column.iloc[anchor_pos]

    return df
def _long_to_wide(df: pd.DataFrame, value_col: str) -> pd.DataFrame: """ Pivot a long-format panel to wide format (dates × cids). Parameters ---------- df : pd.DataFrame or QuantamentalDataFrame Long-format DataFrame with at least "real_date", "cid", and "value_col" columns. value_col : str Name of the column to use as cell values. Returns ------- pd.DataFrame Wide-format DataFrame indexed by "real_date" with one column per unique "cid". """ required_cols = {"real_date", "cid", value_col} missing = required_cols - set(df.columns) if missing: raise ValueError( f"_long_to_wide: DataFrame is missing required columns: {sorted(missing)}" ) wide = df.pivot(index="real_date", columns="cid", values=value_col) return wide def _wide_to_long(df: pd.DataFrame, value_name: str = "value") -> pd.DataFrame: """ Melt a wide-format panel back to long format, dropping NaN rows. Parameters ---------- df : pd.DataFrame Wide-format DataFrame with a "real_date"-named index and one column per cid. value_name : str, default "value" Name for the melted value column in the output. Returns ------- pd.DataFrame Long-format DataFrame with columns "real_date", "cid", and value_name, sorted by cid then real_date, with NaN rows dropped. """ if df.columns.empty: raise ValueError("_wide_to_long: DataFrame has no columns (expected one per cid).") long = ( df.rename_axis("real_date") .reset_index() .melt(id_vars="real_date", var_name="cid", value_name=value_name) .dropna(subset=[value_name]) .sort_values(["cid", "real_date"]) .reset_index(drop=True) ) return long
def rotate_cid_xcat(
    df: pd.DataFrame,
    direction: str,
    xcat_template: str,
    fixed_value: str,
) -> pd.DataFrame:
    """
    Rotate a panel DataFrame between cid-per-row and xcat-per-row representations.

    Two directions are supported:

    - "to_xcats": every "xcat" becomes `xcat_template` with the row's "cid"
      substituted for the "{cid}" placeholder, and "cid" is set to `fixed_value`.
    - "to_cids": the inverse. The template is turned into a regular expression,
      the part of "xcat" matching the placeholder is written into "cid", and
      "xcat" is set to `fixed_value`.

    Parameters
    ----------
    df : pd.DataFrame or QuantamentalDataFrame
        Panel DataFrame with at least "cid" and "xcat" columns. It is not
        modified; a copy is returned.
    direction : str
        Transformation direction: "to_xcats" or "to_cids".
    xcat_template : str
        Template string containing the placeholder "{cid}" that maps between a
        stock identifier and an xcat name, e.g. "EQXR_{cid}_NSA".
    fixed_value : str
        Value assigned to the column being collapsed: "cid" when direction is
        "to_xcats", "xcat" when direction is "to_cids".

    Returns
    -------
    pd.DataFrame
        A copy of df with "cid" and "xcat" updated according to direction.

    Raises
    ------
    ValueError
        If direction is not "to_xcats" or "to_cids", if the template lacks the
        "{cid}" placeholder, or if the panel has both more than one cid and more
        than one xcat.
    """
    supported_directions = ("to_xcats", "to_cids")
    if direction not in supported_directions:
        raise ValueError(
            f"direction must be 'to_xcats' or 'to_cids', got {direction!r}"
        )

    placeholder = "{cid}"
    if placeholder not in xcat_template:
        raise ValueError(
            f"xcat_template must contain the '{{cid}}' placeholder, got {xcat_template!r}"
        )

    n_cids = df["cid"].nunique()
    n_xcats = df["xcat"].nunique()
    logger.debug(
        "rotate_cid_xcat called with direction=%r, n_cids=%d, n_xcats=%d",
        direction,
        n_cids,
        n_xcats,
    )

    # The rotation is only well defined when at least one of the two dimensions
    # has a single (or no) unique value.
    if n_cids > 1 and n_xcats > 1:
        raise ValueError(
            f"Cannot rotate a panel with multiple cids ({n_cids}) and multiple "
            f"xcats ({n_xcats}). Exactly one of the two must be unique."
        )

    rotated = df.copy()

    if direction == "to_cids":
        # Escape the template so that it matches literally, then swap the escaped
        # placeholder for a capture group.
        escaped_template = re.escape(xcat_template)
        pattern = "^" + escaped_template.replace(r"\{cid\}", "(.+)") + "$"
        rotated["cid"] = rotated["xcat"].str.extract(pattern)[0]
        rotated["xcat"] = fixed_value
        logger.debug(
            "Rotated %d xcats to cids using template %r; xcat set to %r.",
            n_xcats,
            xcat_template,
            fixed_value,
        )
        return rotated

    def _fill_template(cid):
        return xcat_template.replace("{cid}", cid)

    rotated["xcat"] = rotated["cid"].apply(_fill_template)
    rotated["cid"] = fixed_value
    logger.debug(
        "Rotated %d cids to xcats using template %r; cid set to %r.",
        n_cids,
        xcat_template,
        fixed_value,
    )
    return rotated
# Manual smoke test: simulates a small panel and runs `categories_df` on it.
if __name__ == "__main__":
    from macrosynergy.management.simulate import make_qdf

    cids = ["AUD", "CAD", "GBP", "NZD"]
    xcats = ["XR1", "XR2", "CRY1", "CRY2"]

    # Per-cross-section simulation settings: availability window and the
    # `mean_add` / `sd_mult` parameters passed on to `make_qdf`.
    df_cids = pd.DataFrame(
        index=cids, columns=["earliest", "latest", "mean_add", "sd_mult"]
    )
    df_cids.loc["AUD"] = ["2000-01-01", "2020-12-31", 0.1, 1]
    df_cids.loc["CAD"] = ["2001-01-01", "2020-11-30", 0, 1]
    df_cids.loc["GBP"] = ["2002-01-01", "2020-11-30", 0, 2]
    df_cids.loc["NZD"] = ["2002-01-01", "2020-09-30", -0.1, 2]

    # Per-category simulation settings, with the additional `ar_coef` and
    # `back_coef` columns.
    df_xcats = pd.DataFrame(
        index=xcats,
        columns=["earliest", "latest", "mean_add", "sd_mult", "ar_coef", "back_coef"],
    )
    df_xcats.loc["XR1"] = ["2000-01-01", "2020-12-31", 0.1, 1, 0, 0.3]
    df_xcats.loc["XR2"] = ["2000-01-01", "2020-10-30", 1, 2, 0.95, 1]
    df_xcats.loc["CRY1"] = ["2001-01-01", "2020-10-30", 1, 2, 0.9, 1]
    df_xcats.loc["CRY2"] = ["2001-01-01", "2020-10-30", 1, 2, 0.8, 0.5]

    dfd = make_qdf(df_cids, df_xcats, back_ar=0.75)

    # Monthly aggregation; the commented-out arguments are optional settings
    # that are left disabled for this run.
    dfw = categories_df(
        df=QuantamentalDataFrame(dfd),
        xcats=xcats,
        cids=cids,
        freq="M",
        # lag=1,
        xcat_aggs=["last", "sum"],
        # years=5,
        # start="2000-01-01",
    )

    # Marker printed once the call above has completed without raising.
    print("HI")