"""
Module hosting custom types and meta-classes for use with Quantamental DataFrames.
"""
from typing import List, Optional, Any, Iterable, Mapping, Union, Dict, Set, Tuple
import pandas as pd
import numpy as np
import warnings
import itertools
import functools
from macrosynergy.management.constants import JPMAQS_METRICS
from .base import QuantamentalDataFrameBase
def get_col_sort_order(df: QuantamentalDataFrameBase) -> List[str]:
    """
    Return the column names of a QuantamentalDataFrame in a consistent order.

    The index columns come first, then the JPMaQS metrics that are present in
    the DataFrame (in the order of `JPMAQS_METRICS`), then every remaining
    column in sorted order. The DataFrame itself is not modified.

    Parameters
    ----------
    df : QuantamentalDataFrame
        DataFrame to return the sorted columns of.

    Raises
    ------
    TypeError
        If `df` is not a QuantamentalDataFrame.

    Returns
    -------
    List[str]
        List of sorted column names.
    """
    if not isinstance(df, QuantamentalDataFrameBase):
        raise TypeError("`df` must be a QuantamentalDataFrame.")

    index_cols = QuantamentalDataFrameBase.IndexCols
    metric_cols: Set = set(df.columns).difference(index_cols)
    known_metrics: List[str] = [m for m in JPMAQS_METRICS if m in metric_cols]
    other_metrics: List[str] = sorted(metric_cols.difference(JPMAQS_METRICS))
    return index_cols + known_metrics + other_metrics
def qdf_to_categorical(
    df: QuantamentalDataFrameBase,
) -> QuantamentalDataFrameBase:
    """
    Cast the string index columns ("cid", "xcat") of a DataFrame to categorical.

    Parameters
    ----------
    df : QuantamentalDataFrame
        DataFrame whose index columns are converted.

    Raises
    ------
    TypeError
        If `df` is not a QuantamentalDataFrame.

    Returns
    -------
    QuantamentalDataFrame
        DataFrame with the index columns in categorical format.
    """
    if isinstance(df, QuantamentalDataFrameBase):
        target_cols = QuantamentalDataFrameBase._StrIndexCols
        return change_column_format(df, target_cols, "category")
    raise TypeError("`df` must be a QuantamentalDataFrame.")
def qdf_to_string_index(
    df: QuantamentalDataFrameBase,
) -> QuantamentalDataFrameBase:
    """
    Cast the string index columns ("cid", "xcat") of a DataFrame to `object` dtype.

    Parameters
    ----------
    df : QuantamentalDataFrame
        DataFrame whose index columns are converted.

    Raises
    ------
    TypeError
        If `df` is not a QuantamentalDataFrame.

    Returns
    -------
    QuantamentalDataFrame
        DataFrame with the index columns in string (object) format.
    """
    if isinstance(df, QuantamentalDataFrameBase):
        target_cols = QuantamentalDataFrameBase._StrIndexCols
        return change_column_format(df, target_cols, "object")
    raise TypeError("`df` must be a QuantamentalDataFrame.")
def check_is_categorical(df: QuantamentalDataFrameBase) -> bool:
    """
    Tell whether every string index column of a DataFrame is categorical.

    Parameters
    ----------
    df : QuantamentalDataFrame
        DataFrame to inspect.

    Raises
    ------
    TypeError
        If `df` is not a QuantamentalDataFrame.

    Returns
    -------
    bool
        True if all required index columns ("cid", "xcat") have the
        `category` dtype, False as soon as one of them does not.
    """
    if not isinstance(df, QuantamentalDataFrameBase):
        raise TypeError("`df` must be a QuantamentalDataFrame.")

    for col in QuantamentalDataFrameBase._StrIndexCols:
        if df[col].dtype.name != "category":
            return False
    return True
def _get_tickers_series(
    df: QuantamentalDataFrameBase,
    cid_column: str = "cid",
    xcat_column: str = "xcat",
) -> Union[pd.Series, pd.Categorical]:
    """
    Build the per-row ticker labels (`<cid>_<xcat>`) of a DataFrame.

    Parameters
    ----------
    df : QuantamentalDataFrame
        DataFrame to extract the tickers from.
    cid_column : str, optional
        Column name of the `cid` in the DataFrame. Default is "cid".
    xcat_column : str, optional
        Column name of the `xcat` in the DataFrame. Default is "xcat".

    Raises
    ------
    ValueError
        If `cid_column` or `xcat_column` is not a column of `df`.
    TypeError
        If `df` is not a QuantamentalDataFrame.

    Returns
    -------
    pd.Series or pd.Categorical
        If both columns are categorical, an ordered `pd.Categorical` whose
        categories are the tickers in order of first appearance. Otherwise the
        plain concatenation of the two columns as a `pd.Series`.
    """
    for col in (cid_column, xcat_column):
        if col not in df.columns:
            raise ValueError(f"Column '{col}' not found in DataFrame.")
    if not isinstance(df, QuantamentalDataFrameBase):
        raise TypeError("`df` must be a QuantamentalDataFrame.")

    # Always work on the columns the caller named; the categorical branch used
    # to fall back to the hard-coded "cid"/"xcat" columns.
    cid_values = df[cid_column]
    xcat_values = df[xcat_column]
    both_categorical = all(
        values.dtype.name == "category" for values in (cid_values, xcat_values)
    )
    if not both_categorical:
        return cid_values + "_" + xcat_values

    # Map the integer codes back to their labels row by row.
    cid_labels = cid_values.cat.categories[cid_values.cat.codes]
    xcat_labels = xcat_values.cat.categories[xcat_values.cat.codes]
    ticker_labels = [f"{cid}_{xcat}" for cid, xcat in zip(cid_labels, xcat_labels)]
    categories = pd.unique(pd.Series(ticker_labels))
    return pd.Categorical(
        ticker_labels,
        categories=categories,
        ordered=True,
    )
