from typing import List, Optional, Union
from pathlib import Path
import logging
import pandas as pd
from .constants import JPMAQS_DATASET_THEME_MAPPING
from .common import (
pd_to_datetime_compat,
_normalize_last_updated_cutoff,
_normalize_file_timestamp_cutoff,
_is_date_only_string,
_covering_large_delta_timestamp,
)
# Module-level logger; `_select_local_files_for_load` uses it to warn when a
# requested window yields delta files but no full snapshot.
logger = logging.getLogger(__name__)
class FileSelector:
    """Helper class to reconcile API vs local file inventories.

    Both inventories are pandas DataFrames keyed by ``file_name_col``. On
    construction (and on :meth:`refresh`) each inventory is copied, given a
    ``file_name_col`` column (falling back to ``filename``), de-duplicated by file
    name (keeping the newest ``last-modified`` row when that column exists), and the
    two are outer-merged into ``files_df`` (suffixes ``_api`` / ``_local``).

    Parameters
    ----------
    api_files_df:
        Inventory reported by the API. ``None`` or any non-DataFrame value is
        treated as an empty inventory.
    local_files_df:
        Inventory of downloaded files. A ``path`` column is needed for the
        presence/load checks. Same ``None`` handling as ``api_files_df``.
    file_name_col:
        Join column shared by both inventories.
    tickers:
        Optional tickers. When they can be resolved through a metadata catalog,
        snapshot/delta selection is restricted to the datasets containing them.
        Blank and non-string entries are ignored.
    catalog_file:
        Explicit catalog parquet used to resolve ``tickers``. When absent or not a
        file, the newest local ``JPMAQS_METADATA_CATALOG`` entry is tried.
    case_sensitive:
        Whether ticker matching against the catalog is case-sensitive.

    Raises
    ------
    ValueError
        If a non-empty inventory has neither ``file_name_col`` nor ``filename``.
    """

    def __init__(
        self,
        api_files_df: Optional[pd.DataFrame],
        local_files_df: Optional[pd.DataFrame],
        file_name_col: str = "file-name",
        tickers: Optional[List[str]] = None,
        catalog_file: Optional[Union[str, Path]] = None,
        case_sensitive: bool = False,
    ) -> None:
        self.file_name_col = str(file_name_col)
        self.api_files_df = self._normalize_file_name_col(
            self._copy_inventory(api_files_df), "api_files_df"
        )
        self.local_files_df = self._normalize_file_name_col(
            self._copy_inventory(local_files_df), "local_files_df"
        )
        self.tickers = (
            [t.strip() for t in tickers if isinstance(t, str) and t.strip()]
            if tickers
            else []
        )
        self.catalog_file = Path(catalog_file) if catalog_file else None
        self.case_sensitive = bool(case_sensitive)
        # Needs the normalised local inventory: it may point at a catalog file.
        self.datasets_for_tickers: List[str] = self._resolve_datasets_for_tickers()
        self._rebuild_merged()

    @staticmethod
    def _copy_inventory(df: Optional[pd.DataFrame]) -> pd.DataFrame:
        """Return a private copy of *df*; any non-DataFrame input becomes an empty frame."""
        return df.copy() if isinstance(df, pd.DataFrame) else pd.DataFrame()

    def _normalize_file_name_col(
        self, df: pd.DataFrame, label: str
    ) -> pd.DataFrame:
        """Ensure *df* has a ``self.file_name_col`` column, falling back to ``filename``.

        *df* is modified in place and returned (callers pass a private copy). An
        empty frame lacking both columns gets an empty object column so that the
        later merge still has a join key.

        Raises
        ------
        ValueError
            If *df* is non-empty and has neither column; *label* names the inventory
            in the message.
        """
        if self.file_name_col in df.columns:
            return df
        if "filename" in df.columns:
            df[self.file_name_col] = df["filename"]
        elif df.empty:
            df[self.file_name_col] = pd.Series(dtype="object")
        else:
            raise ValueError(f"Missing `{self.file_name_col}` in {label}.")
        return df

    def _rebuild_merged(self) -> None:
        """De-duplicate both inventories and rebuild the merged ``files_df`` view."""
        self._dedupe_inventories()
        self.files_df = self.api_files_df.merge(
            self.local_files_df,
            on=self.file_name_col,
            how="outer",
            suffixes=("_api", "_local"),
        )
        # Backwards compatible alias (internal / not user-facing).
        self.merged_df = self.files_df

    def refresh(
        self,
        *,
        api_files_df: Optional[pd.DataFrame] = None,
        local_files_df: Optional[pd.DataFrame] = None,
    ) -> None:
        """
        Refresh cached API and/or local inventories in-place.

        This is intended for reusing a single `FileSelector` instance across multiple
        selection operations (for example when the client downloads files and the local
        inventory changes). An argument left as ``None`` keeps the current inventory.

        When the local inventory has no ``last-modified`` column but the API inventory
        does, local rows are enriched with the API value - the newest one per file,
        i.e. the same row that de-duplication keeps. Ticker datasets are re-resolved
        (when tickers were given) and ``files_df`` / ``merged_df`` are rebuilt.
        """
        if api_files_df is not None:
            self.api_files_df = self._copy_inventory(api_files_df)
        if local_files_df is not None:
            self.local_files_df = self._copy_inventory(local_files_df)
        self.api_files_df = self._normalize_file_name_col(
            self.api_files_df, "api_files_df"
        )
        self.local_files_df = self._normalize_file_name_col(
            self.local_files_df, "local_files_df"
        )
        # Prefer enriching local inventory with API `last-modified` without forcing an
        # upstream API call in `list_downloaded_files()`.
        if (
            not self.local_files_df.empty
            and "last-modified" not in self.local_files_df.columns
            and "last-modified" in self.api_files_df.columns
        ):
            lm = self.api_files_df[[self.file_name_col, "last-modified"]]
            # Newest value first (stable for ties), mirroring `_dedupe_inventories`,
            # so the enriched value matches the API row kept after de-duplication.
            lm = lm.sort_values("last-modified", ascending=False, kind="mergesort")
            lm = lm.drop_duplicates(subset=[self.file_name_col], keep="first")
            self.local_files_df = self.local_files_df.merge(
                lm, on=self.file_name_col, how="left"
            )
        if self.tickers:
            self.datasets_for_tickers = self._resolve_datasets_for_tickers()
        self._rebuild_merged()

    def effective_snapshot_switchover_ts(
        self,
        *,
        file_group_ids: List[str],
        catalog_file_group_id: Optional[str] = None,
    ) -> Optional[pd.Timestamp]:
        """
        Return the effective (per-request) earliest full-snapshot timestamp.

        Parameters
        ----------
        file_group_ids:
            Requested file groups. ``_DELTA`` is removed to obtain the base dataset;
            ids containing ``_METADATA`` (any case) and non-string/empty ids are
            ignored.
        catalog_file_group_id:
            Optional catalog group id that is ignored as well.

        Returns
        -------
        Optional[pd.Timestamp]
            The latest of the per-dataset earliest full-snapshot timestamps, or
            ``None`` when no dataset remains, the API inventory is empty, or any
            dataset has no full snapshot.

        Notes
        -----
        JPMaQS can remove older full snapshots over time. For a given set of datasets
        we define the "switchover" as the *latest* of the datasets' earliest currently
        available full snapshots. If any dataset has no full snapshots at all, returns
        None.
        """
        if not file_group_ids:
            return None
        base_datasets = sorted(
            {
                str(d).replace("_DELTA", "")
                for d in file_group_ids
                if isinstance(d, str) and d and ("_METADATA" not in d.upper())
            }
        )
        if catalog_file_group_id is not None:
            base_datasets = [d for d in base_datasets if d != catalog_file_group_id]
        if not base_datasets:
            return None
        api_like = self._as_local_like_df(self.api_files_df, source="api")
        if api_like.empty:
            return None
        filenames = api_like["filename"].astype(str)
        is_snapshot = ~filenames.str.contains(
            "_DELTA", case=False, na=False
        ) & ~filenames.str.contains("_METADATA", case=False, na=False)
        earliest_by_dataset: List[pd.Timestamp] = []
        for ds in base_datasets:
            ds_snapshots = api_like.loc[
                is_snapshot & api_like["dataset"].astype(str).eq(ds)
            ]
            if ds_snapshots.empty:
                return None
            earliest_by_dataset.append(ds_snapshots["file-timestamp"].min())
        return max(earliest_by_dataset) if earliest_by_dataset else None

    def _dedupe_inventories(self) -> None:
        """Drop duplicate file names per inventory, keeping the newest ``last-modified`` row.

        Without a ``last-modified`` column the first occurrence is kept. The index is
        reset afterwards.
        """
        for attr in ("api_files_df", "local_files_df"):
            df = getattr(self, attr)
            if df.empty:
                continue
            if "last-modified" in df.columns:
                # Stable sort: among equal timestamps the input order decides which
                # duplicate survives, making the result deterministic.
                df = df.sort_values(
                    "last-modified", ascending=False, kind="mergesort"
                )
            df = df.drop_duplicates(subset=[self.file_name_col], keep="first")
            setattr(self, attr, df.reset_index(drop=True))

    def _locate_catalog_for_tickers(self) -> Optional[Path]:
        """Return the catalog parquet used to resolve tickers, or ``None``.

        ``self.catalog_file`` wins when it is an existing file. Otherwise the local
        inventory is searched for ``JPMAQS_METADATA_CATALOG`` rows (by ``dataset``
        column, else by file-name prefix) with a non-null ``path``; the newest one is
        used if it exists on disk.
        """
        if self.catalog_file and self.catalog_file.is_file():
            return self.catalog_file
        if self.local_files_df.empty or "path" not in self.local_files_df.columns:
            return None
        df = self.local_files_df
        if "dataset" in df.columns:
            df = df[df["dataset"] == "JPMAQS_METADATA_CATALOG"]
        else:
            df = df[
                df[self.file_name_col]
                .astype(str)
                .str.startswith("JPMAQS_METADATA_CATALOG_")
            ]
        df = df[df["path"].notna()]
        if df.empty:
            return None
        if "file-timestamp" in df.columns:
            df = df.sort_values("file-timestamp", ascending=False)
        else:
            # File names end with their timestamp; assuming a fixed-width timestamp
            # format, descending name order puts the newest catalog first.
            df = df.sort_values(self.file_name_col, ascending=False)
        raw_path = df.iloc[0]["path"]
        try:
            candidate = Path(str(raw_path))
            if candidate.is_file():
                return candidate
        except Exception as exc:  # best effort: an unusable path means "no catalog"
            logger.debug("Ignoring unusable catalog path %r: %s", raw_path, exc)
        return None

    def _resolve_datasets_for_tickers(self) -> List[str]:
        """Map ``self.tickers`` to dataset names through the metadata catalog.

        Catalog tickers are matched against ``self.tickers`` (case-insensitively unless
        ``case_sensitive``); the themes of the matching rows are translated with
        ``JPMAQS_DATASET_THEME_MAPPING`` and unmapped themes are dropped.

        Returns
        -------
        List[str]
            Sorted, unique dataset names. An empty list (meaning "no ticker filter")
            when there are no tickers, no usable catalog file, the catalog cannot be
            read, or it lacks ticker/theme columns.
        """
        if not self.tickers:
            return []
        catalog_path = self._locate_catalog_for_tickers()
        if catalog_path is None:
            return []
        try:
            cat = pd.read_parquet(catalog_path)
        except Exception as exc:  # best effort: fall back to "no ticker filter"
            logger.warning(
                "Could not read metadata catalog %s (%s); ticker filtering is disabled.",
                catalog_path,
                exc,
            )
            return []
        ticker_col = "Ticker" if "Ticker" in cat.columns else "ticker"
        theme_col = "Theme" if "Theme" in cat.columns else "theme"
        if ticker_col not in cat.columns or theme_col not in cat.columns:
            return []
        catalog_tickers = cat[ticker_col].astype(str)
        if self.case_sensitive:
            mask = catalog_tickers.isin(self.tickers)
        else:
            wanted = {t.lower() for t in self.tickers}
            mask = catalog_tickers.str.lower().isin(wanted)
        datasets = (
            cat.loc[mask, theme_col]
            .map(JPMAQS_DATASET_THEME_MAPPING)
            .dropna()
            .astype(str)
        )
        return sorted(set(datasets))

    def _as_local_like_df(
        self, df: pd.DataFrame, *, source: str, ticker_filter: bool = True
    ) -> pd.DataFrame:
        """Return a copy of *df* with the derived columns used by the selection logic.

        Adds, only when missing: ``filename`` (from ``file_name_col``), ``dataset``
        (the part of the name before the first ``.`` without its last ``_`` token),
        ``e-dataset`` (``dataset`` without a trailing ``_DELTA``) and
        ``file-timestamp`` (UTC; from ``file-datetime`` for API inventories, otherwise
        parsed from the last ``_`` token of the name). Rows whose ``file-timestamp``
        is NaT are dropped.

        Parameters
        ----------
        df:
            Inventory to convert. An empty frame is returned as an unchanged copy.
        source:
            ``"api"`` enables the ``file-datetime`` shortcut; any other value parses
            the timestamp from the file name.
        ticker_filter:
            When True (default) and tickers resolved to datasets, keep only rows whose
            ``e-dataset`` is one of them. Pass False to look up files that are not
            ticker datasets (e.g. the metadata catalog), which the filter would
            otherwise always hide.
        """
        if df.empty:
            return df.copy()
        out = df.copy()
        if "filename" not in out.columns:
            out["filename"] = out[self.file_name_col].astype(str)
        stem = out["filename"].astype(str).str.split(".", n=1).str[0]
        if "dataset" not in out.columns:
            out["dataset"] = stem.str.rsplit("_", n=1).str[0]
        if "e-dataset" not in out.columns:
            out["e-dataset"] = (
                out["dataset"].astype(str).str.replace(r"_DELTA$", "", regex=True)
            )
        if "file-timestamp" not in out.columns:
            if source == "api" and "file-datetime" in out.columns:
                out["file-timestamp"] = pd_to_datetime_compat(
                    out["file-datetime"], utc=True
                )
            else:
                ts_str = stem.str.rsplit("_", n=1).str[-1]
                out["file-timestamp"] = pd_to_datetime_compat(ts_str, utc=True)
        out = out[out["file-timestamp"].notna()].copy()
        if ticker_filter and self.datasets_for_tickers:
            out = out[out["e-dataset"].isin(self.datasets_for_tickers)].copy()
        return out

    @staticmethod
    def _with_existing_paths(df: pd.DataFrame) -> pd.DataFrame:
        """Keep rows of *df* whose ``path`` is non-empty and points to an existing file.

        The existence mask is cast to ``bool`` so that an empty frame is still
        filtered by rows and keeps its columns.
        """
        has_path = df["path"].notna() & df["path"].astype(str).str.len().gt(0)
        df = df[has_path]
        exists = df["path"].map(lambda p: Path(str(p)).is_file()).astype(bool)
        return df[exists]

    def _most_recent_local_catalog(
        self,
        *,
        to_datetime: Optional[Union[str, pd.Timestamp]] = None,
        catalog_file_group_id: str = "JPMAQS_METADATA_CATALOG",
    ) -> Optional[str]:
        """Return the ``path`` of the newest local catalog parquet, or ``None``.

        Only local rows with a non-null ``path`` whose ``dataset`` equals
        *catalog_file_group_id* and whose file name ends in ``.parquet`` are
        considered. With *to_datetime*, only files stamped at or before that cutoff
        qualify. Ties on timestamp are broken by file name (descending).

        The ticker-dataset filter is deliberately not applied. The catalog is
        presumably not one of the theme-mapped ticker datasets, so filtering would
        hide it whenever tickers resolve.
        """
        if self.local_files_df.empty or "path" not in self.local_files_df.columns:
            return None
        local_df = self.local_files_df[self.local_files_df["path"].notna()]
        if local_df.empty:
            return None
        local_like = self._as_local_like_df(
            local_df, source="local", ticker_filter=False
        )
        if local_like.empty:
            return None
        local_catalogs = local_like[
            local_like["dataset"].astype(str).eq(str(catalog_file_group_id))
            & local_like["filename"].astype(str).str.lower().str.endswith(".parquet")
        ]
        if to_datetime is not None and not local_catalogs.empty:
            cutoff = _normalize_file_timestamp_cutoff(to_datetime)
            local_catalogs = local_catalogs[local_catalogs["file-timestamp"] <= cutoff]
        if local_catalogs.empty:
            return None
        local_catalogs = local_catalogs.sort_values(
            by=["file-timestamp", "filename"], ascending=False
        )
        return str(local_catalogs.iloc[0]["path"])

    @staticmethod
    def _restrict_to_file_groups(
        df: pd.DataFrame, file_group_ids: Optional[List[str]]
    ) -> pd.DataFrame:
        """Keep rows whose ``file-group-id`` is in *file_group_ids*.

        This is a no-op when *file_group_ids* is ``None`` or the column is absent.
        """
        if file_group_ids is None or "file-group-id" not in df.columns:
            return df
        return df[df["file-group-id"].isin(file_group_ids)].copy()

    @staticmethod
    def _metadata_files_in_window(
        files_like: pd.DataFrame,
        *,
        since_datetime: Optional[Union[str, pd.Timestamp]],
        to_datetime: Optional[Union[str, pd.Timestamp]],
    ) -> set:
        """Return names of ``_METADATA`` files whose timestamp lies in the vintage window.

        The window follows the snapshot/delta selection semantics:
        - a date-only *since* string starts at midnight;
        - *to* defaults to the newest metadata file;
        - reversed bounds are swapped;
        - both bounds are inclusive.
        """
        if files_like.empty or "filename" not in files_like.columns:
            return set()
        names = files_like["filename"].astype(str)
        meta_df = files_like.loc[names.str.contains("_METADATA", case=False, na=False)]
        if meta_df.empty:
            return set()
        since_ts = (
            pd_to_datetime_compat(since_datetime)
            if since_datetime is not None
            else None
        )
        if since_ts is not None and _is_date_only_string(since_datetime):
            since_ts = since_ts.normalize()
        to_ts = (
            _normalize_file_timestamp_cutoff(to_datetime)
            if to_datetime is not None
            else meta_df["file-timestamp"].max()
        )
        if since_ts is not None and since_ts > to_ts:
            since_ts, to_ts = to_ts, since_ts
        file_ts = meta_df["file-timestamp"]
        in_window = (
            file_ts.le(to_ts) if since_ts is None else file_ts.between(since_ts, to_ts)
        )
        return set(meta_df.loc[in_window, "filename"].astype(str))

    def _outdated_files(self, required: set, last_modified_col: str) -> set:
        """Return the *required* file names whose API timestamp is newer than the local one.

        Both inventories must carry *last_modified_col*; otherwise (or when the local
        inventory is empty) nothing is considered outdated. Missing values never
        count as newer.
        """
        if (
            last_modified_col not in self.api_files_df.columns
            or self.local_files_df.empty
            or last_modified_col not in self.local_files_df.columns
        ):
            return set()
        api_lm = self.api_files_df.set_index(self.file_name_col)[last_modified_col]
        local_lm = self.local_files_df.set_index(self.file_name_col)[last_modified_col]
        common = api_lm.index.intersection(local_lm.index).intersection(list(required))
        if len(common) == 0:
            return set()
        newer = (api_lm.loc[common] > local_lm.loc[common]).to_numpy(dtype=bool)
        return set(map(str, common[newer].tolist()))

    def select_files_for_download(
        self,
        overwrite: bool = False,
        since_datetime: Optional[Union[str, pd.Timestamp]] = None,
        to_datetime: Optional[Union[str, pd.Timestamp]] = None,
        file_group_ids: Optional[List[str]] = None,
        include_full_snapshots: bool = True,
        include_delta_files: bool = True,
        include_metadata_files: bool = False,
        warn_if_no_full_snapshots: bool = False,
        last_modified_col: str = "last-modified",
        min_last_updated: Optional[Union[str, pd.Timestamp]] = None,
        max_last_updated: Optional[Union[str, pd.Timestamp]] = None,
    ) -> List[str]:
        """Select API file-name(s) required for a load vintage that are missing/outdated locally.

        The required set is built in three steps:
        - Snapshot/delta files are chosen with ``_select_local_files_for_load`` applied
          to the API inventory. That inventory is optionally restricted to
          *file_group_ids* (when it has a ``file-group-id`` column) and, with
          ``include_full_snapshots=False``, to non-snapshot files.
        - With *include_metadata_files*, ``_METADATA`` files inside the same vintage
          window are added. These are looked up without the ticker-dataset filter.
        - With *overwrite* every required file is returned. Otherwise only required
          files are returned that are not present locally (no existing ``path``) or
          whose API *last_modified_col* is newer than the local one.

        Returns
        -------
        List[str]
            Sorted file names.

        Raises
        ------
        ValueError
            If *file_group_ids* is given but is not a list of strings.
        """
        api_like = self._as_local_like_df(self.api_files_df, source="api")
        if api_like.empty:
            return []
        if file_group_ids is not None and (
            not isinstance(file_group_ids, list)
            or not all(isinstance(x, str) for x in file_group_ids)
        ):
            raise ValueError("`file_group_ids` must be a list of strings.")
        api_like = self._restrict_to_file_groups(api_like, file_group_ids)
        if not include_full_snapshots:
            names = api_like["filename"].astype(str)
            is_snapshot = ~names.str.contains(
                "_DELTA", case=False, na=False
            ) & ~names.str.contains("_METADATA", case=False, na=False)
            api_like = api_like.loc[~is_snapshot].copy()
        selected_api = _select_local_files_for_load(
            api_like,
            since_datetime=since_datetime,
            to_datetime=to_datetime,
            include_delta_files=include_delta_files,
            warn_if_no_full_snapshots=warn_if_no_full_snapshots,
            min_last_updated=min_last_updated,
            max_last_updated=max_last_updated,
        )
        required = set(selected_api["filename"].astype(str).tolist())
        # Metadata files are also vintage-sensitive (the date window matters). They
        # are not ticker datasets, so read them without the ticker filter.
        if include_metadata_files:
            meta_like = self._restrict_to_file_groups(
                self._as_local_like_df(
                    self.api_files_df, source="api", ticker_filter=False
                ),
                file_group_ids,
            )
            required |= self._metadata_files_in_window(
                meta_like, since_datetime=since_datetime, to_datetime=to_datetime
            )
        if not required:
            return []
        if overwrite:
            return sorted(required)
        local = self.local_files_df
        if not local.empty and "path" in local.columns:
            local = self._with_existing_paths(local)
        present = (
            set(local[self.file_name_col].astype(str).tolist())
            if not local.empty
            else set()
        )
        to_download = required - present
        to_download |= self._outdated_files(required, last_modified_col)
        return sorted(to_download)

    def select_files_for_load(
        self,
        since_datetime: Optional[Union[str, pd.Timestamp]] = None,
        to_datetime: Optional[Union[str, pd.Timestamp]] = None,
        include_delta_files: bool = True,
        warn_if_no_full_snapshots: bool = False,
        min_last_updated: Optional[Union[str, pd.Timestamp]] = None,
        max_last_updated: Optional[Union[str, pd.Timestamp]] = None,
    ) -> pd.DataFrame:
        """Select local snapshot/delta files to load from disk (drops rows without a valid file `path`).

        Only local rows whose ``path`` is non-empty and points to an existing file are
        considered. The choice itself is delegated to ``_select_local_files_for_load``
        with the given window arguments.

        Returns
        -------
        pd.DataFrame
            The selected rows. Special cases:
            - an empty local inventory returns a copy of it;
            - an inventory without a ``path`` column returns an empty
              ``pd.DataFrame()``;
            - no usable rows returns an empty frame.
        """
        if self.local_files_df.empty:
            return self.local_files_df.copy()
        if "path" not in self.local_files_df.columns:
            return pd.DataFrame()
        local_df = self._as_local_like_df(
            self._with_existing_paths(self.local_files_df), source="local"
        )
        if local_df.empty:
            return local_df
        return _select_local_files_for_load(
            local_df,
            since_datetime=since_datetime,
            to_datetime=to_datetime,
            include_delta_files=include_delta_files,
            warn_if_no_full_snapshots=warn_if_no_full_snapshots,
            min_last_updated=min_last_updated,
            max_last_updated=max_last_updated,
        )

    def oldest_api_file_timestamp(self) -> Optional[pd.Timestamp]:
        """
        Return the oldest file timestamp present in the API inventory (UTC).

        Assumes `api_files_df` is an unfiltered API inventory (full history), as provided
        by `DataQueryFileAPIClient.list_available_files_for_all_file_groups()`. When
        tickers resolved to datasets, only those datasets are considered. Returns
        ``None`` for an empty inventory.
        """
        api_like = self._as_local_like_df(self.api_files_df, source="api")
        if api_like.empty:
            return None
        ts = api_like["file-timestamp"].min()
        if pd.isna(ts):
            return None
        return ts
