import logging
from typing import Optional, Dict, Tuple, Union
import numpy as np
import pandas as pd
from macrosynergy.management.types import QuantamentalDataFrame
from macrosynergy.management.utils.df_utils import _long_to_wide, _wide_to_long
from macrosynergy.securities.validate import (
_validate_frequency,
_validate_constituents,
_validate_returns,
_validate_index_returns,
)
logger = logging.getLogger(__name__)
def _resolve_reconstitution_freq(
rebalance_freq: str,
reconstitution_freq: Optional[str],
) -> str:
"""
Return the effective reconstitution frequency, defaulting to the rebalance frequency.
Parameters
----------
rebalance_freq : str
Base rebalancing frequency used when "reconstitution_freq" is None.
reconstitution_freq : str or None
Explicit reconstitution frequency, or None to inherit from
"rebalance_freq".
Returns
-------
str
"reconstitution_freq" if not None; otherwise "rebalance_freq".
"""
return reconstitution_freq if reconstitution_freq is not None else rebalance_freq
def _assign_period_labels(dates: pd.DatetimeIndex, freq: str) -> pd.PeriodIndex:
"""
Convert a DatetimeIndex to a PeriodIndex at the given frequency.
Parameters
----------
dates : pd.DatetimeIndex
Dates to label.
freq : str
Pandas period frequency alias, e.g. "M" for month-end periods.
Returns
-------
pd.PeriodIndex
Period labels corresponding to each date in ``dates``.
"""
return dates.to_period(freq)
def _build_reconstitution_membership(
membership_wide: pd.DataFrame,
recon_freq: str,
) -> pd.DataFrame:
"""
Snap membership to the first trading day of each reconstitution period.
For each period defined by ``recon_freq``, the membership values recorded on
the period's first day are broadcast forward across every day in that period,
so the effective composition is held constant within a period.
Parameters
----------
membership_wide : pd.DataFrame
Wide-format binary membership matrix with a DatetimeIndex (business days)
and one column per security (cid).
recon_freq : str
Pandas period frequency alias defining the reconstitution cadence,
e.g. "M" for monthly snapshots.
Returns
-------
pd.DataFrame
Wide-format DataFrame with the same shape as ``membership_wide``, where
each cell reflects the membership recorded on the first day of its period.
"""
periods = _assign_period_labels(membership_wide.index, recon_freq)
first_day_idx = (
pd.Series(membership_wide.index, index=periods).groupby(level=0).first()
)
first_day_mem = membership_wide.loc[first_day_idx.values]
first_day_mem.index = first_day_idx.index
result = membership_wide.copy()
result.values[:] = first_day_mem.loc[periods].values
return result
def _apply_er_formula(
stock_returns: pd.DataFrame,
bench_returns: pd.Series,
method: str,
) -> pd.DataFrame:
"""
Apply the chosen excess-return formula element-wise to stock and benchmark returns.
Parameters
----------
stock_returns : pd.DataFrame
Wide-format returns for individual stocks (rows = dates, columns = cids),
expressed as decimal fractions (not percentage points).
bench_returns : pd.Series
Benchmark return series aligned to the same date index as ``stock_returns``,
expressed as decimal fractions.
method : str
Excess-return formula: "ratio", "log", or "diff".
Returns
-------
pd.DataFrame
Wide-format excess returns with the same shape as ``stock_returns``,
expressed as decimal fractions.
"""
if method == "ratio":
return stock_returns.add(1).div(bench_returns + 1, axis=0) - 1
elif method == "log":
return np.log1p(stock_returns).sub(np.log1p(bench_returns), axis=0)
elif method == "diff":
return stock_returns.sub(bench_returns, axis=0)
[docs]def compute_daily_weights(
constituents: pd.DataFrame,
returns: pd.DataFrame,
rebalance_freq: str = "M",
reconstitution_freq: Optional[str] = None,
blacklist: Optional[Dict[str, Tuple[pd.Timestamp, pd.Timestamp]]] = None,
) -> pd.DataFrame:
"""
Compute daily float-adjusted equal weights for an index constituent set.
Starting from an equal-weighted portfolio at the beginning of each rebalancing
period, weights drift with realized returns within the period. At each rebalance
date the portfolio is reset to equal weights over the current constituent set.
Reconstitution (membership changes) can be snapped to a coarser frequency than
rebalancing via ``reconstitution_freq``.
Parameters
----------
constituents : pd.DataFrame or QuantamentalDataFrame
Long-format DataFrame with columns "cid", "real_date", and
"membership" (binary 0/1). Each row records whether a security was a
constituent on a given date.
returns : pd.DataFrame or QuantamentalDataFrame
Long-format DataFrame with columns "cid", "real_date", "xcat",
and "value" (daily return in percentage points). Must be filtered to a
single xcat before passing.
rebalance_freq : str, default "M"
Pandas period alias controlling how often the portfolio is reset to equal
weights. Must be one of {"B", "W", "M", "Q", "Y"}.
reconstitution_freq : str or None, default None
Pandas period alias controlling how often membership changes take effect.
If None, defaults to "rebalance_freq".
blacklist : dict or None, default None
Mapping of "cid" to (start, end) pd.Timestamp pairs
identifying securities to exclude. Exclusions are snapped to rebalance
period starts, matching the weight-reset cadence.
Returns
-------
pd.DataFrame
Long-format DataFrame with columns ["real_date", "cid", "value"]
containing the daily portfolio weight for each constituent, with
zero-weight rows dropped.
"""
_validate_frequency(rebalance_freq, "rebalance_freq")
if reconstitution_freq is not None:
_validate_frequency(reconstitution_freq, "reconstitution_freq")
_validate_constituents(constituents)
_validate_returns(returns)
recon_freq = _resolve_reconstitution_freq(rebalance_freq, reconstitution_freq)
# Pivot to wide
constituents["real_date"] = pd.to_datetime(constituents["real_date"])
mem_wide = (
_long_to_wide(constituents[["cid", "real_date", "membership"]], "membership")
.fillna(0)
.astype(int)
)
returns["real_date"] = pd.to_datetime(returns["real_date"])
ret_wide = _long_to_wide(returns[["cid", "real_date", "value"]], "value")
# Align columns
common_cids = mem_wide.columns.intersection(ret_wide.columns)
assert (
len(common_cids) > 0
), "No common cids between constituents and returns DataFrames."
mem_wide = mem_wide[common_cids]
ret_wide = ret_wide[common_cids]
# Reindex both to a common complete business-day calendar, then align
all_dates = mem_wide.index.union(ret_wide.index).sort_values()
full_bdays = pd.bdate_range(all_dates.min(), all_dates.max(), freq="B")
mem_wide = mem_wide.reindex(full_bdays).ffill().fillna(0).astype(int)
ret_wide = ret_wide.reindex(full_bdays).fillna(0.0) / 100.0 # pct -> decimal
# Apply reconstitution: snapshot at period start, hold through period
mem_effective = _build_reconstitution_membership(mem_wide, recon_freq)
# Apply blacklist: zero out blacklisted securities on each rebalance date.
# Snapshot blacklist state at the first day of each rebalancing period and
# hold through the period, so a blacklisted security stays excluded until
# the next rebalance where it is no longer blacklisted.
if blacklist:
rebal_periods_pre = _assign_period_labels(full_bdays, rebalance_freq)
active_periods_before = (
mem_effective.groupby(rebal_periods_pre)
.first()
.gt(0)
.sum()
.rename("active_periods_before")
)
bl_mask = pd.DataFrame(False, index=full_bdays, columns=mem_effective.columns)
for cid, (start, end) in blacklist.items():
if cid in bl_mask.columns:
bl_mask.loc[(bl_mask.index >= start) & (bl_mask.index <= end), cid] = (
True
)
else:
logger.info(
"Blacklist cid '%s' not found in constituent universe — skipped.",
cid,
)
bl_effective = _build_reconstitution_membership(
bl_mask.astype(int), rebalance_freq
).astype(bool)
mem_effective = mem_effective.where(~bl_effective, 0)
active_periods_after = (
mem_effective.groupby(rebal_periods_pre)
.first()
.gt(0)
.sum()
.rename("active_periods_after")
)
summary = pd.concat([active_periods_before, active_periods_after], axis=1)
summary["periods_removed"] = (
summary["active_periods_before"] - summary["active_periods_after"]
)
affected = summary[summary["periods_removed"] > 0]
if affected.empty:
logger.info("Blacklist applied but no rebalancing periods were affected.")
else:
logger.warning(
"Blacklist reduced active rebalancing periods for %d security(ies):\n%s",
len(affected),
affected.to_string(),
)
# Assign rebalancing periods
rebal_periods = _assign_period_labels(full_bdays, rebalance_freq)
# Vectorized weight drift using cumprod within each rebalancing period.
#
# On rebalancing day 1, weight_i = (1/N) * membership_i.
# On day d within the period, the unnormalized weight is:
# w_i(d) = w_i(0) * prod_{t=0}^{d-1}(1 + r_i(t))
#
# We shift the cumulative product so that day 0 uses the initial weight
# (cumprod hasn't started yet) and day d reflects returns through day d-1.
# Then we normalize row-wise so weights sum to 1.
# Initial equal weights per period: 1/N for members, 0 for non-members
n_members = mem_effective.groupby(rebal_periods).transform("first").sum(axis=1)
initial_w = mem_effective.div(n_members.replace(0, np.nan), axis=0).fillna(0.0)
# Cumulative growth factor within each period, shifted so day 0 = 1.0
growth = (1 + ret_wide).groupby(rebal_periods).cumprod()
growth_shifted = growth.groupby(rebal_periods).shift(1).fillna(1.0)
# Unnormalized drifted weights
weights_raw = initial_w * growth_shifted
# Normalize so each row sums to 1
row_sums = weights_raw.sum(axis=1).replace(0, np.nan)
weights = weights_raw.div(row_sums, axis=0).fillna(0.0)
# Convert to long, drop zero-weight rows
weights_long = _wide_to_long(weights, value_name="value")
weights_long = weights_long[weights_long["value"] > 0].reset_index(drop=True)
return weights_long
[docs]def compute_index_returns(
constituents: pd.DataFrame,
returns: pd.DataFrame,
rebalance_freq: str = "M",
reconstitution_freq: Optional[str] = None,
blacklist: Optional[Dict[str, Tuple[pd.Timestamp, pd.Timestamp]]] = None,
) -> Tuple[pd.DataFrame, pd.DataFrame]:
"""
Compute daily index-level returns from constituent weights and individual returns.
Wraps :func:`compute_daily_weights` to obtain daily float-adjusted weights, then
computes the weighted-average return across constituents for each business day.
Parameters
----------
constituents : pd.DataFrame or QuantamentalDataFrame
Long-format DataFrame with columns "cid", "real_date", and
"membership" (binary 0/1).
returns : pd.DataFrame or QuantamentalDataFrame
Long-format DataFrame with columns "cid", "real_date", "xcat",
and "value" (daily return in percentage points). Must be filtered to a
single xcat before passing.
rebalance_freq : str, default "M"
Portfolio rebalancing frequency. Must be one of {"B", "W", "M", "Q", "Y"}.
reconstitution_freq : str or None, default None
Membership reconstitution frequency. Defaults to "rebalance_freq" when
None.
blacklist : dict or None, default None
Mapping of "cid" to (start, end) pd.Timestamp pairs for
securities to exclude.
Returns
-------
daily_index : pd.DataFrame
DataFrame with columns ["real_date", "value"] containing the daily
index return in percentage points.
weights_long : pd.DataFrame
Long-format weight DataFrame as returned by "compute_daily_weights".
"""
weights_long = compute_daily_weights(
constituents, returns, rebalance_freq, reconstitution_freq, blacklist
)
# Pivot weights to wide for multiplication
w_wide = weights_long.pivot(
index="real_date", columns="cid", values="value"
).fillna(0.0)
# Align returns to same dates/cids
ret_wide = _long_to_wide(returns[["cid", "real_date", "value"]], "value")
full_bdays = w_wide.index
common_cids = w_wide.columns
ret_wide = (
ret_wide.reindex(index=full_bdays, columns=common_cids).fillna(0.0) / 100.0
)
# Daily index return = sum(w_i * r_i)
daily_ret = 100 * (w_wide * ret_wide).sum(axis=1)
daily_index = pd.DataFrame(
{
"real_date": full_bdays,
"value": daily_ret.values,
}
)
return daily_index, weights_long
[docs]def compute_excess_returns(
returns: pd.DataFrame,
index_returns: pd.DataFrame,
method: str = "ratio",
output_freq: Optional[str] = None,
) -> pd.DataFrame:
"""
Compute per-stock excess (active) returns relative to a benchmark index.
Three excess-return formulations are supported:
- "ratio" : (1 + r_stock) / (1 + r_bench) - 1
- "log" : log(1 + r_stock) - log(1 + r_bench)
- "diff" : r_stock - r_bench
Optionally compounds daily returns to a lower frequency before computing the
excess return.
Parameters
----------
returns : pd.DataFrame or QuantamentalDataFrame
Long-format DataFrame with columns "cid", "real_date", "xcat",
and "value" (daily return in percentage points).
index_returns : pd.DataFrame or QuantamentalDataFrame
DataFrame with columns "real_date" and "value" (daily index return
in percentage points). Must contain one row per date (no duplicates).
method : str, default "ratio"
Excess-return formula. One of {"ratio", "log", "diff"}.
output_freq : str or None, default None
If provided, daily returns are compounded to this frequency before the
excess-return formula is applied. Must be one of
{"B", "W", "M", "Q", "Y"}. When None, excess returns are
computed at daily frequency.
Returns
-------
pd.DataFrame
Long-format DataFrame with columns ["real_date", "cid", "value"]
containing excess returns in percentage points. Dates correspond to
period-end timestamps when "output_freq" is set.
"""
_validate_returns(returns)
_validate_index_returns(index_returns)
if output_freq is not None:
_validate_frequency(output_freq, "output_freq")
assert method in (
"ratio",
"log",
"diff",
), f"method must be 'ratio', 'log', or 'diff', got '{method}'."
# Pivot stock returns to wide, convert to decimal
ret_wide = _long_to_wide(returns[["cid", "real_date", "value"]], "value") / 100.0
index_returns = index_returns.copy(deep=True)
index_returns["value"] = index_returns["value"] / 100.0 # pct -> decimal
# Benchmark as Series
bench = index_returns.set_index("real_date")["value"]
# Align to common business-day index
all_dates = ret_wide.index.union(bench.index).sort_values()
full_bdays = pd.bdate_range(all_dates.min(), all_dates.max(), freq="B")
# Keep NaN for stocks on dates they have no return (preserves sparsity)
ret_wide = ret_wide.reindex(full_bdays)
bench = bench.reindex(full_bdays).fillna(0.0)
if output_freq is not None:
periods = _assign_period_labels(full_bdays, output_freq)
# Compound daily -> period; NaN days treated as no return (1+NaN -> NaN)
# Use skipna=False via manual approach: fill NaN with 0 for prod, but
# track which stock-periods have any data
has_data = ret_wide.notna().groupby(periods).any()
ret_filled = ret_wide.fillna(0.0)
stock_period = (1 + ret_filled).groupby(periods).prod() - 1
bench_period = (1 + bench).groupby(periods).prod() - 1
er_wide = _apply_er_formula(stock_period, bench_period, method)
er_wide = er_wide.where(has_data)
er_wide.index = er_wide.index.to_timestamp(how="end")
er_long = _wide_to_long(er_wide, value_name="value")
er_long["value"] = er_long["value"] * 100.0 # decimal -> pct
return er_long
else:
er_wide = _apply_er_formula(ret_wide, bench, method)
# NaN propagation: stocks with no return on a date stay NaN
er_long = _wide_to_long(er_wide, value_name="value")
er_long["value"] = er_long["value"] * 100.0 # decimal -> pct
return er_long