def apply_blacklist(
    df: QuantamentalDataFrameBase,
    blacklist: Mapping[str, Iterable[Union[str, pd.Timestamp]]],
) -> QuantamentalDataFrameBase:
    """
    Remove blacklisted date ranges from a DataFrame.

    For every entry of `blacklist`, all rows whose `cid` equals the first three
    characters of the key and whose `real_date` lies within the (inclusive)
    start/end dates are dropped. This is useful for removing data that is known
    to be incorrect or unreliable.

    Parameters
    ----------
    df : QuantamentalDataFrame
        DataFrame to apply the blacklist to.
    blacklist : dict
        Dictionary with keys as `cids` and values as a list of start and end
        dates to blacklist. Example:

        .. code-block:: python

            {"cid": ["2020-01-01", "2020-12-31"]}

        Multiple periods for the same `cid` can be covered by appending an
        additional label to the end of the `cid` key. Example:

        .. code-block:: python

            {
                "usd_1": ["2020-01-01", "2020-12-31"],
                "usd_2": ["2020-01-01", "2020-12-31"],
                "eur": ["2020-01-01", "2020-12-31"],
            }

    Raises
    ------
    TypeError
        If `df` is not a QuantamentalDataFrame, if `blacklist` is not a
        dictionary with string keys, or if its values are not two-element
        iterables of `str` / `pd.Timestamp`.

    Returns
    -------
    QuantamentalDataFrame
        DataFrame with the blacklist applied and the index reset.
    """
    if not isinstance(df, QuantamentalDataFrameBase):
        # The check is against the Quantamental type, so say so in the message.
        raise TypeError("`df` must be a QuantamentalDataFrame.")
    if not isinstance(blacklist, dict):
        raise TypeError("`blacklist` must be a dictionary.")
    if not all(isinstance(k, str) for k in blacklist):
        raise TypeError("Keys of `blacklist` must be strings.")
    if not all(isinstance(v, Iterable) for v in blacklist.values()):
        raise TypeError("Values of `blacklist` must be iterables.")

    dates_ok = all(
        isinstance(vv, (str, pd.Timestamp)) for v in blacklist.values() for vv in v
    )
    if not dates_ok or any(len(v) != 2 for v in blacklist.values()):
        raise TypeError(
            "Values of `blacklist` must be lists of start & end dates (str or pd.Timestamp)."
        )

    for key, value in blacklist.items():
        # NOTE(review): only the first three characters of the key are matched,
        # so this assumes three-character cids - confirm against callers.
        in_blacklist = (
            (df["cid"] == key[:3])
            & (df["real_date"] >= value[0])
            & (df["real_date"] <= value[1])
        )
        df = df[~in_blacklist]

    return df.reset_index(drop=True)
def _sync_df_categories(
    df: QuantamentalDataFrameBase,
) -> QuantamentalDataFrameBase:
    """
    Drop categories of "cid" and "xcat" that no longer occur in the data.

    A DataFrame whose index columns are not categorical is returned untouched.
    Otherwise the two columns are reassigned on `df` itself.

    Parameters
    ----------
    df : QuantamentalDataFrame
        DataFrame to sync the categories of.

    Returns
    -------
    QuantamentalDataFrame
        DataFrame with the categories synced.
    """
    if check_is_categorical(df):
        for col in ("cid", "xcat"):
            trimmed = df[col].cat.remove_unused_categories()
            df[col] = trimmed.astype("category")
    return df
