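"""File selection helpers for ``macrosynergy.download.dataquery_file_api.file_selector``.

Reconciles API and local file inventories and selects the files to download or load.
"""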

from typing import List, Optional, Union
from pathlib import Path
import logging

import pandas as pd

from .constants import JPMAQS_DATASET_THEME_MAPPING

from .common import (
    pd_to_datetime_compat,
    _normalize_last_updated_cutoff,
    _normalize_file_timestamp_cutoff,
    _is_date_only_string,
    _covering_large_delta_timestamp,
)

logger = logging.getLogger(__name__)


class FileSelector:
    """Helper class to reconcile API vs local file inventories.

    Both inventories are held as DataFrames keyed by ``file_name_col``. On
    construction (and on :meth:`refresh`) each inventory is copied, given a
    ``file_name_col`` column (falling back to a ``filename`` column),
    de-duplicated by file name and outer-merged into ``files_df`` (alias
    ``merged_df``) with ``_api`` / ``_local`` column suffixes.

    Parameters
    ----------
    api_files_df : Optional[pd.DataFrame]
        Inventory of files available from the API. Anything that is not a
        DataFrame is treated as an empty inventory.
    local_files_df : Optional[pd.DataFrame]
        Inventory of files on disk. Rows may carry a ``path`` column with the
        on-disk location; it is used to check that files actually exist.
    file_name_col : str
        Name of the column holding file names (default ``"file-name"``).
    tickers : Optional[List[str]]
        Optional tickers. When given, they are resolved to dataset names through
        a metadata catalog parquet file, and selections are restricted to those
        datasets.
    catalog_file : Optional[Union[str, Path]]
        Explicit path to the metadata catalog parquet file. When it is missing or
        not an existing file, the newest local ``JPMAQS_METADATA_CATALOG`` file
        is tried instead.
    case_sensitive : bool
        Whether ticker matching against the catalog is case-sensitive.

    Raises
    ------
    ValueError
        If a non-empty inventory has neither ``file_name_col`` nor ``filename``.
    """

    def __init__(
        self,
        api_files_df: Optional[pd.DataFrame],
        local_files_df: Optional[pd.DataFrame],
        file_name_col: str = "file-name",
        tickers: Optional[List[str]] = None,
        catalog_file: Optional[Union[str, Path]] = None,
        case_sensitive: bool = False,
    ) -> None:
        self.file_name_col = str(file_name_col)
        self.api_files_df = self._copy_or_empty(api_files_df)
        self.local_files_df = self._copy_or_empty(local_files_df)
        self._normalize_inventories()

        # Keep only non-blank string tickers, stripped of surrounding whitespace.
        self.tickers = (
            [t.strip() for t in tickers if isinstance(t, str) and t.strip()]
            if tickers
            else []
        )
        self.catalog_file = Path(catalog_file) if catalog_file else None
        self.case_sensitive = bool(case_sensitive)
        self.datasets_for_tickers: List[str] = self._resolve_datasets_for_tickers()

        self._dedupe_inventories()
        self._rebuild_files_df()

    # ------------------------------------------------------------------
    # Inventory bookkeeping
    # ------------------------------------------------------------------

    @staticmethod
    def _copy_or_empty(df: Optional[pd.DataFrame]) -> pd.DataFrame:
        """Return a copy of *df*, or an empty DataFrame if *df* is not a DataFrame."""
        return df.copy() if isinstance(df, pd.DataFrame) else pd.DataFrame()

    def _normalize_file_name_col(
        self, df: pd.DataFrame, label: str
    ) -> pd.DataFrame:
        """Ensure *df* has a ``self.file_name_col`` column, falling back to ``filename``.

        *df* is modified in place and returned. An empty frame with neither
        column gets an empty object-dtype column.

        Raises
        ------
        ValueError
            If *df* is non-empty and has neither ``self.file_name_col`` nor
            ``filename``; *label* names the offending inventory in the message.
        """
        if self.file_name_col not in df.columns:
            if "filename" in df.columns:
                df[self.file_name_col] = df["filename"]
            elif not df.empty:
                raise ValueError(f"Missing `{self.file_name_col}` in {label}.")
            else:
                df[self.file_name_col] = pd.Series(dtype="object")
        return df

    def _normalize_inventories(self) -> None:
        """Apply :meth:`_normalize_file_name_col` to both cached inventories."""
        self.api_files_df = self._normalize_file_name_col(
            self.api_files_df, "api_files_df"
        )
        self.local_files_df = self._normalize_file_name_col(
            self.local_files_df, "local_files_df"
        )

    def _rebuild_files_df(self) -> None:
        """Outer-merge both inventories on ``file_name_col`` into ``files_df``."""
        self.files_df = self.api_files_df.merge(
            self.local_files_df,
            on=self.file_name_col,
            how="outer",
            suffixes=("_api", "_local"),
        )
        # Backwards compatible alias (internal / not user-facing).
        self.merged_df = self.files_df

    def refresh(
        self,
        *,
        api_files_df: Optional[pd.DataFrame] = None,
        local_files_df: Optional[pd.DataFrame] = None,
    ) -> None:
        """
        Refresh cached API and/or local inventories in-place.

        This is intended for reusing a single `FileSelector` instance across multiple
        selection operations (for example when the client downloads files and the
        local inventory changes).

        Parameters
        ----------
        api_files_df, local_files_df : Optional[pd.DataFrame]
            Replacement inventories. ``None`` keeps the cached one; any other
            non-DataFrame value is treated as an empty inventory.

        Notes
        -----
        If the local inventory lacks ``last-modified`` but the API inventory has
        it, local rows are enriched with the API value. The newest value per file
        name is used, matching the rule applied by ``_dedupe_inventories``. The
        ticker-to-dataset resolution is recomputed when tickers were given.
        """
        if api_files_df is not None:
            self.api_files_df = self._copy_or_empty(api_files_df)
        if local_files_df is not None:
            self.local_files_df = self._copy_or_empty(local_files_df)
        self._normalize_inventories()

        # Prefer enriching local inventory with API `last-modified` without forcing an
        # upstream API call in `list_downloaded_files()`.
        if (
            not self.local_files_df.empty
            and "last-modified" not in self.local_files_df.columns
            and "last-modified" in self.api_files_df.columns
        ):
            lm = self.api_files_df[[self.file_name_col, "last-modified"]]
            # Keep the newest `last-modified` per file. Otherwise a duplicated API
            # row could record an older value locally than the one kept for the
            # API inventory, and the file would later look "updated" upstream.
            lm = lm.sort_values("last-modified", ascending=False).drop_duplicates(
                subset=[self.file_name_col], keep="first"
            )
            self.local_files_df = self.local_files_df.merge(
                lm, on=self.file_name_col, how="left"
            )

        if self.tickers:
            self.datasets_for_tickers = self._resolve_datasets_for_tickers()

        self._dedupe_inventories()
        self._rebuild_files_df()

    def effective_snapshot_switchover_ts(
        self,
        *,
        file_group_ids: List[str],
        catalog_file_group_id: Optional[str] = None,
    ) -> Optional[pd.Timestamp]:
        """
        Return the effective (per-request) earliest full-snapshot timestamp.

        Parameters
        ----------
        file_group_ids : List[str]
            Requested file groups. Non-string or empty entries, and groups whose
            upper-cased name contains ``_METADATA``, are ignored. Every
            ``_DELTA`` occurrence is removed, so a dataset and its delta group
            count once.
        catalog_file_group_id : Optional[str]
            A dataset name to exclude (presumably the metadata catalog group).

        Returns
        -------
        Optional[pd.Timestamp]
            ``None`` when no datasets remain, the API inventory is empty, or any
            dataset has no full snapshot.

        Notes
        -----
        JPMaQS can remove older full snapshots over time. For a given set of
        datasets we define the "switchover" as the *latest* of the datasets'
        earliest currently available full snapshots.
        """
        if not file_group_ids:
            return None

        base_datasets = sorted(
            {
                str(d).replace("_DELTA", "")
                for d in file_group_ids
                if isinstance(d, str) and d and "_METADATA" not in d.upper()
            }
        )
        if catalog_file_group_id is not None:
            base_datasets = [d for d in base_datasets if d != catalog_file_group_id]
        if not base_datasets:
            return None

        api_like = self._as_local_like_df(self.api_files_df, source="api")
        if api_like.empty:
            return None

        names = api_like["filename"].astype(str)
        is_snapshot = ~names.str.contains(
            "_DELTA", case=False, na=False
        ) & ~names.str.contains("_METADATA", case=False, na=False)
        datasets = api_like["dataset"].astype(str)

        earliest_by_dataset: List[pd.Timestamp] = []
        for ds in base_datasets:
            ds_snapshots = api_like.loc[is_snapshot & datasets.eq(ds)]
            if ds_snapshots.empty:
                # One dataset without snapshots makes the switchover undefined.
                return None
            earliest_by_dataset.append(ds_snapshots["file-timestamp"].min())
        return max(earliest_by_dataset) if earliest_by_dataset else None

    def _dedupe_inventories(self) -> None:
        """Drop duplicate file names from both inventories, keeping the newest row.

        "Newest" means the highest ``last-modified`` when that column exists.
        Otherwise the first occurrence is kept. Indices are reset.
        """
        for attr in ("api_files_df", "local_files_df"):
            df = getattr(self, attr)
            if df.empty:
                continue
            if "last-modified" in df.columns:
                df = df.sort_values("last-modified", ascending=False)
            df = df.drop_duplicates(subset=[self.file_name_col], keep="first")
            setattr(self, attr, df.reset_index(drop=True))

    # ------------------------------------------------------------------
    # Ticker -> dataset resolution
    # ------------------------------------------------------------------

    def _locate_catalog_path(self) -> Optional[Path]:
        """Return the metadata catalog parquet path to use, or None.

        Prefers ``self.catalog_file`` when it is an existing file. Otherwise the
        local ``JPMAQS_METADATA_CATALOG`` row with the latest ``file-timestamp``
        (when that column exists) is used, provided its ``path`` is an existing
        file.
        """
        if self.catalog_file and self.catalog_file.is_file():
            return self.catalog_file
        if self.local_files_df.empty or "path" not in self.local_files_df.columns:
            return None

        df = self.local_files_df.copy()
        if "dataset" in df.columns:
            df = df[df["dataset"] == "JPMAQS_METADATA_CATALOG"]
        else:
            df = df[
                df[self.file_name_col]
                .astype(str)
                .str.startswith("JPMAQS_METADATA_CATALOG_")
            ]
        df = df[df["path"].notna()]
        if df.empty:
            return None
        if "file-timestamp" in df.columns:
            df = df.sort_values("file-timestamp", ascending=False)

        # Best effort: an unusable path simply means "no catalog".
        try:
            candidate = Path(str(df.iloc[0]["path"]))
            if candidate.is_file():
                return candidate
        except Exception:
            logger.debug("Could not inspect local catalog path.", exc_info=True)
        return None

    def _resolve_datasets_for_tickers(self) -> List[str]:
        """Map ``self.tickers`` to dataset names via the metadata catalog.

        Reads the catalog parquet file. Either ``Ticker``/``Theme`` or
        ``ticker``/``theme`` columns are accepted. Rows whose ticker matches are
        selected, case-insensitively unless ``case_sensitive`` is set, and each
        theme is mapped through ``JPMAQS_DATASET_THEME_MAPPING``.

        Returns
        -------
        List[str]
            Sorted unique dataset names. ``[]`` when there are no tickers, no
            readable catalog, or the expected columns are missing.
        """
        if not self.tickers:
            return []

        catalog_path = self._locate_catalog_path()
        if catalog_path is None:
            return []
        try:
            cat = pd.read_parquet(catalog_path)
        except Exception:
            # Best effort: an unreadable catalog disables ticker filtering.
            logger.debug(
                "Could not read metadata catalog %s.", catalog_path, exc_info=True
            )
            return []

        ticker_col = "Ticker" if "Ticker" in cat.columns else "ticker"
        theme_col = "Theme" if "Theme" in cat.columns else "theme"
        if ticker_col not in cat.columns or theme_col not in cat.columns:
            return []

        catalog_tickers = cat[ticker_col].astype(str)
        if self.case_sensitive:
            mask = catalog_tickers.isin(self.tickers)
        else:
            wanted = {t.lower() for t in self.tickers}
            mask = catalog_tickers.str.lower().isin(wanted)

        datasets = (
            cat.loc[mask, theme_col]
            .map(JPMAQS_DATASET_THEME_MAPPING)
            .dropna()
            .astype(str)
            .unique()
            .tolist()
        )
        return sorted(set(datasets))

    # ------------------------------------------------------------------
    # Frame shaping helpers
    # ------------------------------------------------------------------

    def _as_local_like_df(self, df: pd.DataFrame, *, source: str) -> pd.DataFrame:
        """Return a copy of *df* with the columns the selection helpers expect.

        Missing columns are derived from the file name. The "stem" is the text
        before the first ``.``:

        - ``filename`` is taken from ``file_name_col``.
        - ``dataset`` is the stem up to its last ``_``.
        - ``e-dataset`` is ``dataset`` without a trailing ``_DELTA``.
        - ``file-timestamp`` (UTC) is parsed from the stem after its last ``_``.
          For ``source="api"`` it is taken from ``file-datetime`` when present.

        Rows with a null ``file-timestamp`` are dropped. When tickers resolved to
        datasets, only rows whose ``e-dataset`` is among them are kept.
        """
        if df.empty:
            return df.copy()
        out = df.copy()
        if "filename" not in out.columns:
            out["filename"] = out[self.file_name_col].astype(str)
        stem = out["filename"].astype(str).str.split(".", n=1).str[0]
        if "dataset" not in out.columns:
            out["dataset"] = stem.str.rsplit("_", n=1).str[0]
        if "e-dataset" not in out.columns:
            out["e-dataset"] = (
                out["dataset"].astype(str).str.replace(r"_DELTA$", "", regex=True)
            )
        if "file-timestamp" not in out.columns:
            if source == "api" and "file-datetime" in out.columns:
                out["file-timestamp"] = pd_to_datetime_compat(
                    out["file-datetime"], utc=True
                )
            else:
                ts_str = stem.str.rsplit("_", n=1).str[-1]
                out["file-timestamp"] = pd_to_datetime_compat(ts_str, utc=True)
        out = out[out["file-timestamp"].notna()].copy()
        if self.datasets_for_tickers:
            out = out[out["e-dataset"].isin(self.datasets_for_tickers)].copy()
        return out

    @staticmethod
    def _filter_existing_paths(df: pd.DataFrame) -> pd.DataFrame:
        """Keep rows of *df* whose ``path`` is non-empty and is an existing file.

        Frames that are empty or have no ``path`` column are returned unchanged.
        The existence mask is cast to ``bool`` and applied with ``.loc``. With
        ``df[mask]``, an *empty* object-dtype mask (what ``Series.apply`` returns
        on an empty object column) is treated as a list of column labels, which
        would drop every column.
        """
        if df.empty or "path" not in df.columns:
            return df
        paths = df["path"]
        df = df.loc[paths.notna() & paths.astype(str).str.len().gt(0)]
        exists = df["path"].map(lambda p: Path(str(p)).is_file()).astype(bool)
        return df.loc[exists]

    def _most_recent_local_catalog(
        self,
        *,
        to_datetime: Optional[Union[str, pd.Timestamp]] = None,
        catalog_file_group_id: str = "JPMAQS_METADATA_CATALOG",
    ) -> Optional[str]:
        """Return the ``path`` of the newest local catalog parquet file, or None.

        Only local rows that meet all of these conditions are considered:

        - a non-null ``path``;
        - ``dataset`` equal to *catalog_file_group_id*;
        - a file name ending in ``.parquet``.

        This is after the ticker filter of ``_as_local_like_df`` has been
        applied. With *to_datetime*, files timestamped after the cutoff are
        skipped. Ties on ``file-timestamp`` are broken by file name, descending.
        """
        if self.local_files_df.empty:
            return None
        if "path" not in self.local_files_df.columns:
            return None

        local_df = self.local_files_df[self.local_files_df["path"].notna()].copy()
        if local_df.empty:
            return None

        local_like = self._as_local_like_df(local_df, source="local")
        if local_like.empty:
            return None

        local_catalogs = local_like[
            local_like["dataset"].astype(str).eq(str(catalog_file_group_id))
            & local_like["filename"].astype(str).str.lower().str.endswith(".parquet")
        ].copy()
        if local_catalogs.empty:
            return None

        if to_datetime is not None:
            cutoff = _normalize_file_timestamp_cutoff(to_datetime)
            local_catalogs = local_catalogs[
                local_catalogs["file-timestamp"] <= cutoff
            ].copy()
            if local_catalogs.empty:
                return None

        local_catalogs = local_catalogs.sort_values(
            by=["file-timestamp", "filename"], ascending=False
        )
        return str(local_catalogs.iloc[0]["path"])

    # ------------------------------------------------------------------
    # Selection
    # ------------------------------------------------------------------

    @staticmethod
    def _select_metadata_files(
        api_like: pd.DataFrame,
        *,
        since_datetime: Optional[Union[str, pd.Timestamp]],
        to_datetime: Optional[Union[str, pd.Timestamp]],
    ) -> set:
        """Return names of ``_METADATA`` files in *api_like* within the vintage window.

        The window follows the snapshot/delta semantics:

        - A date-only *since_datetime* starts at midnight of that day.
        - *to_datetime* defaults to the newest metadata ``file-timestamp``.
        - Reversed bounds are swapped.
        - Both ends are inclusive.
        """
        meta_mask = (
            api_like["filename"]
            .astype(str)
            .str.contains("_METADATA", case=False, na=False)
        )
        meta_df = api_like.loc[meta_mask]
        if meta_df.empty:
            return set()

        since_ts = (
            pd_to_datetime_compat(since_datetime)
            if since_datetime is not None
            else None
        )
        if since_ts is not None and _is_date_only_string(since_datetime):
            since_ts = since_ts.normalize()
        to_ts = (
            _normalize_file_timestamp_cutoff(to_datetime)
            if to_datetime is not None
            else meta_df["file-timestamp"].max()
        )
        if since_ts is not None and since_ts > to_ts:
            since_ts, to_ts = to_ts, since_ts

        file_ts = meta_df["file-timestamp"]
        keep = file_ts.le(to_ts) if since_ts is None else file_ts.between(since_ts, to_ts)
        return set(meta_df.loc[keep, "filename"].astype(str).tolist())

    def _outdated_local_files(self, required: set, last_modified_col: str) -> set:
        """Return names in *required* whose API ``last_modified_col`` is newer than local.

        Returns an empty set when either inventory lacks the column or the local
        inventory is empty.
        """
        if (
            last_modified_col not in self.api_files_df.columns
            or self.local_files_df.empty
            or last_modified_col not in self.local_files_df.columns
        ):
            return set()
        api_lm = self.api_files_df.set_index(self.file_name_col)[last_modified_col]
        local_lm = self.local_files_df.set_index(self.file_name_col)[
            last_modified_col
        ]
        common = api_lm.index.intersection(local_lm.index).intersection(
            list(required)
        )
        if len(common) == 0:
            return set()
        updated = common[api_lm.loc[common] > local_lm.loc[common]]
        return set(map(str, updated.tolist()))

    def select_files_for_download(
        self,
        overwrite: bool = False,
        since_datetime: Optional[Union[str, pd.Timestamp]] = None,
        to_datetime: Optional[Union[str, pd.Timestamp]] = None,
        file_group_ids: Optional[List[str]] = None,
        include_full_snapshots: bool = True,
        include_delta_files: bool = True,
        include_metadata_files: bool = False,
        warn_if_no_full_snapshots: bool = False,
        last_modified_col: str = "last-modified",
        min_last_updated: Optional[Union[str, pd.Timestamp]] = None,
        max_last_updated: Optional[Union[str, pd.Timestamp]] = None,
    ) -> List[str]:
        """Select API file-name(s) required for a load vintage that are missing/outdated locally.

        The steps are:

        1. Reduce the API inventory with the same per-dataset rules used for
           loading (``_select_local_files_for_load``).
        2. Optionally add ``_METADATA`` files from the same vintage window.
        3. Compare the result with the local inventory.

        Parameters
        ----------
        overwrite : bool
            Return every required file, regardless of what exists locally.
        since_datetime, to_datetime : Optional[Union[str, pd.Timestamp]]
            File-vintage window.
        file_group_ids : Optional[List[str]]
            Restrict to these file groups. Only applied when the API inventory
            has a ``file-group-id`` column.
        include_full_snapshots : bool
            When False, full snapshot files are excluded before selection.
        include_delta_files, warn_if_no_full_snapshots : bool
            Forwarded to the load-selection rules.
        include_metadata_files : bool
            Also require ``_METADATA`` files whose timestamp is in the window.
        last_modified_col : str
            Column compared between the API and local inventories to detect
            files that changed upstream.
        min_last_updated, max_last_updated : Optional[Union[str, pd.Timestamp]]
            Forwarded to the load-selection rules.

        Returns
        -------
        List[str]
            Sorted file names that are required and either absent from the local
            inventory (rows with a ``path`` must point to an existing file), or
            newer in the API inventory according to ``last_modified_col``.

        Raises
        ------
        ValueError
            If ``file_group_ids`` is not a list of strings.
        """
        api_like = self._as_local_like_df(self.api_files_df, source="api")
        if api_like.empty:
            return []

        if file_group_ids is not None:
            if not isinstance(file_group_ids, list) or not all(
                isinstance(x, str) for x in file_group_ids
            ):
                raise ValueError("`file_group_ids` must be a list of strings.")
            if "file-group-id" in api_like.columns:
                api_like = api_like[
                    api_like["file-group-id"].isin(file_group_ids)
                ].copy()

        if not include_full_snapshots:
            names = api_like["filename"].astype(str)
            is_snapshot = ~names.str.contains(
                "_DELTA", case=False, na=False
            ) & ~names.str.contains("_METADATA", case=False, na=False)
            api_like = api_like.loc[~is_snapshot].copy()

        selected_api = _select_local_files_for_load(
            api_like,
            since_datetime=since_datetime,
            to_datetime=to_datetime,
            include_delta_files=include_delta_files,
            warn_if_no_full_snapshots=warn_if_no_full_snapshots,
            min_last_updated=min_last_updated,
            max_last_updated=max_last_updated,
        )
        required = set(selected_api["filename"].astype(str).tolist())

        # Metadata files are also vintage-sensitive (the date window matters).
        if include_metadata_files:
            required |= self._select_metadata_files(
                api_like, since_datetime=since_datetime, to_datetime=to_datetime
            )

        if not required:
            return []
        if overwrite:
            return sorted(required)

        local = self._filter_existing_paths(self.local_files_df)
        present = (
            set(local[self.file_name_col].astype(str).tolist())
            if not local.empty
            else set()
        )
        to_download = required - present
        to_download |= self._outdated_local_files(required, last_modified_col)
        return sorted(to_download)

    def select_files_for_load(
        self,
        since_datetime: Optional[Union[str, pd.Timestamp]] = None,
        to_datetime: Optional[Union[str, pd.Timestamp]] = None,
        include_delta_files: bool = True,
        warn_if_no_full_snapshots: bool = False,
        min_last_updated: Optional[Union[str, pd.Timestamp]] = None,
        max_last_updated: Optional[Union[str, pd.Timestamp]] = None,
    ) -> pd.DataFrame:
        """Select local snapshot/delta files to load from disk (drops rows without a valid file `path`).

        The behavior depends on the local inventory:

        - Empty: a copy of it is returned.
        - No ``path`` column: an empty DataFrame is returned.
        - Otherwise: rows whose ``path`` is not an existing file are dropped
          (column layout is preserved even if none remain). The rest go through
          ``_select_local_files_for_load`` with the given arguments.
        """
        if self.local_files_df.empty:
            return self.local_files_df.copy()
        if "path" not in self.local_files_df.columns:
            return pd.DataFrame()

        local_df = self._filter_existing_paths(self.local_files_df)
        local_df = self._as_local_like_df(local_df, source="local")
        if local_df.empty:
            return local_df

        return _select_local_files_for_load(
            local_df,
            since_datetime=since_datetime,
            to_datetime=to_datetime,
            include_delta_files=include_delta_files,
            warn_if_no_full_snapshots=warn_if_no_full_snapshots,
            min_last_updated=min_last_updated,
            max_last_updated=max_last_updated,
        )

    def oldest_api_file_timestamp(self) -> Optional[pd.Timestamp]:
        """
        Return the oldest file timestamp present in the API inventory (UTC).

        Assumes `api_files_df` is an unfiltered API inventory (full history), as
        provided by `DataQueryFileAPIClient.list_available_files_for_all_file_groups()`.
        Returns None when the inventory is empty or has no valid timestamps.
        """
        api_like = self._as_local_like_df(self.api_files_df, source="api")
        if api_like.empty:
            return None
        ts = api_like["file-timestamp"].min()
        if pd.isna(ts):
            return None
        return ts
def _select_local_files_for_load(
    files_df: pd.DataFrame,
    *,
    since_datetime: Optional[Union[str, pd.Timestamp]] = None,
    to_datetime: Optional[Union[str, pd.Timestamp]] = None,
    include_delta_files: bool = True,
    warn_if_no_full_snapshots: bool = False,
    min_last_updated: Optional[Union[str, pd.Timestamp]] = None,
    max_last_updated: Optional[Union[str, pd.Timestamp]] = None,
) -> pd.DataFrame:
    """
    Single-responsibility helper: choose which local snapshot/delta files to load.

    The selection is per effective dataset ("e-dataset"):

    - If full snapshots exist for the dataset and at least one snapshot is present
      in the requested file-vintage window, load the latest snapshot in the window
      and any delta files newer than that snapshot (also within the window).
    - If no full snapshots exist at or before the requested vintage (effective
      delta-only history), load *all* available deltas up to the requested vintage.
      For monthly "large delta" regimes, also include the covering month-end delta
      file even if it timestamps after `to_datetime` (row-level filtering is handled
      via `max_last_updated`).

    Parameters
    ----------
    files_df : pd.DataFrame
        File inventory. Must contain ``file-timestamp`` and ``filename``, and
        either ``e-dataset`` or ``dataset``. That column is used for grouping,
        with any trailing ``_DELTA`` stripped. ``file-timestamp`` is compared
        against the cutoffs below and the tz-aware ``cover_ts`` column.
        Presumably it is tz-aware UTC; TODO confirm.
    since_datetime : Optional[Union[str, pd.Timestamp]]
        Start of the file-vintage window. Date-only strings start at midnight.
        If it lies after the window end, the two bounds are swapped.
    to_datetime : Optional[Union[str, pd.Timestamp]]
        File-vintage cutoff. Defaults to the newest ``file-timestamp`` in
        ``files_df``. Also used as the row-content cutoff when
        ``max_last_updated`` is not given.
    include_delta_files : bool
        When False, files whose name contains ``_DELTA`` or ``_METADATA`` are
        removed from the result.
    warn_if_no_full_snapshots : bool
        Only effective when ``since_datetime`` is given. In that case a warning
        is logged if the result has delta files but no non-delta files while
        ``files_df`` contains at least one full snapshot.
    min_last_updated : Optional[Union[str, pd.Timestamp]]
        NOTE(review): accepted but not referenced anywhere in this function body.
    max_last_updated : Optional[Union[str, pd.Timestamp]]
        Row-content cutoff. When given, deltas may be selected up to
        ``max(file-vintage cutoff, content cutoff)``.

    Returns
    -------
    pd.DataFrame
        The selected rows with the input columns only (plus the normalized
        group column), sorted by group, ``file-timestamp`` and ``filename``.
        An empty input is returned as-is.

    Raises
    ------
    ValueError
        If ``file-timestamp``, ``filename`` or the dataset column is missing.
    """
    if files_df.empty:
        return files_df

    df = files_df.copy()
    if "file-timestamp" not in df.columns:
        raise ValueError("Expected column 'file-timestamp' in files_df")
    if "filename" not in df.columns:
        raise ValueError("Expected column 'filename' in files_df")
    # Group on the effective dataset so a dataset and its `_DELTA` counterpart
    # fall into the same group.
    if "e-dataset" in df.columns:
        group_col = "e-dataset"
    else:
        group_col = "dataset"
    if group_col not in df.columns:
        raise ValueError("Expected column 'dataset' in files_df")
    df[group_col] = (
        df[group_col].astype(str).str.replace(r"_DELTA$", "", regex=True)
    )
    # Caller-visible columns; helper columns added below are dropped on return.
    base_columns = list(df.columns)

    window_start_ts: Optional[pd.Timestamp] = (
        pd_to_datetime_compat(since_datetime) if since_datetime is not None else None
    )
    # A date-only `since_datetime` means "from the start of that day".
    if window_start_ts is not None and _is_date_only_string(since_datetime):
        window_start_ts = window_start_ts.normalize()
    # File-vintage cutoff: explicit `to_datetime`, else the newest file present.
    file_vintage_ts: pd.Timestamp = (
        _normalize_file_timestamp_cutoff(to_datetime)
        if to_datetime is not None
        else df["file-timestamp"].max()
    )
    # Row-content vintage cutoff. This is used only to decide if we must include a
    # covering month-end "large delta" file even when its file timestamp lies after
    # `to_datetime` (file_vintage_ts).
    content_vintage_ts: Optional[pd.Timestamp] = None
    if (to_datetime is not None) or (max_last_updated is not None):
        content_vintage_ts = _normalize_last_updated_cutoff(
            max_last_updated if max_last_updated is not None else to_datetime
        )

    window_end_ts: pd.Timestamp = file_vintage_ts
    # Tolerate reversed bounds by swapping them.
    if window_start_ts is not None and window_start_ts > window_end_ts:
        window_start_ts, window_end_ts = window_end_ts, window_start_ts

    # Classify by name (case-insensitive): `_DELTA` -> delta, `_METADATA` ->
    # metadata, anything else -> full snapshot.
    filenames = df["filename"].astype(str)
    is_delta = filenames.str.contains("_DELTA", case=False, na=False)
    is_metadata = filenames.str.contains("_METADATA", case=False, na=False)
    is_snapshot = ~is_delta & ~is_metadata
    file_ts = df["file-timestamp"]
    # Inclusive window; unbounded below when no start was given.
    in_window = (
        file_ts.le(window_end_ts)
        if window_start_ts is None
        else file_ts.between(window_start_ts, window_end_ts)
    )

    # Dataset-level properties
    # True on every row of a group that has a full snapshot at or before the
    # file-vintage cutoff (the window start is not considered here).
    df["has_snapshot_upto_vintage"] = (
        (is_snapshot & file_ts.le(file_vintage_ts))
        .groupby(df[group_col])
        .transform("any")
        .astype(bool)
    )
    df["is_delta_only_history"] = ~df["has_snapshot_upto_vintage"]
    # Per group: newest full snapshot inside the window (NaT when none).
    df["latest_snapshot_in_window"] = (
        file_ts.where(is_snapshot & in_window).groupby(df[group_col]).transform("max")
    )
    df["has_snapshot_in_window"] = df["latest_snapshot_in_window"].notna()

    # Covering "large delta" files are only considered when some cutoff was given.
    need_cover = (to_datetime is not None) or (max_last_updated is not None)
    cover_basis_ts = (
        content_vintage_ts if content_vintage_ts is not None else file_vintage_ts
    )
    # Keep tz-aware dtype even when the column is all-NaT to avoid tz comparison errors.
    df["cover_ts"] = pd.to_datetime(
        pd.Series(pd.NaT, index=df.index), utc=True, errors="coerce"
    )
    df["has_regular_in_month"] = False
    if need_cover and bool(is_delta.any()):
        delta_ts = df.loc[is_delta, "file-timestamp"]
        delta_groups = df.loc[is_delta, group_col]
        # Per group, ask the shared helper for the covering large-delta timestamp
        # and broadcast it to every row of the group. Missing values become NaT
        # via errors="coerce". NOTE(review): the helper's "no cover" return value
        # is not visible here; presumably None -- TODO confirm.
        cover_ts_by_group = delta_ts.groupby(delta_groups).apply(
            lambda s: _covering_large_delta_timestamp(
                to_ts=cover_basis_ts, delta_file_timestamps=s.tolist()
            )
        )
        df["cover_ts"] = pd.to_datetime(
            df[group_col].map(cover_ts_by_group), utc=True, errors="coerce"
        )

        def _has_regular_in_month(file_timestamps: pd.Series) -> bool:
            # True if a delta exists that meets all of these conditions:
            # - it is in the same calendar month as `cover_basis_ts`;
            # - it is at or before `cover_basis_ts`;
            # - its time of day is not 23:59:59, the time this function treats
            #   as a month-end "large delta" timestamp.
            in_month = (file_timestamps.dt.year == cover_basis_ts.year) & (
                file_timestamps.dt.month == cover_basis_ts.month
            )
            is_large_delta_ts = (
                (file_timestamps.dt.hour == 23)
                & (file_timestamps.dt.minute == 59)
                & (file_timestamps.dt.second == 59)
            )
            return bool(
                (
                    in_month
                    & file_timestamps.le(cover_basis_ts)
                    & (~is_large_delta_ts)
                ).any()
            )

        regular_by_group = delta_ts.groupby(delta_groups).apply(_has_regular_in_month)
        # Groups without any delta map to NA, which is treated as False.
        df["has_regular_in_month"] = (
            df[group_col]
            .map(regular_by_group)
            .astype("boolean")
            .fillna(False)
            .astype(bool)
        )

    cover_ts = df["cover_ts"]
    latest_snapshot_ts = df["latest_snapshot_in_window"]
    # Deltas may extend past the file-vintage cutoff up to the content cutoff,
    # but only when `max_last_updated` was given explicitly.
    delta_cutoff_ts = file_vintage_ts
    if content_vintage_ts is not None and max_last_updated is not None:
        delta_cutoff_ts = max(delta_cutoff_ts, content_vintage_ts)

    selected = pd.Series(False, index=df.index)
    if include_delta_files:
        # Delta-only history: every delta up to the delta cutoff. The window
        # start (`since_datetime`) is deliberately not applied here.
        selected |= df["is_delta_only_history"] & is_delta & file_ts.le(delta_cutoff_ts)
        if need_cover:
            # Delta-only history: the covering large delta, even though it is
            # timestamped after the delta cutoff.
            selected |= (
                df["is_delta_only_history"]
                & is_delta
                & cover_ts.notna()
                & file_ts.eq(cover_ts)
                & file_ts.gt(delta_cutoff_ts)
            )
    # Snapshot regime: the newest full snapshot inside the window.
    selected |= (
        is_snapshot
        & in_window
        & df["has_snapshot_in_window"]
        & file_ts.eq(latest_snapshot_ts)
    )
    if include_delta_files:
        # Snapshots exist up to the vintage but none in the window: take all
        # in-window deltas.
        selected |= (
            is_delta
            & in_window
            & df["has_snapshot_upto_vintage"]
            & (~df["has_snapshot_in_window"])
        )
        # Snapshot regime: in-window deltas at or after the selected snapshot.
        selected |= (
            is_delta
            & in_window
            & df["has_snapshot_in_window"]
            & file_ts.ge(latest_snapshot_ts)
        )
        if need_cover:
            # Snapshot regime: the covering large delta is added when the month
            # has no regular delta up to the cutoff basis and the large delta is
            # not older than the selected snapshot. Its file timestamp is not
            # restricted to the window.
            selected |= (
                is_delta
                & df["has_snapshot_in_window"]
                & cover_ts.notna()
                & (~df["has_regular_in_month"])
                & cover_ts.ge(latest_snapshot_ts)
                & file_ts.eq(cover_ts)
            )

    out = df.loc[selected].copy()
    if out.empty:
        return df.iloc[0:0].copy()[base_columns]

    # Earliest full snapshot anywhere in the inventory, used only in the warning.
    earliest_snapshot_ts: Optional[pd.Timestamp] = None
    if warn_if_no_full_snapshots and window_start_ts is not None:
        earliest_snapshot_ts = df.loc[is_snapshot, "file-timestamp"].min()
        if pd.isna(earliest_snapshot_ts):
            earliest_snapshot_ts = None

    if warn_if_no_full_snapshots and window_start_ts is not None:
        is_delta_out = (
            out["filename"].astype(str).str.contains("_DELTA", case=False, na=False)
        )
        snapshots_out = out.loc[~is_delta_out].copy()
        # Warn when only deltas were selected although snapshots exist somewhere.
        if (
            snapshots_out.empty
            and bool(is_delta_out.any())
            and (earliest_snapshot_ts is not None)
        ):
            earliest_snapshot_str = earliest_snapshot_ts.strftime("%Y-%m-%dT%H:%M:%SZ")
            logger.warning(
                "No full snapshots available in the requested window "
                f"since={window_start_ts.strftime('%Y-%m-%dT%H:%M:%SZ')} "
                f"to={window_end_ts.strftime('%Y-%m-%dT%H:%M:%SZ')} "
                f"earliest_snapshot={earliest_snapshot_str}"
            )

    # Final guard: with deltas disabled, drop any delta/metadata rows.
    if not include_delta_files:
        is_delta = (
            out["filename"].astype(str).str.contains("_DELTA", case=False, na=False)
        )
        is_metadata = (
            out["filename"].astype(str).str.contains("_METADATA", case=False, na=False)
        )
        out = out.loc[~is_delta & ~is_metadata].copy()

    # Deterministic order; helper columns are stripped via `base_columns`.
    out = out.sort_values([group_col, "file-timestamp", "filename"]).reset_index(
        drop=True
    )
    return out[base_columns]