def _select_local_files_for_load(
    files_df: pd.DataFrame,
    *,
    since_datetime: Optional[Union[str, pd.Timestamp]] = None,
    to_datetime: Optional[Union[str, pd.Timestamp]] = None,
    include_delta_files: bool = True,
    warn_if_no_full_snapshots: bool = False,
    min_last_updated: Optional[Union[str, pd.Timestamp]] = None,
    max_last_updated: Optional[Union[str, pd.Timestamp]] = None,
) -> pd.DataFrame:
    """
    Single-responsibility helper: choose which local snapshot/delta files to load.

    The selection is per effective dataset ("e-dataset"):
    - If full snapshots exist for the dataset and at least one snapshot is present in the
      requested file-vintage window, load the latest snapshot in the window and any delta
      files newer than that snapshot (also within the window).
    - If snapshots exist at or before the requested vintage but none falls inside the
      window, load only the delta files inside the window.
    - If no full snapshots exist at or before the requested vintage (effective delta-only
      history), load *all* available deltas up to the requested vintage. For monthly "large
      delta" regimes, also include the covering month-end delta file even if it timestamps
      after `to_datetime` (row-level filtering is handled via `max_last_updated`).

    Parameters
    ----------
    files_df : pd.DataFrame
        File inventory with ``filename``, ``file-timestamp`` and ``e-dataset`` (or
        ``dataset``) columns. File kind is inferred from the name: ``_DELTA`` marks a
        delta and ``_METADATA`` a metadata file (both case-insensitive); anything else
        is a full snapshot. Non-delta metadata files are never selected.
    since_datetime : str or pd.Timestamp, optional
        Start of the file-vintage window. Date-only strings are normalised to
        midnight. If it is later than the window end, the two bounds are swapped.
    to_datetime : str or pd.Timestamp, optional
        File-vintage cutoff (via ``_normalize_file_timestamp_cutoff``). Defaults to
        the latest ``file-timestamp`` in *files_df*.
    include_delta_files : bool
        When False, delta and metadata files are removed from the result.
    warn_if_no_full_snapshots : bool
        When True and *since_datetime* is given, log a warning if the result holds
        deltas but no snapshot while the inventory does contain snapshots.
    min_last_updated : str or pd.Timestamp, optional
        Accepted for signature parity; not used by this function.
    max_last_updated : str or pd.Timestamp, optional
        Row-content cutoff. Takes precedence over *to_datetime* for the month-end
        cover decision, and can extend the delta cutoff for delta-only histories
        past the file vintage.

    Returns
    -------
    pd.DataFrame
        Selected rows with the input columns only (helper columns are dropped),
        sorted by group column, ``file-timestamp`` and ``filename`` with a fresh
        index. In the returned frame the group column has any trailing ``_DELTA``
        stripped. An empty *files_df* is returned as-is (same object).

    Raises
    ------
    ValueError
        If ``file-timestamp``, ``filename`` or the group column is missing.
    """
    if files_df.empty:
        return files_df
    df = files_df.copy()
    if "file-timestamp" not in df.columns:
        raise ValueError("Expected column 'file-timestamp' in files_df")
    if "filename" not in df.columns:
        raise ValueError("Expected column 'filename' in files_df")
    # Prefer the effective dataset so a snapshot and its deltas share one group.
    if "e-dataset" in df.columns:
        group_col = "e-dataset"
    else:
        group_col = "dataset"
    if group_col not in df.columns:
        raise ValueError("Expected column 'dataset' in files_df")
    # Strip a trailing `_DELTA` so `X_DELTA` rows fall into the same group as `X`.
    df[group_col] = (
        df[group_col].astype(str).str.replace(r"_DELTA$", "", regex=True)
    )
    # The caller's columns; helper columns added below are dropped on return.
    base_columns = list(df.columns)
    window_start_ts: Optional[pd.Timestamp] = (
        pd_to_datetime_compat(since_datetime) if since_datetime is not None else None
    )
    # A date-only `since` starts the window at midnight of that day.
    if window_start_ts is not None and _is_date_only_string(since_datetime):
        window_start_ts = window_start_ts.normalize()
    # File-vintage cutoff; without `to_datetime` it is the newest file in the inventory.
    file_vintage_ts: pd.Timestamp = (
        _normalize_file_timestamp_cutoff(to_datetime)
        if to_datetime is not None
        else df["file-timestamp"].max()
    )
    # Row-content vintage cutoff. This is used only to decide if we must include a
    # covering month-end "large delta" file even when its file timestamp lies after
    # `to_datetime` (file_vintage_ts).
    content_vintage_ts: Optional[pd.Timestamp] = None
    if (to_datetime is not None) or (max_last_updated is not None):
        content_vintage_ts = _normalize_last_updated_cutoff(
            max_last_updated if max_last_updated is not None else to_datetime
        )
    window_end_ts: pd.Timestamp = file_vintage_ts
    # Tolerate reversed bounds by swapping them.
    if window_start_ts is not None and window_start_ts > window_end_ts:
        window_start_ts, window_end_ts = window_end_ts, window_start_ts
    # Classify every row by its file name (case-insensitive markers).
    filenames = df["filename"].astype(str)
    is_delta = filenames.str.contains("_DELTA", case=False, na=False)
    is_metadata = filenames.str.contains("_METADATA", case=False, na=False)
    is_snapshot = ~is_delta & ~is_metadata
    file_ts = df["file-timestamp"]
    # Inclusive window; without `since` it is everything up to the end bound.
    in_window = (
        file_ts.le(window_end_ts)
        if window_start_ts is None
        else file_ts.between(window_start_ts, window_end_ts)
    )
    # Dataset-level properties
    # Broadcast to every row of a group: does the group have any snapshot at or
    # before the file vintage?
    df["has_snapshot_upto_vintage"] = (
        (is_snapshot & file_ts.le(file_vintage_ts))
        .groupby(df[group_col])
        .transform("any")
        .astype(bool)
    )
    df["is_delta_only_history"] = ~df["has_snapshot_upto_vintage"]
    # Per group: timestamp of the newest snapshot inside the window (NaT if none).
    df["latest_snapshot_in_window"] = (
        file_ts.where(is_snapshot & in_window).groupby(df[group_col]).transform("max")
    )
    df["has_snapshot_in_window"] = df["latest_snapshot_in_window"].notna()
    # The month-end cover file is only considered when an explicit cutoff was given.
    need_cover = (to_datetime is not None) or (max_last_updated is not None)
    cover_basis_ts = (
        content_vintage_ts if content_vintage_ts is not None else file_vintage_ts
    )
    # Keep tz-aware dtype even when the column is all-NaT to avoid tz comparison errors.
    df["cover_ts"] = pd.to_datetime(
        pd.Series(pd.NaT, index=df.index), utc=True, errors="coerce"
    )
    df["has_regular_in_month"] = False
    if need_cover and bool(is_delta.any()):
        delta_ts = df.loc[is_delta, "file-timestamp"]
        delta_groups = df.loc[is_delta, group_col]
        # Per group, the covering "large delta" timestamp for `cover_basis_ts` as
        # computed by `_covering_large_delta_timestamp`. Presumably None when the
        # group has no such file (coerced to NaT below); confirm in `.common`.
        cover_ts_by_group = delta_ts.groupby(delta_groups).apply(
            lambda s: _covering_large_delta_timestamp(
                to_ts=cover_basis_ts, delta_file_timestamps=s.tolist()
            )
        )
        df["cover_ts"] = pd.to_datetime(
            df[group_col].map(cover_ts_by_group), utc=True, errors="coerce"
        )

        def _has_regular_in_month(file_timestamps: pd.Series) -> bool:
            # True if the group has a delta in the calendar month of
            # `cover_basis_ts`, at or before it, that is not stamped 23:59:59
            # (the time of day used here to recognise "large" deltas).
            in_month = (file_timestamps.dt.year == cover_basis_ts.year) & (
                file_timestamps.dt.month == cover_basis_ts.month
            )
            is_large_delta_ts = (
                (file_timestamps.dt.hour == 23)
                & (file_timestamps.dt.minute == 59)
                & (file_timestamps.dt.second == 59)
            )
            return bool(
                (
                    in_month & file_timestamps.le(cover_basis_ts) & (~is_large_delta_ts)
                ).any()
            )

        regular_by_group = delta_ts.groupby(delta_groups).apply(_has_regular_in_month)
        # Groups without any delta map to NA and are treated as False.
        df["has_regular_in_month"] = (
            df[group_col]
            .map(regular_by_group)
            .astype("boolean")
            .fillna(False)
            .astype(bool)
        )
    cover_ts = df["cover_ts"]
    latest_snapshot_ts = df["latest_snapshot_in_window"]
    # Delta cutoff for delta-only histories: the file vintage, extended to an
    # explicit (later) row-content cutoff.
    delta_cutoff_ts = file_vintage_ts
    if content_vintage_ts is not None and max_last_updated is not None:
        delta_cutoff_ts = max(delta_cutoff_ts, content_vintage_ts)
    selected = pd.Series(False, index=df.index)
    if include_delta_files:
        # (1) Delta-only history: every delta up to the delta cutoff ...
        selected |= df["is_delta_only_history"] & is_delta & file_ts.le(delta_cutoff_ts)
        if need_cover:
            # ... plus the covering month-end delta stamped after that cutoff.
            selected |= (
                df["is_delta_only_history"]
                & is_delta
                & cover_ts.notna()
                & file_ts.eq(cover_ts)
                & file_ts.gt(delta_cutoff_ts)
            )
    # (2) The newest snapshot inside the window, per group.
    selected |= (
        is_snapshot
        & in_window
        & df["has_snapshot_in_window"]
        & file_ts.eq(latest_snapshot_ts)
    )
    if include_delta_files:
        # (3) Snapshots exist up to the vintage but none in the window: in-window deltas.
        selected |= (
            is_delta
            & in_window
            & df["has_snapshot_upto_vintage"]
            & (~df["has_snapshot_in_window"])
        )
        # (4) Snapshot in the window: in-window deltas at or after that snapshot.
        selected |= (
            is_delta
            & in_window
            & df["has_snapshot_in_window"]
            & file_ts.ge(latest_snapshot_ts)
        )
        if need_cover:
            # (5) Snapshot in the window and no regular delta this month: also take
            # the covering month-end delta if it is not older than the snapshot
            # (it may lie outside the window).
            selected |= (
                is_delta
                & df["has_snapshot_in_window"]
                & cover_ts.notna()
                & (~df["has_regular_in_month"])
                & cover_ts.ge(latest_snapshot_ts)
                & file_ts.eq(cover_ts)
            )
    out = df.loc[selected].copy()
    if out.empty:
        return df.iloc[0:0].copy()[base_columns]
    earliest_snapshot_ts: Optional[pd.Timestamp] = None
    if warn_if_no_full_snapshots and window_start_ts is not None:
        earliest_snapshot_ts = df.loc[is_snapshot, "file-timestamp"].min()
        if pd.isna(earliest_snapshot_ts):
            earliest_snapshot_ts = None
    if warn_if_no_full_snapshots and window_start_ts is not None:
        is_delta_out = (
            out["filename"].astype(str).str.contains("_DELTA", case=False, na=False)
        )
        snapshots_out = out.loc[~is_delta_out].copy()
        # Warn only when deltas were picked without any snapshot although the
        # inventory does hold snapshots (all of them outside the window).
        if (
            snapshots_out.empty
            and bool(is_delta_out.any())
            and (earliest_snapshot_ts is not None)
        ):
            earliest_snapshot_str = earliest_snapshot_ts.strftime("%Y-%m-%dT%H:%M:%SZ")
            logger.warning(
                "No full snapshots available in the requested window "
                f"since={window_start_ts.strftime('%Y-%m-%dT%H:%M:%SZ')} "
                f"to={window_end_ts.strftime('%Y-%m-%dT%H:%M:%SZ')} "
                f"earliest_snapshot={earliest_snapshot_str}"
            )
    if not include_delta_files:
        # Final guard: without deltas, the result holds no delta or metadata files.
        is_delta = (
            out["filename"].astype(str).str.contains("_DELTA", case=False, na=False)
        )
        is_metadata = (
            out["filename"].astype(str).str.contains("_METADATA", case=False, na=False)
        )
        out = out.loc[~is_delta & ~is_metadata].copy()
    out = out.sort_values([group_col, "file-timestamp", "filename"]).reset_index(
        drop=True
    )
    return out[base_columns]