def reduce_df(
    df: QuantamentalDataFrameBase,
    cids: Optional[List[str]] = None,
    xcats: Optional[List[str]] = None,
    start: Optional[str] = None,
    end: Optional[str] = None,
    blacklist: dict = None,
    out_all: bool = False,
    intersect: bool = False,
) -> Union[
    QuantamentalDataFrameBase, Tuple[QuantamentalDataFrameBase, List[str], List[str]]
]:
    """
    Filter DataFrame by `cids`, `xcats`, and `start` & `end` dates.

    Parameters
    ----------
    df : QuantamentalDataFrameBase
        The DataFrame to be filtered.
    cids : Optional[List[str]], optional
        List of `cid` values to filter by (a single string is also accepted).
        If None, all `cid` values are included.
    xcats : Optional[List[str]], optional
        List of `xcat` values to filter by (a single string is also accepted).
        If None, all `xcat` values are included.
    start : Optional[str], optional
        Start date for filtering. If None, no start date filtering is applied.
    end : Optional[str], optional
        End date for filtering. If None, no end date filtering is applied.
    blacklist : dict, optional
        Dictionary specifying blacklist criteria. If None, no blacklist filtering
        is applied.
    out_all : bool, optional
        If True, returns the filtered DataFrame along with the lists of `xcats`
        and `cids`; i.e. `(df, xcats, cids)`.
    intersect : bool, optional
        If True, only includes `cid` values that are present for all `xcat`
        values.

    Returns
    -------
    Union[QuantamentalDataFrameBase, Tuple[QuantamentalDataFrameBase, List[str], List[str]]]
        The filtered DataFrame with duplicate rows removed. If `out_all` is True,
        also returns the lists of `xcats` and `cids` that were kept.
    """
    if isinstance(xcats, str):
        xcats = [xcats]

    if start:
        df = df[df["real_date"] >= pd.to_datetime(start)]
    if end:
        df = df[df["real_date"] <= pd.to_datetime(end)]
    if blacklist is not None:
        df = apply_blacklist(df, blacklist)

    if xcats is None:
        xcats = sorted(df["xcat"].unique())
    else:
        # Keep the caller's ordering, but only for categories that exist.
        available_xcats = set(df["xcat"].unique())
        xcats = [xcat for xcat in xcats if xcat in available_xcats]
    df = df[df["xcat"].isin(xcats)]

    if intersect:
        cid_sets = [
            set(df.loc[df["xcat"] == xcat, "cid"].unique()) for xcat in xcats
        ]
        # `set.intersection()` without arguments raises a TypeError, so an
        # empty selection of xcats must be handled explicitly.
        cids_in_df = set.intersection(*cid_sets) if cid_sets else set()
    else:
        cids_in_df = set(df["cid"].unique())

    if cids is None:
        cids = sorted(cids_in_df)
    else:
        if isinstance(cids, str):
            cids = [cids]
        cids = [cid for cid in cids if cid in cids_in_df]

    df = df[df["cid"].isin(cids)].reset_index(drop=True)
    df = _sync_df_categories(df)
    df = df.drop_duplicates().reset_index(drop=True)

    if out_all:
        return df, xcats, sorted(cids)
    return df
def reduce_df_by_ticker(
    df: QuantamentalDataFrameBase,
    tickers: List[str],
    start: Optional[str] = None,
    end: Optional[str] = None,
    blacklist: dict = None,
) -> QuantamentalDataFrameBase:
    """
    Restrict a QuantamentalDataFrameBase to a set of tickers, a date range and a
    blacklist.

    Parameters
    ----------
    df : QuantamentalDataFrameBase
        The DataFrame to be filtered.
    tickers : List[str]
        List of tickers to keep. If None, every ticker found in the (date- and
        blacklist-filtered) DataFrame is kept.
    start : Optional[str], optional
        Start date for filtering. If None, no start date filtering is applied.
    end : Optional[str], optional
        End date for filtering. If None, no end date filtering is applied.
    blacklist : dict, optional
        Dictionary specifying blacklist criteria. If None, no blacklist filtering
        is applied.

    Raises
    ------
    TypeError
        If `df` is not a QuantamentalDataFrame.
    TypeError
        If `tickers` is neither a list nor None.

    Returns
    -------
    QuantamentalDataFrameBase
        The filtered DataFrame, with duplicate rows removed.
    """
    if not isinstance(df, QuantamentalDataFrameBase):
        raise TypeError("`df` must be a QuantamentalDataFrame.")
    if tickers is not None and not isinstance(tickers, list):
        raise TypeError("`tickers` must be a list of strings.")

    if start is not None:
        on_or_after = df["real_date"] >= pd.to_datetime(start)
        df = df.loc[on_or_after]
    if end is not None:
        on_or_before = df["real_date"] <= pd.to_datetime(end)
        df = df.loc[on_or_before]
    if blacklist is not None:
        df = apply_blacklist(df, blacklist)

    ticker_series = _get_tickers_series(df)
    if tickers is None:
        tickers = sorted(ticker_series.unique())

    selected = ticker_series.isin(tickers)
    df = df[selected].reset_index(drop=True)
    df = _sync_df_categories(df)
    deduplicated = df.drop_duplicates()
    return deduplicated.reset_index(drop=True)
def update_df(
    df: QuantamentalDataFrameBase,
    df_add: QuantamentalDataFrameBase,
    xcat_replace: bool = False,
) -> QuantamentalDataFrameBase:
    """
    Merge a standard DataFrame into a standard base DataFrame, replacing data on
    the intersection.

    Parameters
    ----------
    df : QuantamentalDataFrame
        Base DataFrame to append to.
    df_add : QuantamentalDataFrame
        DataFrame to append.
    xcat_replace : bool, optional
        If True, replacement happens on the category level (via
        `update_categories`); otherwise on the ticker level (via
        `update_tickers`). Default is False.

    Raises
    ------
    TypeError
        If `df` or `df_add` is not a QuantamentalDataFrame, or if `xcat_replace`
        is not a boolean.

    Returns
    -------
    QuantamentalDataFrame
        The combined DataFrame, sorted by the index columns.
    """
    for frame, label in ((df, "df"), (df_add, "df_add")):
        if not isinstance(frame, QuantamentalDataFrameBase):
            raise TypeError(f"`{label}` must be a QuantamentalDataFrame.")
    if not isinstance(xcat_replace, bool):
        raise TypeError("`xcat_replace` must be a boolean.")

    merge_func = update_categories if xcat_replace else update_tickers
    merged = merge_func(df=df, df_add=df_add)

    sort_cols = QuantamentalDataFrameBase.IndexColsSortOrder
    return merged.sort_values(sort_cols).reset_index(drop=True)
def update_tickers(
    df: pd.DataFrame,
    df_add: pd.DataFrame,
) -> QuantamentalDataFrameBase:
    """
    Update the aggregate DataFrame on the ticker level.

    The two frames are stacked and, for rows sharing the same index columns, the
    row coming from `df_add` is kept. If either frame is empty the other one is
    returned as is. When "cid" and "xcat" are categorical in both frames, their
    categories are aligned first; this reassigns those columns on both inputs.

    Parameters
    ----------
    df : pd.DataFrame
        DataFrame to update.
    df_add : pd.DataFrame
        DataFrame to add to the base DataFrame.

    Returns
    -------
    QuantamentalDataFrame
        Updated DataFrame.
    """
    if df_add.empty:
        return df
    if df.empty:
        return df_add

    str_cols = ("cid", "xcat")
    both_categorical = all(
        frame[col].dtype.name == "category"
        for frame in (df, df_add)
        for col in str_cols
    )
    if both_categorical:
        # Give both frames the same category set so concatenation keeps the dtype.
        for col in str_cols:
            shared = pd.api.types.union_categoricals(
                [df[col].unique(), df_add[col].unique()]
            ).categories
            df[col] = pd.Categorical(df[col], categories=shared)
            df_add[col] = pd.Categorical(df_add[col], categories=shared)

    stacked = pd.concat([df, df_add], axis=0, ignore_index=True)
    deduplicated = stacked.drop_duplicates(
        subset=QuantamentalDataFrameBase.IndexCols,
        keep="last",
    )
    return deduplicated.reset_index(drop=True)
def update_categories(
    df: QuantamentalDataFrameBase,
    df_add: QuantamentalDataFrameBase,
) -> QuantamentalDataFrameBase:
    """
    Update the DataFrame on the category level.

    Every `xcat` that occurs in both frames is removed from `df` before `df_add`
    is appended, so the data of `df_add` fully replaces those categories. When
    "cid" and "xcat" are categorical in both frames, their categories are aligned
    first; this reassigns those columns on both inputs.

    Parameters
    ----------
    df : QuantamentalDataFrame
        Base DataFrame.
    df_add : QuantamentalDataFrame
        DataFrame whose categories take precedence.

    Raises
    ------
    TypeError
        If `df` or `df_add` is not a QuantamentalDataFrame.

    Returns
    -------
    QuantamentalDataFrame
        The combined DataFrame, sorted by the index columns.
    """
    for frame, label in ((df, "df"), (df_add, "df_add")):
        if not isinstance(frame, QuantamentalDataFrameBase):
            raise TypeError(f"`{label}` must be a QuantamentalDataFrame.")

    existing_xcats = set(df["xcat"].unique())
    incoming_xcats = set(df_add["xcat"].unique())
    overlap = list(existing_xcats & incoming_xcats)

    str_cols = ("cid", "xcat")
    both_categorical = all(
        frame[col].dtype.name == "category"
        for frame in (df, df_add)
        for col in str_cols
    )
    if both_categorical:
        # Give both frames the same category set so concatenation keeps the dtype.
        for col in str_cols:
            shared = pd.api.types.union_categoricals(
                [df[col].unique(), df_add[col].unique()]
            ).categories
            df[col] = pd.Categorical(df[col], categories=shared)
            df_add[col] = pd.Categorical(df_add[col], categories=shared)

    if overlap:
        # Categories present in both frames are taken from `df_add` only.
        df = df[~df["xcat"].isin(overlap)]
    combined = pd.concat([df, df_add], axis=0, ignore_index=True)

    sort_cols = QuantamentalDataFrameBase.IndexColsSortOrder
    return combined.sort_values(sort_cols).reset_index(drop=True)
def qdf_to_wide_df(
    df: QuantamentalDataFrameBase,
    value_column: str = "value",
) -> pd.DataFrame:
    """
    Pivot the DataFrame to a wide format with memory efficiency.

    Parameters
    ----------
    df : QuantamentalDataFrame
        DataFrame to pivot. It is left unchanged.
    value_column : str, optional
        Column holding the values of the wide DataFrame. Default is "value".

    Raises
    ------
    TypeError
        If `df` is not a QuantamentalDataFrame or `value_column` is not a string.
    ValueError
        If `value_column` is not a column of `df`.

    Returns
    -------
    pd.DataFrame
        DataFrame indexed by `real_date` with one column per ticker.
    """
    # Ensure inputs are of the correct type and exist in the DataFrame
    if not isinstance(df, QuantamentalDataFrameBase):
        raise TypeError("`df` must be a QuantamentalDataFrame.")
    if not isinstance(value_column, str):
        raise TypeError("`value_column` must be a string.")
    if value_column not in df.columns:
        raise ValueError(f"Column '{value_column}' not found in DataFrame.")

    # A shallow copy shares the column data but has its own set of columns, so
    # the helper "ticker" column does not leak into the caller's DataFrame.
    work = df.copy(deep=False)
    work["ticker"] = _get_tickers_series(df)

    wide = work.pivot(index="real_date", columns="ticker", values=value_column)
    return wide.rename_axis(None, axis=1)
def add_ticker_column(
    df: QuantamentalDataFrameBase,
) -> QuantamentalDataFrameBase:
    """
    Add a "ticker" column to the DataFrame.

    The column is written onto `df` itself, which is then returned.

    Parameters
    ----------
    df : QuantamentalDataFrame
        DataFrame to add the ticker column to.

    Raises
    ------
    TypeError
        If `df` is not a QuantamentalDataFrame.

    Returns
    -------
    QuantamentalDataFrame
        The same DataFrame with the additional "ticker" column.
    """
    if not isinstance(df, QuantamentalDataFrameBase):
        # The check is against the Quantamental type, so say so in the message.
        raise TypeError("`df` must be a QuantamentalDataFrame.")
    df["ticker"] = _get_tickers_series(df)
    return df
def _add_categorical_column(
    df: pd.DataFrame,
    column_name: str,
    fill_value: str,
) -> pd.DataFrame:
    """
    Add a constant categorical column to a DataFrame. Typically `cid` or `xcat`.

    The column is written onto `df` itself, which is then returned.

    Parameters
    ----------
    df : pd.DataFrame
        DataFrame to add the column to.
    column_name : str
        Name of the column to add.
    fill_value : str
        The single category every row of the new column is set to.

    Returns
    -------
    pd.DataFrame
        The same DataFrame with the new categorical column.
    """
    # Every row points at the one and only category (code 0).
    codes = np.zeros(len(df), dtype=int)
    constant_column = pd.Categorical.from_codes(codes, categories=[fill_value])
    df[column_name] = constant_column
    return df
def rename_xcats(
    df: QuantamentalDataFrameBase,
    xcat_map: Optional[Dict[str, str]] = None,
    select_xcats: Optional[List[str]] = None,
    postfix: Optional[str] = None,
    prefix: Optional[str] = None,
    name_all: Optional[str] = None,
    fmt_string: Optional[str] = None,
) -> QuantamentalDataFrameBase:
    """
    Rename the xcats in a DataFrame based on a mapping or a format string. Only one
    of `xcat_map` or `select_xcats` must be provided. If `name_all` is provided, all
    selected xcats will be renamed to this value.

    NOTE: This function maintains the datatype of the xcat column as a categorical,
    and writes the renamed column onto `df` itself.

    Parameters
    ----------
    df : QuantamentalDataFrame
        DataFrame to rename the xcats in.
    xcat_map : dict, optional
        Dictionary mapping the old xcats to new xcats. When given, it is applied
        and all other renaming options are ignored. Default is None.
    select_xcats : List[str], optional
        List of xcats to rename. If None, all xcats are renamed. Default is None.
    postfix : str, optional
        Postfix to add to the xcats. Default is None.
    prefix : str, optional
        Prefix to add to the xcats. Default is None.
    name_all : str, optional
        Name to rename all selected xcats to. Default is None.
    fmt_string : str, optional
        Format string to rename xcats, e.g. "{}_NEW". Default is None.

    Raises
    ------
    TypeError
        If `df` is not a QuantamentalDataFrame.
    ValueError
        If both `xcat_map` and `select_xcats` are provided.
    TypeError
        If `xcat_map` is not a dictionary with string keys and values.
    ValueError
        If not exactly one of `postfix`, `prefix`, `name_all`, or `fmt_string` is
        provided (empty strings count as not provided).
    ValueError
        If `fmt_string` does not contain exactly one pair of curly braces.

    Returns
    -------
    QuantamentalDataFrame
        DataFrame with the xcats renamed.
    """
    if not isinstance(df, QuantamentalDataFrameBase):
        raise TypeError("`df` must be a QuantamentalDataFrame.")
    if bool(xcat_map) and bool(select_xcats):
        raise ValueError("Only one of `xcat_map` or `select_xcats` must be provided.")

    if xcat_map is not None:
        valid_map = isinstance(xcat_map, dict) and all(
            isinstance(k, str) and isinstance(v, str) for k, v in xcat_map.items()
        )
        if not valid_map:
            raise TypeError(
                "`xcat_map` must be a dictionary with string keys and values."
            )
        # Categories missing from the mapping keep their name.
        df["xcat"] = df["xcat"].cat.rename_categories(
            {old_cat: xcat_map.get(old_cat, old_cat) for old_cat in df["xcat"].unique()}
        )
        return df

    if select_xcats is None:
        select_xcats = df["xcat"].unique()

    renamers = {
        "postfix": (postfix, lambda x: f"{x}{postfix}"),
        "prefix": (prefix, lambda x: f"{prefix}{x}"),
        "name_all": (name_all, lambda x: name_all),
        "fmt_string": (fmt_string, lambda x: fmt_string.format(x)),
    }
    # Count the options instead of XOR-ing them: a four-way XOR is also true
    # when three options are given. The same truthiness test selects the mode,
    # so validation and dispatch can no longer disagree.
    provided = [name for name, (value, _) in renamers.items() if value]
    if len(provided) != 1:
        raise ValueError(
            "Exactly one of `postfix`, `prefix`, `name_all`, or `fmt_string` must be provided."
        )
    mode = provided[0]
    rename = renamers[mode][1]

    if mode == "fmt_string" and fmt_string.count("{}") != 1:
        raise ValueError(
            "The `fmt_string` must contain exactly one pair of curly braces."
        )

    new_names = {cat: rename(cat) for cat in select_xcats}
    if mode == "name_all":
        # Several categories collapse into one name, which `rename_categories`
        # cannot express; go through strings and rebuild the categorical.
        xc_col = df["xcat"].astype(str).replace(new_names)
        # Order of first appearance keeps the category order deterministic.
        df["xcat"] = pd.Categorical(xc_col, categories=xc_col.unique())
    else:
        df["xcat"] = df["xcat"].cat.rename_categories(new_names)
    return df
def create_empty_categorical_qdf(
    cid: Optional[str] = None,
    xcat: Optional[str] = None,
    ticker: Optional[str] = None,
    metrics: Optional[List[str]] = None,
    date_range: Optional[pd.DatetimeIndex] = None,
    start: Optional[str] = None,
    end: Optional[str] = None,
    categorical: bool = True,
) -> QuantamentalDataFrameBase:
    """
    Create an empty QuantamentalDataFrame with categorical index columns. This is
    useful for creating a DataFrame for a given ticker with the required metrics,
    all of which are filled with NaN.

    The ticker can be specified using `cid` and `xcat` or directly using `ticker`.
    The date range can be specified using `date_range` or `start` and `end`; if
    `date_range` is given it takes precedence.

    Parameters
    ----------
    cid : str, optional
        `cid` value to use. Must be passed with `xcat`. Default is None.
    xcat : str, optional
        `xcat` value to use. Must be passed with `cid`. Default is None.
    ticker : str, optional
        Ticker to use. Must not be passed with `cid` and `xcat`. Default is None.
    metrics : List[str], optional
        List of metrics to create columns for. A single string is treated as a
        one-element list. Default is None, which means ["value"].
    date_range : pd.DatetimeIndex, optional
        Date range to create the DataFrame for. Default is None.
    start : str, optional
        Start date for the DataFrame; business days between `start` and `end`
        are used. Default is None.
    end : str, optional
        End date for the DataFrame. Default is None.
    categorical : bool, optional
        If False, the index columns are converted to string format before
        returning. Default is True.

    Raises
    ------
    TypeError
        If `metrics` is not a list of strings.
    ValueError
        If `date_range` is None and `start` and `end` are not both provided.
    ValueError
        If only one of `cid` and `xcat` is provided.
    ValueError
        If `ticker` is provided with `cid` and `xcat`, or none of them is
        provided.

    Returns
    -------
    QuantamentalDataFrame
        Empty DataFrame with the required index columns and metrics.
    """
    # `None` stands in for the default list so that no list object is shared
    # between calls.
    if metrics is None:
        metrics = ["value"]
    elif isinstance(metrics, str):
        # Iterating a bare string would create one column per character.
        metrics = [metrics]
    if not all(isinstance(m, str) for m in metrics):
        raise TypeError("`metrics` must be a list of strings.")

    if (date_range is None) and (start is None or end is None):
        raise ValueError(
            "Either `date_range` or `start` & `end` must be specified."
        )
    if date_range is None:
        date_range = pd.bdate_range(start=start, end=end)

    if bool(cid) ^ bool(xcat):
        raise ValueError("`cid` and `xcat` must be specified together.")
    if not (bool(cid) ^ bool(ticker)):
        raise ValueError("Either specify `cid` & `xcat` or `ticker` but not both.")
    if ticker is not None:
        # Everything before the first underscore is the cid, the rest the xcat.
        cid, xcat = ticker.split("_", 1)

    qdf = pd.DataFrame(columns=["real_date"], data=date_range)
    qdf = _add_categorical_column(qdf, "cid", cid)
    qdf = _add_categorical_column(qdf, "xcat", xcat)
    for metric in metrics:
        qdf[metric] = np.nan

    if not categorical:
        qdf = qdf_to_string_index(qdf)
    return qdf
def add_nan_series(
    df: QuantamentalDataFrameBase,
    ticker: Optional[str] = None,
    cid: Optional[str] = None,
    xcat: Optional[str] = None,
    start: Optional[Union[str, pd.Timestamp]] = None,
    end: Optional[Union[str, pd.Timestamp]] = None,
) -> QuantamentalDataFrameBase:
    """
    Add an all-NaN series for a given ticker to the DataFrame.

    The series carries every non-index column of `df` and is merged in through
    `update_df`.

    Parameters
    ----------
    df : QuantamentalDataFrame
        DataFrame to add the NaN series to.
    ticker : str, optional
        Ticker to add the NaN series for. Must not be passed with `cid` and `xcat`.
        Default is None.
    cid : str, optional
        `cid` value to use. Must be passed with `xcat`. Default is None.
    xcat : str, optional
        `xcat` value to use. Must be passed with `cid`. Default is None.
    start : str or pd.Timestamp, optional
        Start date for the NaN series. Default is None, which uses the earliest
        `real_date` of `df`.
    end : str or pd.Timestamp, optional
        End date for the NaN series. Default is None, which uses the latest
        `real_date` of `df`.

    Raises
    ------
    TypeError
        If `df` is not a QuantamentalDataFrame.
    ValueError
        If `ticker` is provided with `cid` and `xcat`.

    Returns
    -------
    QuantamentalDataFrame
        DataFrame with the NaN series added.
    """
    if not isinstance(df, QuantamentalDataFrameBase):
        raise TypeError("`df` must be a QuantamentalDataFrame.")

    metric_cols = df.columns.difference(QuantamentalDataFrameBase.IndexCols)
    series_start = df["real_date"].min() if start is None else start
    series_end = df["real_date"].max() if end is None else end

    nan_df = create_empty_categorical_qdf(
        cid=cid,
        xcat=xcat,
        ticker=ticker,
        start=series_start,
        end=series_end,
        metrics=metric_cols,
        categorical=check_is_categorical(df),
    )
    return update_df(df=df, df_add=nan_df)
def drop_nan_series(
    df: QuantamentalDataFrameBase, column: str = "value", raise_warning: bool = False
) -> QuantamentalDataFrameBase:
    """
    Drops any series that are entirely NaNs. Optionally raises a user warning for
    every series that is dropped.

    Parameters
    ----------
    df : QuantamentalDataFrame
        DataFrame to drop the NaN series from.
    column : str, optional
        Column to check for NaNs. Default is "value".
    raise_warning : bool, optional
        If True, raises a warning if any series are dropped. Default is False.

    Raises
    ------
    TypeError
        If `df` is not a QuantamentalDataFrame.
    ValueError
        If `column` is not found in the DataFrame.
    TypeError
        If `raise_warning` is not a boolean.

    Returns
    -------
    QuantamentalDataFrame
        DataFrame with the NaN series dropped. If `column` contains no NaN at
        all, `df` is returned unchanged.
    """
    if not isinstance(df, QuantamentalDataFrameBase):
        raise TypeError("Argument `df` must be a Quantamental DataFrame.")
    if column not in df.columns:
        raise ValueError(f"Column {column} not present in DataFrame.")
    # Validate before the early return so that a bad argument is always reported.
    if not isinstance(raise_warning, bool):
        raise TypeError("Error: The raise_warning argument must be a boolean.")

    is_nan = df[column].isna()
    if not is_nan.any():
        return df

    # One pass over the data to find the (cid, xcat) pairs that hold only NaNs,
    # instead of rescanning the whole frame for every series.
    all_nan = is_nan.groupby([df["cid"], df["xcat"]], observed=True).all()

    keep = np.ones(len(df), dtype=bool)
    for cd, xc in all_nan[all_nan].index:
        if raise_warning:
            warnings.warn(
                message=f"The series {cd}_{xc} is populated "
                "with NaNs only, and will be dropped.",
                category=UserWarning,
            )
        keep &= ~((df["cid"] == cd) & (df["xcat"] == xc)).to_numpy()

    return df[keep].reset_index(drop=True)
def qdf_from_timeseries(
    timeseries: pd.Series,
    cid: Optional[str] = None,
    xcat: Optional[str] = None,
    ticker: Optional[str] = None,
    metric: str = "value",
) -> QuantamentalDataFrameBase:
    """
    Create a QuantamentalDataFrame from a time series.

    Parameters
    ----------
    timeseries : pd.Series
        Time series to create the QuantamentalDataFrame from. Its own name and
        the name of its index are ignored.
    cid : str, optional
        `cid` value to use. Must be passed with `xcat`. Default is None.
    xcat : str, optional
        `xcat` value to use. Must be passed with `cid`. Default is None.
    ticker : str, optional
        Ticker to use. Must not be passed with `cid` and `xcat`. Default is None.
    metric : str, optional
        Metric name to use. Default is "value".

    Raises
    ------
    TypeError
        If `timeseries` is not a pandas Series.
    TypeError
        If `metric` is not a string.
    ValueError
        If `timeseries` does not have a datetime index.
    ValueError
        If only one of `cid` and `xcat` is provided.
    ValueError
        If `ticker` is provided with `cid` and `xcat`, or none of them is
        provided.
    AssertionError
        If the resulting `cid` or `xcat` is empty.

    Returns
    -------
    QuantamentalDataFrame
        DataFrame created from the time series.
    """
    if not isinstance(timeseries, pd.Series):
        raise TypeError("`timeseries` must be a pandas Series.")
    if not isinstance(metric, str):
        raise TypeError("`metric` must be a string.")
    if not isinstance(timeseries.index, pd.DatetimeIndex):
        raise ValueError("`timeseries` must have a datetime index.")

    if (cid is None) ^ (xcat is None):
        raise ValueError("Both `cid` and `xcat` must be provided.")
    if not ((cid is None) ^ (ticker is None)):
        raise ValueError("Either provide `cid` & `xcat` or `ticker`.")
    if ticker is not None:
        # Everything before the first underscore is the cid, the rest the xcat.
        cid, xcat = ticker.split("_", 1)
    assert bool(cid) and bool(xcat)

    # Name the values and the index explicitly before resetting the index. The
    # columns then come out as ("real_date", metric) whatever the Series or its
    # index were called; renaming the default labels "index"/0 only worked for
    # an unnamed Series with an unnamed index.
    df = timeseries.rename(metric).rename_axis("real_date").reset_index()

    # assign as categorical string
    df = _add_categorical_column(df, "cid", cid)
    df = _add_categorical_column(df, "xcat", xcat)
    df = df[[*QuantamentalDataFrameBase.IndexCols, metric]]
    return QuantamentalDataFrameBase(df)
def _convert_to_single_metric_qdfs(
    qdf: QuantamentalDataFrameBase,
) -> List[QuantamentalDataFrameBase]:
    """
    Internal function for concat_qdfs. Splits a QuantamentalDataFrame with multiple
    metrics into multiple QuantamentalDataFrames with a single metric each.

    Parameters
    ----------
    qdf : QuantamentalDataFrame
        DataFrame to convert.

    Returns
    -------
    List[QuantamentalDataFrame]
        One DataFrame per non-index column of `qdf`, each holding the index
        columns plus that single metric. The list is empty if `qdf` has no
        metric columns.
    """
    index_cols = QuantamentalDataFrameBase.IndexCols
    return [
        qdf[[*index_cols, metric]] for metric in qdf.columns.difference(index_cols)
    ]
def concat_qdfs(
    qdf_list: List[QuantamentalDataFrameBase],
) -> QuantamentalDataFrameBase:
    """
    Concatenate a list of QuantamentalDataFrames into a single QuantamentalDataFrame.
    Converts the index columns to categorical format, if not already categorical.

    DataFrames carrying the same metric are stacked; the stacks of different
    metrics are then outer-merged on the index columns.

    Parameters
    ----------
    qdf_list : List[QuantamentalDataFrame]
        List of QuantamentalDataFrames to concatenate. The list itself is left
        untouched.

    Raises
    ------
    TypeError
        If `qdf_list` is not a list of QuantamentalDataFrames.
    ValueError
        If `qdf_list` is empty.

    Returns
    -------
    QuantamentalDataFrame
        DataFrame with the QuantamentalDataFrames concatenated, sorted by the
        index columns and with the columns in a consistent order.
    """
    if not isinstance(qdf_list, list):
        raise TypeError("`qdfs_list` must be a list of QuantamentalDataFrames.")
    if not all(isinstance(qdf, QuantamentalDataFrameBase) for qdf in qdf_list):
        raise TypeError("All elements in `qdfs_list` must be QuantamentalDataFrames.")
    if len(qdf_list) == 0:
        raise ValueError("`qdfs_list` is empty.")

    index_cols = QuantamentalDataFrameBase.IndexCols

    # Collect the converted frames in a new list rather than writing them back
    # into the caller's list.
    # NOTE(review): whether `qdf_to_categorical` returns a copy is not visible
    # here; if it does not, the category alignment below also shows up on the
    # caller's frames - confirm.
    frames = [qdf_to_categorical(qdf) for qdf in qdf_list]

    # Align the categories of all frames so that concatenating and merging keep
    # the categorical dtype.
    comb_cids = pd.api.types.union_categoricals(
        [frame["cid"].unique() for frame in frames]
    )
    comb_xcats = pd.api.types.union_categoricals(
        [frame["xcat"].unique() for frame in frames]
    )
    for frame in frames:
        frame["cid"] = pd.Categorical(frame["cid"], categories=comb_cids.categories)
        frame["xcat"] = pd.Categorical(
            frame["xcat"], categories=comb_xcats.categories
        )

    # Split every frame into single-metric frames and group them by metric; the
    # dict keeps the metrics in order of first appearance.
    by_metric: Dict[str, List[pd.DataFrame]] = {}
    for frame in frames:
        for metric in frame.columns.difference(index_cols):
            by_metric.setdefault(metric, []).append(frame[[*index_cols, metric]])

    stacked_per_metric = (
        pd.concat(group, axis=0, ignore_index=False) for group in by_metric.values()
    )
    df: pd.DataFrame = functools.reduce(
        lambda left, right: pd.merge(left, right, on=index_cols, how="outer"),
        stacked_per_metric,
    )

    return df.sort_values(by=QuantamentalDataFrameBase.IndexColsSortOrder).reset_index(
        drop=True
    )[get_col_sort_order(df)]