"""
.. note::
This functionality is currently in BETA and is subject to significant changes
without deprecation cycles.
Client for downloading JPMaQS data files from the JPMorgan DataQuery File API.
This module provides the `DataQueryFileAPIClient`, a high-level wrapper for the
JPMorgan DataQuery File API.
The client maintains a local on-disk cache of downloaded files. By default, downloads
are written into a folder called `jpmaqs-download`. If `out_dir` is not already named
`jpmaqs-download`, the client will create/use `<out_dir>/jpmaqs-download`.
Setting up API Credentials
--------------------------
Please obtain your DataQuery API credentials (Client ID and Client Secret) from the
JPMorgan Developer Portal, as directed by DataQuery/JPMaQS support channels.
Before using the client, ensure your API credentials are set as environment variables:
.. code-block:: bash
export DQ_CLIENT_ID="your_client_id"
export DQ_CLIENT_SECRET="your_client_secret"
In line with the DataQuery SDK, the client will also check for the environment
variables `DATAQUERY_CLIENT_ID` and `DATAQUERY_CLIENT_SECRET`.
Please refer to the official documentation and IT/Tech support channels for
guidance on setting environment variables securely.
Common usage examples
---------------------
**Example 1: Initialize the client and list all available JPMaQS files.**
.. code-block:: python
from macrosynergy.download import DataQueryFileAPIClient
import pandas as pd
client = DataQueryFileAPIClient()
# Fetch a DataFrame of all available files for the JPMaQS group
available_files_df = client.list_available_files_for_all_file_groups()
print("Available JPMaQS files:")
print(available_files_df.head())
.. code-block:: python
Available JPMaQS files:
group-id file-group-id file-datetime is-available file-name last-modified
0 JPMAQS JPMAQS_STYLIZED_TRADING_FACTORS 2026-01-23 00:00:00+00:00 True JPMAQS_STYLIZED_TRADING_FACTORS_20260123.parquet 2026-01-23 06:13:48+00:00
1 JPMAQS JPMAQS_STYLIZED_TRADING_FACTORS 2026-01-22 00:00:00+00:00 True JPMAQS_STYLIZED_TRADING_FACTORS_20260122.parquet 2026-01-22 06:14:18+00:00
2 JPMAQS JPMAQS_STYLIZED_TRADING_FACTORS 2026-01-21 00:00:00+00:00 True JPMAQS_STYLIZED_TRADING_FACTORS_20260121.parquet 2026-01-21 06:13:11+00:00
3 JPMAQS JPMAQS_STYLIZED_TRADING_FACTORS 2026-01-20 00:00:00+00:00 True JPMAQS_STYLIZED_TRADING_FACTORS_20260120.parquet 2026-01-20 06:08:55+00:00
4 JPMAQS JPMAQS_STYLIZED_TRADING_FACTORS 2026-01-19 00:00:00+00:00 True JPMAQS_STYLIZED_TRADING_FACTORS_20260119.parquet 2026-01-19 06:11:40+00:00
**Example 2: Download all new or updated files for the current day.**
This is the recommended way to get a daily snapshot of all JPMaQS data,
including full datasets, deltas, and metadata.
.. code-block:: python
from macrosynergy.download import DataQueryFileAPIClient
# Use a stable local cache directory.
client = DataQueryFileAPIClient(out_dir="./jpmaqs_data")
print(f"Downloading today's files to {client.out_dir} ...")
client.download_full_snapshot()
print("Download complete.")
**Example 3: Download and load a filtered dataset**
`download()` is the main "one-stop" method: it downloads the necessary snapshot/delta
files into the local cache (unless `skip_download=True`), then loads the requested
timeseries as a DataFrame.
.. code-block:: python
from macrosynergy.download import DataQueryFileAPIClient
cids = ['AUD', 'CAD', 'USD', 'JPY']
xcats = ['EQXR_NSA', 'RIR_NSA']
start_date = '2000-01-01'
with DataQueryFileAPIClient(out_dir="./jpmaqs_data") as client:
df = client.download(cids=cids, xcats=xcats, start_date=start_date)
print(df.head())
.. code-block:: python
real_date cid xcat value eop_lag mop_lag grading last_updated
0 2000-01-03 AUD RIR_NSA 4.078 0.0 55.0 1.25 2024-07-25 07:27:22
1 2000-01-04 AUD RIR_NSA 3.778 0.0 56.0 1.25 2024-07-25 07:27:22
2 2000-01-05 AUD RIR_NSA 3.747 0.0 56.0 1.25 2024-07-25 07:27:22
3 2000-01-06 AUD RIR_NSA 3.710 0.0 56.0 1.25 2024-07-25 07:27:22
4 2000-01-07 AUD RIR_NSA 3.697 0.0 57.0 1.25 2024-07-25 07:27:22
**Example 3b: `download()` - ticker schema.**
Use `dataframe_format="tickers"` to keep a `ticker` column (instead of `cid`/`xcat`).
This is useful if you want to pivot to a matrix for modeling.
.. code-block:: python
from macrosynergy.download import DataQueryFileAPIClient
tickers = ["USD_RIR_NSA", "EUR_RIR_NSA", "JPY_RIR_NSA"]
with DataQueryFileAPIClient(out_dir="./jpmaqs_data") as client:
df = client.download(
tickers=tickers,
metrics=["value"],
start_date="2015-01-01",
dataframe_format="tickers",
)
df_pivot = df.pivot(index="real_date", columns="ticker", values="value")
**Example 3c: `download()` - large pulls with Polars (lazy)**
For large requests, `dataframe_type="polars-lazy"` keeps the result lazy so you can
filter/transform before collecting.
.. code-block:: python
import pandas as pd
import polars as pl
from macrosynergy.download import DataQueryFileAPIClient
with DataQueryFileAPIClient(out_dir="./jpmaqs_data") as client:
cat_df = pd.read_parquet(client.download_catalog_file())
cat_df = cat_df[cat_df["Ticker"].str.startswith(("USD_", "EUR_"))]
usd_eur_tickers = cat_df["Ticker"].tolist()
lf = client.download(
tickers=usd_eur_tickers,
start_date="2010-01-01",
metrics=["value", "last_updated"],
include_file_column=True,
dataframe_type="polars-lazy",
)
# Example: filter further before materializing
df = lf.filter(pl.col("real_date") >= pl.date(2020, 1, 1)).collect()
**Example 4: Download a slice of the dataset "as of" a specific date-time.**
This method downloads the necessary snapshot and delta files to reconstruct the
dataset as of the specified date-time.
.. code-block:: python
from macrosynergy.download import DataQueryFileAPIClient
import pandas as pd
tickers = ['AUD_EQXR_NSA', 'CAD_EQXR_NSA', 'USD_EQXR_NSA', 'JPY_EQXR_NSA']
with DataQueryFileAPIClient(out_dir="./data/jpmaqs-data/") as dq:
# Cut off at noon on 2025-11-12 (UTC)
df = dq.download_as_of(tickers=tickers, as_of_datetime="2025-11-12T12:00:00")
# inline with T-1 release schedule
assert df['real_date'].max() <= pd.Timestamp("2025-11-12")
assert df['last_updated'].max() <= pd.Timestamp("2025-11-12T12:00:00")
print(df.head())
**Example 5: Download all new or updated delta-files since a specific date/time.**
.. code-block:: python
from macrosynergy.download import DataQueryFileAPIClient
import pandas as pd
client = DataQueryFileAPIClient(out_dir="./jpmaqs_data")
since_datetime = (pd.Timestamp.utcnow() - pd.DateOffset(days=10)).strftime("%Y%m%d")
client.download_full_snapshot(
since_datetime=since_datetime,
include_full_snapshots=False,
include_metadata=True,
include_delta=True,
)
print("Download complete.")
**Example 6: Download a single, specific historical file.**
.. code-block:: python
from macrosynergy.download import DataQueryFileAPIClient
client = DataQueryFileAPIClient(out_dir="./jpmaqs_data")
# This specific filename can be found using the list_available_files... methods
target_filename = "JPMAQS_MACROECONOMIC_BALANCE_SHEETS_20250414.parquet"
print(f"Downloading {target_filename}...")
file_path = client.download_file(filename=target_filename)
print(f"File downloaded to: {file_path}")
**Example 7: Check availability for a specific file-group.**
.. code-block:: python
from macrosynergy.download import DataQueryFileAPIClient
client = DataQueryFileAPIClient()
file_group_id = "JPMAQS_MACROECONOMIC_BALANCE_SHEETS"
available_files = client.list_available_files(file_group_id=file_group_id)
# print the earliest file's details
print(available_files.iloc[-1])
**Example 8: Load "notification" metadata (missing updates & revisions).**
JPMaQS publishes daily metadata notification JSON files that summarize:
- Missing updates ("Missing Updates")
- Additional info about missing updates ("Additional information on missing updates")
- Changed historical values ("Changed historical values")
The helpers below download the relevant metadata for the requested date (UTC, business-day
window) if needed, and return the notifications as pandas DataFrames.
.. code-block:: python
from macrosynergy.download import DataQueryFileAPIClient
with DataQueryFileAPIClient(out_dir="./jpmaqs_data") as client:
missing_df = client.get_missing_data_notifications(date="2026-01-19")
revisions_df = client.get_revisions_notifications(date="2026-01-19")
print(missing_df.head())
print(revisions_df.head())
**Example 9: Download all historical full snapshot files (vintages) for JPMaQS.**
Please note:
- This is a **VERY LARGE** download, taking 1hr+ and around 1GB/snapshot.
- This method is **NOT** recommended for regular use.
- This method should **ONLY** be used for audit and archival purposes.
.. code-block:: python
from macrosynergy.download import DataQueryFileAPIClient
client = DataQueryFileAPIClient(out_dir="./jpmaqs_full_snapshots")
earliest_date = "20220101" # a date before the earliest available file
client.download_full_snapshot(
since_datetime=earliest_date,
include_delta=False,
include_metadata=False,
)
---
Please find below the documentation for the `DataQueryFileAPIClient` and related
classes/methods.
"""
import os
import warnings
import pandas as pd
import polars as pl
import time
from pathlib import Path
import concurrent.futures as cf
import logging
import traceback as tb
from typing import Dict, Any, Optional, List, Tuple, Union
import requests
from tqdm import tqdm
import json
from macrosynergy.compat import PYTHON_3_8_OR_LATER
from macrosynergy.download.dataquery import JPMAQS_GROUP_ID
from macrosynergy.download.fusion_interface import (
request_wrapper,
request_wrapper_stream_bytes_to_disk,
cache_decorator,
)
from macrosynergy.download.dataquery import OAUTH_TOKEN_URL
from macrosynergy.download.exceptions import DownloadError, InvalidResponseError
from macrosynergy.download.jpm_oauth import JPMorganOAuth
from macrosynergy.download.dataquery_file_api.constants import ( # noqa: F401
JPMAQS_DATASET_THEME_MAPPING,
JPMAQS_EARLIEST_FILE_DATE,
DQ_FILE_API_BASE_URL,
DQ_FILE_API_FALLBACK_BASE_URL,
DQ_FILE_API_SCOPE,
DQ_FILE_API_TIMEOUT,
DQ_FILE_API_HEADERS_TIMEOUT,
DQ_FILE_API_DELAY_PARAM,
DQ_FILE_API_DELAY_MARGIN,
DQ_FILE_API_SEGMENT_SIZE_MB,
DQ_FILE_API_STREAM_CHUNK_SIZE,
)
from macrosynergy.download.dataquery_file_api.common import (
RateLimitedRequester,
JPMaQSParquetExpectedColumns,
pd_to_datetime_compat,
validate_dq_timestamp,
_large_delta_file_datetimes,
get_current_or_last_business_day,
_is_date_only_string,
_normalize_file_timestamp_cutoff,
_normalize_last_updated_cutoff,
_downloaded_files_df,
)
from macrosynergy.download.dataquery_file_api.file_selector import FileSelector
from macrosynergy.download.dataquery_file_api.segmented_file_downloader import (
SegmentedFileDownloader,
)
from macrosynergy.download.dataquery_file_api.file_loader import lazy_load_from_parquets
logger = logging.getLogger(__name__)
def _resolve_base_url(
primary: str,
fallback: str,
timeout: float = 10.0,
verify: bool = True,
proxies: Optional[Dict[str, str]] = None,
) -> str:
"""
Probe which DataQuery File API base URL is reachable.
Tries *primary* first; on connection failure, falls back to *fallback*.
The result is not cached globally - each `DataQueryFileAPIClient`
instance re-probes on construction so that transient network failures do
not persist beyond the instance lifecycle.
"""
for url, is_fallback in [(primary, False), (fallback, True)]:
try:
requests.head(url, timeout=timeout, verify=verify, proxies=proxies)
except requests.exceptions.RequestException:
if not is_fallback:
logger.debug(
"Primary DataQuery File API URL not reachable (%s), "
"trying fallback...",
primary,
)
continue
if is_fallback:
warnings.warn(
f"The primary DataQuery File API URL is not reachable: "
f"{primary}\n"
f"Falling back to: {url}\n"
f"Please whitelist/allow the primary URL in your "
f"network/firewall configuration.",
UserWarning,
stacklevel=2,
)
logger.warning(
"DataQuery File API URL fallback active: using %s instead of %s.",
url,
primary,
)
return url
# Both unreachable - return primary and let normal error handling surface it
return primary
[docs]class DataQueryFileAPIOauth(JPMorganOAuth):
"""
A class to handle OAuth authentication for the JPMorgan DataQuery File API.
"""
def __init__(
self,
client_id: str,
client_secret: str,
resource: str = DQ_FILE_API_SCOPE,
auth_url: str = OAUTH_TOKEN_URL,
root_url: str = DQ_FILE_API_BASE_URL,
application_name: str = "DataQueryFileAPI",
proxies: Optional[Dict[str, str]] = None,
verify: bool = True,
**kwargs,
):
super().__init__(
client_id=client_id,
client_secret=client_secret,
resource=resource,
application_name=application_name,
auth_url=auth_url,
root_url=root_url,
proxies=proxies,
verify=verify,
**kwargs,
)
[docs]class DataQueryFileAPIClient(RateLimitedRequester):
"""
A client for accessing JPMaQS product files via the JPMorgan DataQuery File API.
This client provides an alternative distribution channel to the Fusion API for JPMaQS
data. It is designed to list and download JPMaQS data files, which are
available as full snapshots, daily deltas, and metadata files. The client handles
authentication, API requests, and file downloads, including large file downloads
using a segmented, concurrent approach.
Parameters
----------
client_id : Optional[str]
Client ID for authentication. If not provided, it will be sourced from
environment variables (`DQ_CLIENT_ID` or `DATAQUERY_CLIENT_ID`).
client_secret : Optional[str]
Client Secret for authentication. If not provided, it will be sourced from
environment variables (`DQ_CLIENT_SECRET` or `DATAQUERY_CLIENT_SECRET`).
out_dir : Optional[str]
Base output directory for downloads. The effective cache directory is always a
folder named `jpmaqs-download` (either `out_dir` itself, or
`<out_dir>/jpmaqs-download`). A client instance is bound to this directory.
base_url : str
The base URL for the DataQuery File API. Defaults to `DQ_FILE_API_BASE_URL`.
scope : str
The API scope for authentication. Defaults to `DQ_FILE_API_SCOPE`.
proxies : Optional[Dict[str, str]]
Optional proxies to use for HTTP requests. Defaults to None.
verify_ssl : bool
If True, verifies SSL certificates for all requests. Defaults to True.
"""
def __init__(
self,
client_id: Optional[str] = None,
client_secret: Optional[str] = None,
out_dir: Optional[str] = None,
base_url: str = DQ_FILE_API_BASE_URL,
scope: str = DQ_FILE_API_SCOPE,
proxies: Optional[Dict[str, str]] = None,
verify_ssl: bool = True,
api_delay: float = DQ_FILE_API_DELAY_PARAM,
api_delay_margin: float = DQ_FILE_API_DELAY_MARGIN,
):
super().__init__(api_delay=api_delay * api_delay_margin)
if not (bool(client_id) and bool(client_secret)):
client_id, client_secret = get_client_id_secret()
if not (bool(client_id) and bool(client_secret)):
raise ValueError(
"Client ID and Client Secret must be provided either as arguments or "
"via environment variables DQ_CLIENT_ID & DQ_CLIENT_SECRET or "
"DATAQUERY_CLIENT_ID & DATAQUERY_CLIENT_SECRET"
)
self.client_id = client_id
self.client_secret = client_secret
self.out_dir = self._normalize_out_dir(out_dir or "./jpmaqs-download")
self.base_url = _resolve_base_url(
primary=base_url,
fallback=DQ_FILE_API_FALLBACK_BASE_URL,
verify=verify_ssl,
proxies=proxies,
).rstrip("/")
self.scope = scope
self.proxies = proxies
self.verify_ssl = verify_ssl
self.catalog_file_group_id = "JPMAQS_METADATA_CATALOG"
self.oauth = DataQueryFileAPIOauth(
client_id=self.client_id,
client_secret=self.client_secret,
resource=self.scope,
verify=self.verify_ssl,
)
self._file_selector = None
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, traceback):
if exc_type is not None:
logger.error(tb.format_exc())
return False
@property
def file_selector(self) -> FileSelector:
"""
Cached `FileSelector` instance for this client.
Notes
-----
This property is intentionally designed so it does not fetch the API file
inventory when first accessed. Download operations (e.g. `download_full_snapshot`)
refresh the selector with the latest unfiltered API inventory as needed.
"""
if self._file_selector is None:
local_files_df = self.list_downloaded_files(
include_last_modified_columns=False
)
self._file_selector = FileSelector(
api_files_df=None,
local_files_df=local_files_df,
file_name_col="file-name",
)
return self._file_selector
def _refresh_file_selector(self) -> None:
"""
Refresh the cached `FileSelector` local inventory from disk.
This is a lightweight, network-free refresh used after downloads so subsequent
selection/load operations see the updated local cache.
"""
if self._file_selector is None:
return
try:
local_files_df = self.list_downloaded_files(
include_last_modified_columns=False
)
self._file_selector.refresh(local_files_df=local_files_df)
except Exception:
logger.warning("Failed to refresh file selector inventory after download.")
pass
@staticmethod
def _normalize_out_dir(out_dir: Union[str, Path]) -> str:
"""
Normalize an output directory to the effective JPMaQS cache directory.
The DataQuery File API client stores all downloads under a folder called
`jpmaqs-download`. If `out_dir` is not already named `jpmaqs-download`, this
method appends a `jpmaqs-download` subdirectory.
"""
out_dir_str = os.fspath(out_dir)
stripped = out_dir_str.rstrip("/\\")
if os.path.basename(stripped) == "jpmaqs-download":
return stripped
return str(Path(stripped) / "jpmaqs-download")
def _get(
self, endpoint: str, params: Optional[Dict[str, Any]] = None, retries: int = 3
) -> Dict[str, Any]:
"""
Executes a GET request to a specified endpoint with retry logic.
Parameters
----------
endpoint : str
The API endpoint to call.
params : Optional[Dict[str, Any]]
A dictionary of query parameters for the request.
retries : int
The number of times to retry the request in case of failure.
Returns
-------
Dict[str, Any]
The JSON response from the API as a dictionary.
"""
url = f"{self.base_url}{endpoint}"
headers = self.oauth.get_headers()
for _ in range(retries):
try:
self._wait_for_api_call()
return request_wrapper(
method="GET",
url=url,
headers=headers,
params=params or {},
proxies=self.proxies,
as_json=True,
api_delay=0, # handled by self._wait_for_api_call
skip_wait=True,
verify_ssl=self.verify_ssl,
)
except Exception as e:
logger.error(f"Error occurred during GET request: {e}")
if _ == retries - 1:
raise
logger.info(f"Retrying... ({_ + 1}/{retries})")
time.sleep(2**_)
[docs] def list_groups(self) -> pd.DataFrame:
"""
Lists all available data provider groups.
Returns
-------
pd.DataFrame
A DataFrame containing details of available groups.
"""
endpoint = "/groups"
payload = self._get(endpoint, {})
return pd.json_normalize(payload, record_path=["groups"])
[docs] def search_groups(self, keywords: str) -> pd.DataFrame:
"""
Searches for data provider groups that match the given keywords.
Parameters
----------
keywords : str
Keywords to search for in group names and descriptions.
Returns
-------
pd.DataFrame
A DataFrame of groups matching the search criteria.
"""
endpoint = "/groups/search"
payload = self._get(endpoint, {"keywords": keywords})
return pd.json_normalize(payload, record_path=["groups"])
[docs] @cache_decorator(ttl=60)
def list_group_files(
self,
group_id: str = JPMAQS_GROUP_ID,
include_full_snapshots: bool = True,
include_delta: bool = True,
include_metadata: bool = True,
) -> pd.DataFrame:
"""
Lists all file groups (datasets) for a specific data provider.
Parameters
----------
group_id : str
The identifier for the data provider group, defaults to the JPMaQS group.
include_full_snapshots : bool
If True, include full snapshot file groups in the result.
include_delta : bool
If True, include delta file groups in the result.
include_metadata : bool
If True, include metadata file groups in the result.
Returns
-------
pd.DataFrame
A DataFrame listing the available file groups.
"""
if not any([include_full_snapshots, include_delta, include_metadata]):
raise ValueError(
"At least one of `include_full_snapshots`, `include_delta`, or "
"`include_metadata` must be True"
)
endpoint = "/group/files"
payload = self._get(endpoint, {"group-id": group_id})
df = pd.json_normalize(payload, record_path=["file-group-ids"])
isdeltafile = df["file-group-id"].str.endswith("_DELTA")
ismetadata = df["file-group-id"].str.contains("_METADATA")
isfullsnapshot = ~(isdeltafile | ismetadata)
mask = pd.Series(False, index=df.index)
if include_full_snapshots:
mask |= isfullsnapshot
if include_delta:
mask |= isdeltafile
if include_metadata:
mask |= ismetadata
df = df[mask]
df = df.sort_values(by=["item"]).reset_index(drop=True)
return df
[docs] @cache_decorator(ttl=60)
def list_available_files(
self,
file_group_id: Optional[str] = None,
group_id: str = JPMAQS_GROUP_ID,
start_date: str = JPMAQS_EARLIEST_FILE_DATE,
end_date: str = None,
convert_metadata_timestamps: bool = True,
include_unavailable: bool = False,
) -> pd.DataFrame:
"""
Lists all available files for a specific file group within a date range.
Parameters
----------
file_group_id : Optional[str]
The identifier for the file group (e.g. "JPMAQS_MACROECONOMIC_BALANCE_SHEETS").
If None, returns all files for the group_id. Defaults to None.
group_id : str
The identifier for the data provider group.
start_date : str
The start date for the search in "YYYYMMDD" format.
end_date : str
The end date for the search in "YYYYMMDD" format. Defaults to today.
convert_metadata_timestamps : bool
If True, convert timestamp columns to datetime objects.
include_unavailable : bool
If True, includes files that are listed but not currently available.
Returns
-------
pd.DataFrame
A DataFrame of available files with their details.
"""
if end_date is None:
end_date = pd.Timestamp.utcnow().strftime("%Y%m%d")
endpoint = "/group/files/available-files"
params = {
"group-id": group_id,
"file-group-id": file_group_id,
"start-date": start_date,
"end-date": end_date,
}
# Extra 1-second throttle: the listing endpoint has a stricter rate limit
# than the download endpoints. _get() adds its own standard delay on top.
self._wait_for_api_call(1)
payload = self._get(endpoint, params)
df = pd.json_normalize(payload, record_path=["available-files"])
if "file-datetime" not in df.columns:
raise InvalidResponseError(
f'Missing "file-datetime" in response from {endpoint} with params {params}'
)
if not include_unavailable:
df = df[df["is-available"]].copy()
df.loc[:, "file-datetime"] = df["file-datetime"].astype(str)
# Sort by real timestamp while leaving the column as string
df["_ts"] = pd_to_datetime_compat(df["file-datetime"], utc=True)
df = (
df.sort_values("_ts", ascending=False)
.drop(columns="_ts")
.reset_index(drop=True)
)
if convert_metadata_timestamps:
for col in ["file-datetime", "last-modified"]:
if col not in df.columns:
raise InvalidResponseError(f'Missing "{col}" in response')
df[col] = pd_to_datetime_compat(df[col], utc=True)
return df
[docs] @cache_decorator(ttl=60)
def list_available_files_for_all_file_groups(
self,
group_id: str = JPMAQS_GROUP_ID,
start_date: str = JPMAQS_EARLIEST_FILE_DATE,
end_date: str = None,
include_full_snapshots: bool = True,
include_delta: bool = True,
include_metadata: bool = True,
convert_metadata_timestamps: bool = True,
include_unavailable: bool = False,
) -> pd.DataFrame:
"""
Fetches and consolidates available files for all relevant file groups.
This method is simply a convenience wrapper for `list_available_files`.
Parameters
----------
group_id : str
The identifier for the data provider group.
start_date : str
The start date for the search in "YYYYMMDD" format.
end_date : str
The end date for the search in "YYYYMMDD" format. Defaults to today.
include_full_snapshots : bool
If True, query for full snapshot file groups.
include_delta : bool
If True, query for delta file groups.
include_metadata : bool
If True, query for metadata file groups.
convert_metadata_timestamps : bool
If True, convert timestamp columns to datetime objects.
include_unavailable : bool
If True, include files that are listed but not currently available.
Returns
-------
pd.DataFrame
A consolidated DataFrame of all available files.
"""
files_df = self.list_available_files(
file_group_id=None,
group_id=group_id,
start_date=start_date,
end_date=end_date,
convert_metadata_timestamps=convert_metadata_timestamps,
include_unavailable=include_unavailable,
)
if files_df.empty:
return files_df
if not any([include_full_snapshots, include_delta, include_metadata]):
raise ValueError(
"At least one of `include_full_snapshots`, `include_delta`, or "
"`include_metadata` must be True"
)
if "file-name" not in files_df.columns:
raise InvalidResponseError('Missing "file-name" in response')
delta_mask = (
files_df["file-name"]
.astype(str)
.str.contains("_DELTA_", case=False, na=False)
)
metadata_mask = (
files_df["file-name"]
.astype(str)
.str.contains("_METADATA_", case=False, na=False)
)
full_snapshot_mask = ~(delta_mask | metadata_mask)
mask = pd.Series(False, index=files_df.index)
if include_full_snapshots:
mask |= full_snapshot_mask
if include_delta:
mask |= delta_mask
if include_metadata:
mask |= metadata_mask
return files_df.loc[mask].copy()
[docs] def filter_available_files_by_datetime(
self,
since_datetime: Optional[str] = None,
to_datetime: Optional[str] = None,
include_full_snapshots: bool = True,
include_delta: bool = True,
include_metadata: bool = True,
include_unavailable: bool = False,
) -> pd.DataFrame:
"""
Retrieve files whose *file timestamp* (`file-datetime`) falls within a datetime window.
Notes
-----
Despite the wording in older docs, this method filters on `file-datetime`
(file-vintage timestamp), not `last-modified`.
Parameters
----------
since_datetime : Optional[str]
The start of the time window (inclusive). Format "YYYYMMDD", "YYYYMMDDTHHMMSS",
or an ISO 8601 datetime string (for example "YYYY-MM-DDTHH:MM:SSZ").
Defaults to the start of the current day (UTC).
to_datetime : Optional[str]
The end of the time window (inclusive). Uses the same formats as `since_datetime`.
Defaults to the current timestamp (UTC).
include_full_snapshots : bool
If True, include full snapshot files in the search.
include_delta : bool
If True, include delta files in the search.
include_metadata : bool
If True, include metadata files in the search.
include_unavailable : bool
If True, include files that are not currently available for download.
Returns
-------
pd.DataFrame
A DataFrame of files whose `file-datetime` falls in the specified window.
"""
if since_datetime is None:
since_datetime = pd.Timestamp.utcnow().strftime("%Y%m%d")
if to_datetime is None:
to_datetime = pd.Timestamp.utcnow().strftime("%Y%m%dT%H%M%S")
validate_dq_timestamp(since_datetime, var_name="since_datetime")
validate_dq_timestamp(to_datetime, var_name="to_datetime")
since_ts = pd_to_datetime_compat(since_datetime, utc=True)
to_ts = pd_to_datetime_compat(to_datetime, utc=True)
if "T" not in str(since_datetime):
since_ts = since_ts.normalize()
if "T" not in str(to_datetime):
to_ts = (
to_ts.normalize() + pd.DateOffset(days=1) - pd.Timedelta(nanoseconds=1)
)
if since_ts > to_ts:
logger.warning(
f"`since_datetime` ({since_ts}) is after `to_datetime` ({to_ts}). Swapping values."
)
since_ts, to_ts = to_ts, since_ts
# Using DQ's internal filtering does not work as expected for JPMaQS end users,
# hence filtering is done locally instead of passing API parameters.
files_df = self.list_available_files_for_all_file_groups(
include_full_snapshots=include_full_snapshots,
include_delta=include_delta,
include_metadata=include_metadata,
include_unavailable=include_unavailable,
)
files_df = files_df[files_df["file-datetime"].between(since_ts, to_ts)]
files_df = files_df.sort_values(
by=["file-datetime", "last-modified"],
ascending=[False, False],
).reset_index(drop=True)
return files_df
[docs] def check_file_availability(
self,
file_group_id: str = None,
file_datetime: str = None,
filename: Optional[str] = None,
) -> pd.DataFrame:
"""
Checks if a specific file is available for download.
Provide either (`file_group_id` and `file_datetime`) or `filename`.
Parameters
----------
file_group_id : str
The identifier for the file group.
file_datetime : str
The file's timestamp identifier.
filename : Optional[str]
The full name of the file (e.g., "JPMAQS_GENERIC_RETURNS_20250501.parquet").
Returns
-------
pd.DataFrame
A DataFrame with the file's availability status.
"""
if not ((bool(file_group_id) and bool(file_datetime)) ^ bool(filename)):
raise ValueError(
"One of `file_group_id` & `file_datetime`, or `filename` must be provided."
)
if filename:
try:
file_group_id, _fdt_with_ext = Path(filename).name.rsplit("_", 1)
file_datetime = _fdt_with_ext.split(".")[0]
except ValueError as e:
raise ValueError(f"Invalid filename format: {filename}") from e
endpoint = "/group/file/availability"
params = {"file-group-id": file_group_id, "file-datetime": file_datetime}
payload = self._get(endpoint, params)
return pd.json_normalize(payload)
[docs] def download_file(
self,
file_group_id: str = None,
file_datetime: str = None,
filename: Optional[str] = None,
overwrite: bool = False,
chunk_size: Optional[int] = None,
timeout: Optional[float] = DQ_FILE_API_TIMEOUT,
max_retries: int = 3,
) -> str:
"""
Download a single DataQuery file to the client's output directory.
This method can be called with either (`file_group_id` and `file_datetime`) or
a `filename`.
- Snapshot/delta datasets are typically `.parquet`.
- Some metadata file groups publish `.json` files (pass `filename=...`).
For large snapshot files, it automatically uses the `SegmentedFileDownloader`
for a robust, multi-part download.
Parameters
----------
file_group_id : str
The identifier of the file group to download from.
file_datetime : str
The timestamp of the file to download.
filename : Optional[str]
The full filename to download. Overrides `file_group_id` and `file_datetime`.
overwrite : bool
If True, overwrites the file if it already exists. Default is False.
chunk_size : Optional[int]
The chunk size for streaming downloads (in bytes).
timeout : Optional[float]
The timeout for the download request in seconds.
max_retries : int
The number of retries for the entire file download.
Returns
-------
str
The full path to the downloaded file.
"""
if not ((bool(file_group_id) and bool(file_datetime)) ^ bool(filename)):
raise ValueError(
"One of `file_group_id` & `file_datetime`, or `filename` must be provided."
)
if not file_group_id:
try:
file_group_id, file_datetime_with_ext = filename.rsplit("_", 1)
file_datetime = file_datetime_with_ext.split(".")[0]
except ValueError:
raise ValueError(f"Invalid filename format: {filename}")
endpoint = "/group/file/download"
url = f"{self.base_url}{endpoint}"
headers = self.oauth.get_headers()
params = {"file-group-id": file_group_id, "file-datetime": file_datetime}
file_name = filename or f"{file_group_id}_{file_datetime}.parquet"
file_date = pd_to_datetime_compat(file_datetime).strftime("%Y-%m-%d")
file_path = Path(self.out_dir) / Path(file_date) / Path(file_name)
file_path.parent.mkdir(parents=True, exist_ok=True)
if file_path.exists():
if not overwrite:
logger.warning(f"File {file_path} already exists. Skipping download.")
return str(file_path)
logger.warning(f"File {file_path} already exists. It will be overwritten.")
file_path.unlink()
logger.info(f"Starting download of {file_name}...")
start = time.time()
download_args = dict(
filename=str(file_path),
url=url,
headers=headers,
params=params,
proxies=self.proxies,
chunk_size=chunk_size,
timeout=timeout,
verify_ssl=self.verify_ssl,
)
is_small_file = any(x in file_group_id.lower() for x in ["delta", "metadata"])
if "_DELTA" in file_group_id:
is_small_file = file_datetime not in _large_delta_file_datetimes()
if is_small_file:
self._wait_for_api_call()
request_wrapper_stream_bytes_to_disk(
**download_args,
api_delay=0,
skip_wait=True,
)
else:
SegmentedFileDownloader(
**download_args,
api_delay=DQ_FILE_API_DELAY_PARAM,
api_delay_margin=DQ_FILE_API_DELAY_MARGIN,
parent_requester=self,
max_file_retries=max_retries,
start_download=True,
)
time_taken = time.time() - start
logger.info(
f"Downloaded {file_name} in {time_taken:.2f} seconds to {file_path}"
)
return str(file_path)
[docs] def delete_corrupt_files(
self,
files: Optional[List[str]] = None,
delete: bool = True,
) -> List[str]:
"""
Check downloaded files for corruption and optionally delete them.
Parameters
----------
files : Optional[List[str]]
A list of file names (as in `list_downloaded_files()["file-name"]`) to check
for corruption. If None, scans all downloaded files in the client's output
directory.
delete : bool
If True, corrupt files are deleted from disk. If False, corrupt files
are only logged as warnings (no files are removed). Default is True.
Returns
-------
List[str]
A list of file paths that were identified as corrupt (and deleted if
`delete=True`).
"""
avail_files = self.list_downloaded_files()
if avail_files.empty:
return []
if files is not None:
if not all(isinstance(f, str) for f in files):
raise ValueError(
"All items in `files` must be strings representing file names."
)
avail_files = avail_files[avail_files["file-name"].isin(files)]
files = sorted(set(map(str, avail_files["path"])))
extensions = sorted(set(Path(f).suffix.rsplit(".", 1)[-1] for f in files))
return _delete_corrupt_files(
files=files,
extensions=extensions,
root_dir=Path(self.out_dir),
delete=delete,
)
[docs] def cleanup_old_files(
self,
days_to_keep: int = 5,
) -> List[str]:
"""
Deletes files older than the specified number of days from the output directory.
Parameters
----------
days_to_keep : int
The number of days to retain files. This is measured from the latest file date
within each file group. Files older than this threshold will be deleted.
Returns
-------
List[str]
A list of file paths that were deleted.
"""
if not isinstance(days_to_keep, int):
raise ValueError("`days_to_keep` must be a non-negative integer.")
if days_to_keep < 0:
logger.warning(
"`days_to_keep` is negative; it will be treated as the absolute value."
f" ({days_to_keep} -> {abs(days_to_keep)})"
)
days_to_keep = abs(days_to_keep)
if days_to_keep == 0:
logger.warning(
"`days_to_keep=0` does not delete any files. "
"Set `days_to_keep` to a positive integer to retain only "
"the most recent N days of files per file group."
)
return []
found_files = self.list_downloaded_files()
if found_files.empty:
return []
dataset_col = "file-group-id"
if dataset_col not in found_files.columns:
dataset_col = "dataset"
if dataset_col not in found_files.columns:
raise InvalidResponseError(
'Missing expected column "dataset" or "file-group-id" in downloaded files listing.'
)
fg_dt_mapping: Dict[str, pd.Timestamp] = (
found_files.groupby(dataset_col)["file-timestamp"].max().to_dict()
)
cutoff_dates = {
fg: (dt - pd.Timedelta(days=days_to_keep)).normalize()
for fg, dt in fg_dt_mapping.items()
}
files_to_delete = []
deleted_files: List[str] = []
for _, row in found_files.iterrows():
fg = row[dataset_col]
fdt = row["file-timestamp"]
if fdt < cutoff_dates[fg]:
files_to_delete.append(str(row["path"]))
for file_path in files_to_delete:
try:
Path(file_path).unlink()
deleted_files.append(file_path)
logger.info(f"Deleted old file: {file_path}")
except Exception as e:
logger.error(f"Failed to delete file {file_path}: {e}")
return sorted(deleted_files)
[docs] def download_multiple_files(
self,
filenames: List[str],
overwrite: bool = False,
max_retries: int = 3,
n_jobs: int = None,
chunk_size: Optional[int] = None,
timeout: Optional[float] = DQ_FILE_API_TIMEOUT,
show_progress: bool = True,
delete_corrupt_files: bool = False,
) -> None:
"""
Downloads a list of files concurrently with progress indication.
Parameters
----------
filenames : List[str]
A list of full filenames to be downloaded.
overwrite : bool
If True, overwrites files if they already exist. Default is False.
max_retries : int
The number of times to retry downloading the entire list of failed files.
n_jobs : int
The number of concurrent download jobs. If -1, it uses all available cores.
chunk_size : Optional[int]
The chunk size for streaming downloads (in bytes).
timeout : Optional[float]
The timeout for each download request in seconds.
show_progress : bool
If True, displays a progress bar for the downloads.
delete_corrupt_files : bool
If True, corrupt files are deleted after download and retried. If False
(default), corrupt files are only logged as warnings. Users can run
`delete_corrupt_files()` separately after verifying the logs.
"""
Path(self.out_dir).mkdir(parents=True, exist_ok=True)
start_time = time.time()
logger.info(f"Starting download of {len(filenames)} files.")
failed_files = []
if n_jobs == -1:
n_jobs = None
with cf.ThreadPoolExecutor(max_workers=n_jobs) as executor:
futures = {}
for filename in tqdm(
filenames,
desc="Requesting files",
disable=not show_progress,
):
futures[
executor.submit(
self.download_file,
filename=filename,
overwrite=overwrite,
chunk_size=chunk_size,
timeout=timeout,
)
] = filename
# Small stagger to avoid submitting all futures at once;
# actual rate-limiting is handled by _wait_for_api_call().
time.sleep(0.01)
for future in tqdm(
cf.as_completed(futures),
total=len(futures),
desc="Downloading files",
disable=not show_progress,
):
fname = futures[future]
try:
future.result()
except KeyboardInterrupt:
executor.shutdown(wait=False, cancel_futures=True)
raise
except Exception as e:
logger.error(f"Failed to download {fname}: {e}")
failed_files.append(fname)
found_corrupt_files = self.delete_corrupt_files(
files=filenames,
delete=delete_corrupt_files,
)
if delete_corrupt_files:
corrupt_filenames = [Path(p).name for p in found_corrupt_files]
failed_files = sorted(set(failed_files + corrupt_filenames))
if not failed_files:
total_time = time.time() - start_time
logger.info(
f"Successfully downloaded {len(filenames)} files in {total_time:.2f} seconds."
)
return # All downloads successful
log_msg = f"Failed to download {len(failed_files)} files"
if max_retries > 0:
log_msg += f"; retrying {max_retries} more times"
else:
log_msg += "; no retries left"
logger.warning(log_msg)
if max_retries == 0:
logger.error(f"Files failed after retries: {failed_files}")
raise DownloadError(f"Files failed after retries: {failed_files}")
return self.download_multiple_files(
filenames=failed_files,
max_retries=max_retries - 1,
n_jobs=n_jobs,
chunk_size=chunk_size,
timeout=timeout,
show_progress=show_progress,
delete_corrupt_files=delete_corrupt_files,
)
[docs] def download_catalog_file(
self,
overwrite: bool = False,
timeout: Optional[float] = DQ_FILE_API_TIMEOUT,
) -> str:
"""
Download (or resolve) the most recent JPMaQS catalog parquet file.
The catalog is used for ticker validation and mapping tickers to underlying
JPMaQS datasets.
Notes
-----
- The "latest" catalog is determined by an API call to
`list_available_files(self.catalog_file_group_id)`.
- If the latest catalog already exists locally and `overwrite=False`, this
method returns the local path (no download required).
- If the latest catalog cannot be downloaded, this method raises an error (no
fallback to older local catalogs).
"""
available_catalogs = self.list_available_files(self.catalog_file_group_id)
if available_catalogs.empty:
raise DownloadError("No catalog files available for download.")
latest_catalog = available_catalogs.sort_values(
by=["file-datetime", "last-modified", "file-name"], ascending=False
).iloc[0]
latest_filename = str(latest_catalog["file-name"])
logger.info(f"Latest catalog file identified: {latest_filename}")
existing_files = self.list_downloaded_files(include_last_modified_columns=False)
if (not overwrite) and (not existing_files.empty):
local_match = existing_files[
existing_files["file-name"].astype(str).eq(latest_filename)
]
if not local_match.empty:
file_path = str(local_match.iloc[0]["path"])
logger.info(f"Catalog file already downloaded (latest): {file_path}")
self._refresh_file_selector()
return file_path
try:
file_path = self.download_file(
filename=latest_filename,
overwrite=overwrite,
timeout=timeout,
)
except KeyboardInterrupt:
raise
except Exception as e:
raise DownloadError(
f"Failed to download latest catalog file '{latest_filename}': {e}"
) from e
self._refresh_file_selector()
return str(file_path)
[docs] def get_datasets_for_indicators(
self,
tickers: Optional[List[str]] = None,
cids: Optional[List[str]] = None,
xcats: Optional[List[str]] = None,
case_sensitive: bool = False,
catalog_file: Optional[str] = None,
) -> List[str]:
"""
Return the list of JPMaQS datasets that contain the requested tickers.
This loads the JPMaQS catalog parquet and maps the catalog `Theme` column to
DataQuery file-group-ids via `JPMAQS_DATASET_THEME_MAPPING`.
Notes
-----
- Unknown themes are mapped to `"UnknownTheme"` (to avoid `NaN` propagation and
sorting issues) and logged as a warning.
"""
tickers = _construct_all_tickers_list(tickers=tickers, cids=cids, xcats=xcats)
if not tickers or not any(t.strip() for t in tickers):
raise ValueError("No valid tickers to search for.")
catalog_file = catalog_file or self.download_catalog_file()
catalog_df = pd.read_parquet(catalog_file)
catalog_df.loc[:, "Dataset"] = catalog_df["Theme"].map(
JPMAQS_DATASET_THEME_MAPPING
)
catalog_df.loc[:, "Dataset"] = catalog_df["Dataset"].fillna("UnknownTheme")
if not set(catalog_df["Theme"]) == set(JPMAQS_DATASET_THEME_MAPPING.keys()):
missing_themes = set(catalog_df["Theme"]) - set(
JPMAQS_DATASET_THEME_MAPPING.keys()
)
logger.warning(
f"Catalog file contains unknown themes: {missing_themes}. "
"Please check for newer versions of the `macrosynergy` Python package."
)
if case_sensitive:
catalog_df = catalog_df[catalog_df["Ticker"].isin(tickers)]
else:
catalog_df = catalog_df[
catalog_df["Ticker"].str.lower().isin(t.lower() for t in tickers)
]
datasets_to_keep = sorted(set(catalog_df["Dataset"]))
return datasets_to_keep
[docs] def filter_to_valid_tickers(
self,
tickers: List[str],
case_sensitive: bool = False,
catalog_file: Optional[str] = None,
) -> List[str]:
"""
Filters a list of tickers to only those that are valid according to the catalog.
Parameters
----------
tickers : List[str]
A list of tickers to validate.
case_sensitive : bool
If True, performs case-sensitive matching. Default is False.
"""
if not isinstance(tickers, list) or not all(
isinstance(x, str) for x in tickers
):
raise ValueError("`tickers` must be a list of strings.")
catalog_file = catalog_file or self.download_catalog_file()
catalog_tickers: List[str] = list(
set(pd.read_parquet(catalog_file)["Ticker"].dropna().astype(str))
)
if case_sensitive:
valid = set(catalog_tickers) & set(tickers)
return sorted(valid)
# case-insensitive - build canonical ticker name lookup
canonical_map = {t.lower(): t for t in catalog_tickers}
valid = {
canonical_map[t.strip().lower()]
for t in tickers
if t.strip() and t.strip().lower() in canonical_map
}
return sorted(valid)
[docs] def list_downloaded_files(
self,
include_last_modified_columns: bool = False,
) -> pd.DataFrame:
col_order = [
"filename",
"file-datetime",
"dataset",
"filetype",
"file-timestamp",
"path",
]
dfs = [
_downloaded_files_df(
self.out_dir, file_format=fmt, include_metadata_files=True
)
for fmt in ["parquet", "json"]
]
dfs = [df for df in dfs if not df.empty]
if not dfs:
return pd.DataFrame(columns=col_order)
files_df = pd.concat(dfs).reset_index(drop=True)
if files_df.empty:
return files_df
files_df = files_df[col_order].rename(columns={"filename": "file-name"})
if include_last_modified_columns:
dq_files_df = self.list_available_files_for_all_file_groups()
dq_files_df = dq_files_df[
dq_files_df["file-name"].isin(files_df["file-name"])
]
files_df = files_df.merge(
dq_files_df[["file-name", "last-modified"]],
on="file-name",
)
return files_df
def _load_metadata_jsons(
self,
date: Optional[Union[pd.Timestamp, str]] = None,
normalize_headers: bool = True,
skip_download: bool = False,
) -> Dict[str, pd.DataFrame]:
"""Load JPMaQS metadata notification JSONs for a date."""
date = (
pd_to_datetime_compat(date) if date is not None else pd.Timestamp.utcnow()
).normalize()
if date > pd.Timestamp.utcnow().normalize():
today_utc = pd.Timestamp.utcnow().normalize()
raise ValueError(
"Provided `date` is in the future (UTC). "
f"Requested: {date.date()}, today (UTC): {today_utc.date()}."
)
if not skip_download:
to_dt = date + pd.offsets.BDay(1) - pd.Timedelta(seconds=1)
self.download_full_snapshot(
since_datetime=date,
to_datetime=to_dt,
include_full_snapshots=False,
include_delta=False,
include_metadata=True,
)
df = self.list_downloaded_files()
df: pd.DataFrame = df[
(df["dataset"] == "JPMAQS_METADATA_NOTIFICATIONS")
& df["file-name"].str.lower().str.endswith(".json")
]
date = date.normalize()
df = df[df["file-timestamp"].dt.normalize() == date]
if df.empty:
logger.warning(f"No notification files found for date: {date.date()}")
return {}
json_contents: Dict[str, pd.DataFrame] = {}
err_str = 'Invalid notification file (missing "sub_title"): '
title_err_str = "Unexpected notification title in file: "
expected_titles = [
"Missing Updates",
"Changed historical values",
"Additional information on missing updates",
]
canonical_title_map = {t.upper(): t for t in expected_titles}
for jp in df["path"].apply(str).tolist():
_json = {}
with open(jp, "r", encoding="utf-8") as f:
_json: Dict[str, dict] = json.load(f)
if _json.get("metadata", {}).get("sub_title", None) is None:
logger.warning(err_str + jp)
continue
j_title: str = _json["metadata"]["sub_title"]
if j_title.upper() not in map(str.upper, expected_titles):
logger.warning(title_err_str + jp)
continue
canonical_title = canonical_title_map[j_title.upper()]
json_contents[canonical_title] = pd.json_normalize(
_json, record_path=["data"]
)
if normalize_headers:
for key in json_contents:
new_cols = [
_col.replace(" ", "_")
.replace("-", "_")
.replace("(%)", "pct")
.lower()
for _col in json_contents[key].columns
]
json_contents[key].columns = new_cols
return json_contents
[docs] def get_revisions_notifications(
self,
date: Optional[Union[pd.Timestamp, str]] = None,
normalize_headers: bool = True,
) -> pd.DataFrame:
"""
Return "Changed historical values" notifications for a given date.
This loads daily JPMaQS metadata notification JSON(s) for the requested date
and returns the table describing historical revisions. If no matching
notification file(s) are found, an empty DataFrame is returned.
Parameters
----------
date : Optional[Union[pd.Timestamp, str]]
Target date (UTC). Strings can be "YYYY-MM-DD", "YYYYMMDD", or ISO 8601.
Defaults to today (UTC).
normalize_headers : bool
If True, normalizes column names to lowercase snake_case and converts
"(%)" to "pct". Defaults to True.
Returns
-------
pd.DataFrame
A DataFrame of revision notifications. Empty if none are found.
"""
jsons = self._load_metadata_jsons(
date=date, normalize_headers=normalize_headers
)
if "Changed historical values" not in jsons:
logger.warning("No `Changed historical values` notifications found.")
return pd.DataFrame()
return jsons["Changed historical values"]
[docs] def get_missing_data_notifications(
self,
date: Optional[Union[pd.Timestamp, str]] = None,
normalize_headers: bool = True,
) -> pd.DataFrame:
"""
Return missing-update notifications (with optional additional information).
This loads daily JPMaQS metadata notification JSON(s) for the requested date.
It returns:
- "Missing Updates" rows
- left-joined with "Additional information on missing updates" when available
If only one of the two tables is available, that table is returned. If
neither is available, an empty DataFrame is returned.
Parameters
----------
date : Optional[Union[pd.Timestamp, str]]
Target date (UTC). Strings can be "YYYY-MM-DD", "YYYYMMDD", or ISO 8601.
Defaults to today (UTC).
normalize_headers : bool
If True, normalizes column names to lowercase snake_case and converts
"(%)" to "pct". Defaults to True.
Returns
-------
pd.DataFrame
A DataFrame of missing-update notifications (optionally enriched).
"""
jsons = self._load_metadata_jsons(
date=date, normalize_headers=normalize_headers
)
df1 = jsons.get("Missing Updates", pd.DataFrame())
df2 = jsons.get("Additional information on missing updates", pd.DataFrame())
if df1.empty and df2.empty:
logger.warning("No `Missing Updates` or related notifications found.")
return pd.DataFrame()
if df2.empty:
logger.warning(
"No `Additional information on missing updates` notifications found."
)
return df1
if df1.empty:
logger.warning("No `Missing Updates` notifications found.")
return df2
left_join_key = None
if "Ticker" in df1.columns and "ticker" in df2.columns:
df1 = df1.rename(columns={"Ticker": "ticker"})
elif "ticker" in df1.columns and "Ticker" in df2.columns:
df2 = df2.rename(columns={"Ticker": "ticker"})
for candidate in ("Ticker", "ticker"):
if candidate in df1.columns and candidate in df2.columns:
left_join_key = candidate
break
if left_join_key is None:
raise KeyError(
'Expected a common join key ("Ticker" or "ticker") in notification data.'
)
df1 = (
df1.merge(df2, how="left", on=left_join_key)
.sort_values(by=left_join_key, ascending=True)
.reset_index(drop=True)
)
return df1
[docs] def download_full_snapshot(
self,
since_datetime: Optional[str] = None,
to_datetime: Optional[str] = None,
overwrite: bool = False,
chunk_size: Optional[int] = None,
timeout: Optional[float] = DQ_FILE_API_TIMEOUT,
include_full_snapshots: bool = True,
include_delta: bool = True,
include_metadata: bool = True,
file_group_ids: Optional[List[str]] = None,
show_progress: bool = True,
delete_corrupt_files: bool = False,
_selection_since_datetime: Optional[str] = None,
_selection_to_datetime: Optional[str] = None,
_selection_min_last_updated: Optional[Union[str, pd.Timestamp]] = None,
_selection_max_last_updated: Optional[Union[str, pd.Timestamp]] = None,
) -> None:
"""
Downloads a complete snapshot of files based on specified criteria.
This method fetches the full available-file inventory from the API (unbounded
by `since_datetime` / `to_datetime`) and delegates vintage-aware selection to
`FileSelector`. The vintage window (`since_datetime`, `to_datetime`) controls
which files are selected for download, not which files are listed from the API.
Parameters
----------
since_datetime : Optional[str]
Vintage window start (inclusive) used for file selection.
Defaults to the start of the current day (UTC).
to_datetime : Optional[str]
Vintage window end (inclusive) used for file selection.
Note: loading uses all locally available cached snapshot/delta files.
overwrite : bool
If True, overwrites files if they already exist. Default is False.
chunk_size : Optional[int]
The chunk size for streaming downloads (in bytes).
timeout : Optional[float]
The timeout for each download request in seconds.
include_full_snapshots : bool
If True, download full snapshot files.
include_delta : bool
If True, download delta files.
include_metadata : bool
If True, download metadata files.
file_group_ids : Optional[List[str]]
A specific list of file groups to download from. If provided, only files
from these groups will be downloaded.
show_progress : bool
If True, displays a progress bar for downloads.
Notes
-----
Internal parameters `_selection_since_datetime`, `_selection_to_datetime`,
`_selection_min_last_updated`, and `_selection_max_last_updated` are
used by higher-level helpers (such as `download()`) to pass selection intent that
differs from the raw `since_datetime`/`to_datetime` window (for example, when row-
vintage cutoffs via `max_last_updated` affect delta coverage decisions).
"""
Path(self.out_dir).mkdir(parents=True, exist_ok=True)
start_time = time.time()
if since_datetime is None:
# JPMaQS data files are not published on weekends, so "today" can yield
# no snapshot/delta files even though the catalog is daily.
if include_full_snapshots or include_delta:
since_datetime = get_current_or_last_business_day().strftime("%Y%m%d")
else:
since_datetime = pd.Timestamp.utcnow().strftime("%Y%m%d")
logger.info(
f"Starting snapshot download to '{self.out_dir}' for files since {since_datetime}."
)
validate_dq_timestamp(since_datetime, var_name="since_datetime")
if to_datetime is not None:
validate_dq_timestamp(to_datetime, var_name="to_datetime")
since_dt = pd_to_datetime_compat(since_datetime)
to_dt = pd_to_datetime_compat(to_datetime)
if to_dt < since_dt:
new_since = (to_dt - pd.offsets.BDay(1)).strftime("%Y%m%d")
logger.warning(
"`to_datetime` is before `since_datetime`; adjusting "
"`since_datetime` to be one business day before `to_datetime`. "
f"New `since_datetime`: {new_since}"
)
since_datetime = new_since
validate_dq_timestamp(since_datetime, var_name="since_datetime")
if file_group_ids is not None:
if not isinstance(file_group_ids, list) or not all(
isinstance(x, str) for x in file_group_ids
):
raise ValueError("`file_group_ids` must be a list of strings.")
# Always refresh the selector with an unfiltered API inventory so it can make
# consistent vintage decisions from the full history.
selector = self.file_selector
api_files_df = self.list_available_files_for_all_file_groups()
downloaded_files_df = self.list_downloaded_files(
include_last_modified_columns=False
)
selection_since = (
_selection_since_datetime
if _selection_since_datetime is not None
else since_datetime
)
selection_to = (
_selection_to_datetime
if _selection_to_datetime is not None
else to_datetime
)
selector.refresh(api_files_df=api_files_df, local_files_df=downloaded_files_df)
oldest_ts = selector.oldest_api_file_timestamp()
if (selection_to is not None) and (oldest_ts is not None):
to_cutoff = _normalize_file_timestamp_cutoff(selection_to)
if to_cutoff < oldest_ts:
raise ValueError(
"`to_datetime` predates the oldest available JPMaQS file "
"timestamp reported by the API "
f"({oldest_ts.strftime('%Y-%m-%dT%H:%M:%SZ')})."
)
to_download = selector.select_files_for_download(
overwrite=overwrite,
since_datetime=selection_since,
to_datetime=selection_to,
file_group_ids=file_group_ids,
include_full_snapshots=include_full_snapshots,
include_delta_files=include_delta,
include_metadata_files=include_metadata,
warn_if_no_full_snapshots=bool(selection_since),
min_last_updated=_selection_min_last_updated,
max_last_updated=_selection_max_last_updated,
)
to_download = list(set(to_download))
num_files_to_download = len(to_download)
logger.info(f"Found {num_files_to_download} new files to download.")
if not num_files_to_download:
logger.info("No new files to download.")
return
selected_df = api_files_df[api_files_df["file-name"].isin(to_download)].copy()
selected_df["download-priority"] = (
selected_df["file-name"]
.astype(str)
.str.lower()
.apply(lambda x: (3 if "_metadata" in x else (2 if "_delta" in x else 1)))
)
download_order = selected_df.sort_values(
by=["download-priority", "file-datetime", "file-name"],
)["file-name"].tolist()
self.download_multiple_files(
filenames=download_order,
overwrite=overwrite,
chunk_size=chunk_size,
timeout=timeout,
show_progress=show_progress,
delete_corrupt_files=delete_corrupt_files,
)
self._refresh_file_selector()
total_time = time.time() - start_time
logger.info(f"Snapshot download completed in {total_time:.2f} seconds.")
[docs] def download_delta_files(
self,
since_datetime: Optional[str] = None,
to_datetime: Optional[str] = None,
overwrite: bool = False,
chunk_size: Optional[int] = None,
timeout: Optional[float] = DQ_FILE_API_TIMEOUT,
include_delta: bool = True,
include_metadata: bool = True,
file_group_ids: Optional[List[str]] = None,
show_progress: bool = True,
delete_corrupt_files: bool = False,
):
"""
A convenience function to allow downloading only delta files within a given window.
This is a wrapper around `download_full_snapshot()` with `include_full_snapshots=False`.
"""
return self.download_full_snapshot(
since_datetime=since_datetime,
to_datetime=to_datetime,
overwrite=overwrite,
chunk_size=chunk_size,
timeout=timeout,
include_full_snapshots=False,
include_delta=include_delta,
include_metadata=include_metadata,
file_group_ids=file_group_ids,
show_progress=show_progress,
delete_corrupt_files=delete_corrupt_files,
)
def _validate_and_resolve_tickers(
self,
tickers: Optional[List[str]] = None,
cids: Optional[List[str]] = None,
xcats: Optional[List[str]] = None,
catalog_file: Optional[str] = None,
include_delta_files: bool = True,
datasets: Optional[List[str]] = None,
) -> tuple:
"""
Construct, validate, and resolve tickers against the catalog.
Returns
-------
(valid_tickers, resolved_datasets)
`valid_tickers` is the subset of the requested tickers that exist in the
catalog. `resolved_datasets` is either the caller-supplied `datasets` (when
not None) or the datasets inferred from the catalog, with `_DELTA` suffixes
appended when `include_delta_files` is True.
"""
rqstd_tickers = _construct_all_tickers_list(
tickers=tickers, cids=cids, xcats=xcats
)
if not bool(rqstd_tickers):
raise ValueError(
"At least one ticker must be specified via `tickers`, or `cids` & `xcats`."
)
valid_tickers = self.filter_to_valid_tickers(
tickers=rqstd_tickers, catalog_file=catalog_file
)
if not valid_tickers:
raise ValueError(
"No valid tickers found with the provided `tickers`, `cids`, and `xcats`."
)
valid_norm = {t.lower() for t in valid_tickers}
missing = sorted({t for t in rqstd_tickers if t.lower() not in valid_norm})
if missing:
lmiss = min(5, len(missing))
nmore = f"{len(missing) - lmiss} more" if len(missing) > lmiss else ""
miss_str = "[" + ", ".join(missing[:lmiss]) + "..." + nmore + "]"
miss_str = (
f"{len(missing)} tickers requested do not exist in the catalog, "
f"these are: {miss_str}"
)
logger.warning(miss_str)
resolved_datasets = datasets
if resolved_datasets is None:
resolved_datasets = self.get_datasets_for_indicators(
tickers=valid_tickers, catalog_file=catalog_file
)
if resolved_datasets and include_delta_files:
resolved_datasets += [f"{ds}_DELTA" for ds in resolved_datasets]
return valid_tickers, resolved_datasets
[docs] def load_data(
self,
tickers: Optional[List[str]] = None,
cids: Optional[List[str]] = None,
xcats: Optional[List[str]] = None,
metrics: Optional[List[str]] = None,
start_date: Optional[str] = None,
end_date: Optional[str] = None,
min_last_updated: Optional[Union[str, pd.Timestamp]] = None,
max_last_updated: Optional[Union[str, pd.Timestamp]] = None,
include_file_column: bool = False,
dataframe_format: str = "qdf",
dataframe_type: str = "pandas",
categorical_dataframe: bool = True,
include_delta_files: bool = True,
delta_treatment: str = "latest",
since_datetime: Optional[str] = None,
to_datetime: Optional[str] = None,
catalog_file: Optional[str] = None,
datasets: Optional[List[str]] = None,
) -> Union[pd.DataFrame, pl.DataFrame, pl.LazyFrame]:
"""
Load JPMaQS timeseries from the local cache for the requested selection.
This method performs the "load" part of `download()`: it resolves tickers to the
underlying JPMaQS datasets (using the catalog file) and returns the filtered data
from locally cached snapshot/delta parquet files.
Unlike `download()`, this method does **not** download snapshot/delta/metadata
files. It assumes the relevant files are already present in `out_dir`.
The catalog file is still downloaded/validated unless `catalog_file` is provided.
Parameters
----------
tickers : Optional[List[str]]
A list of tickers to filter datasets. Each ticker must be in the standard
format "CID_XCAT" used in JPMaQS.
cids : Optional[List[str]]
A list of cross-sectional identifiers (CIDs) to filter datasets.
xcats : Optional[List[str]]
A list of extended categories (XCATS) to filter datasets.
metrics : Optional[List[str]]
A list of JPMaQS metrics to filter the data. Available metrics are "value",
"grading", "eop_lag", "mop_lag", and "last_updated". The available metrics
are also defined in `macrosynergy.management.constants.JPMAQS_METRICS`. The default
is None, in which case all metrics are returned.
start_date : Optional[str]
The start date for the returned data in "YYYY-MM-DD" (or "YYYYMMDD") format.
If None, data is returned from the earliest available date.
end_date : Optional[str]
The end date for the returned data in "YYYY-MM-DD" (or "YYYYMMDD") format.
If None, data is returned up to the latest available date.
min_last_updated : Optional[Union[str, pd.Timestamp]]
If provided, only data points with `last_updated` on or after this timestamp
are returned. Strings can be "YYYY-MM-DDThh:mm:ss", "YYYYMMDDhhmmss", or
ISO 8601 format.
max_last_updated : Optional[Union[str, pd.Timestamp]]
If provided, only data points with `last_updated` on or before this timestamp
are returned. Strings can be "YYYY-MM-DDThh:mm:ss", "YYYYMMDDhhmmss", or
ISO 8601 format.
include_file_column : bool
If True, includes a column indicating the source file for each data point.
Default is False.
dataframe_format : str
The output schema. Options are:
- "qdf": quantamental schema with `cid` and `xcat` columns.
- "tickers": ticker schema with a single `ticker` column (instead of `cid`/`xcat`).
Note: if you want a wide matrix (date x ticker), pivot the returned data
using pandas/Polars. Default is "qdf".
dataframe_type : str
The type of DataFrame to return. Options are "pandas" for a pandas DataFrame,
"polars" for a polars DataFrame, or "polars-lazy" for a polars LazyFrame.
Default is "pandas".
categorical_dataframe : bool
If True and `dataframe_type` is "pandas" (or "polars"/"polars-lazy" with
compatible Polars versions), converts selected string columns to categorical
dtype. Default is True.
include_delta_files : bool
If True, includes delta files in the load process (recommended).
Default is True.
delta_treatment : str
Determines how to treat duplicate values between snapshots and deltas. Options are:
- "latest": keep the latest value per series/date.
- "earliest": keep the earliest value per series/date.
- "all": keep all entries.
Default is "latest".
since_datetime : Optional[str]
Restrict which locally available snapshot/delta files are considered to those
with file timestamps on/after this cutoff (inclusive). This uses the file
timestamp embedded in the filename (`file-datetime`), not the HTTP metadata
field `last-modified`. If None, all locally available files are considered.
to_datetime : Optional[str]
Restrict which locally available snapshot/delta files are considered to those
with file timestamps on/before this cutoff (inclusive). This uses the file
timestamp embedded in the filename (`file-datetime`). If None, all locally
available files are considered.
Notes:
- If `to_datetime` is provided and `max_last_updated` is not, the loader
defaults `max_last_updated` to `to_datetime` (interpreting date-only strings
as end-of-day). This is important for monthly delta regimes where the
covering delta file can be timestamped at month-end (after `to_datetime`),
and row-level filtering by `last_updated` is needed to honor the requested
data.
catalog_file : Optional[str]
Optional path to a local JPMaQS catalog parquet file. If not provided, the
client will download/validate the latest catalog file for ticker resolution.
datasets : Optional[List[str]]
Optional list of JPMaQS datasets (file-group IDs) to restrict which locally cached
snapshot/delta parquet files are scanned/loaded. If not provided, datasets are
inferred from the requested tickers using the catalog.
Returns
-------
Union[pd.DataFrame, pl.DataFrame, pl.LazyFrame]
A DataFrame containing the requested data.
"""
catalog_file = catalog_file or self.download_catalog_file()
rqstd_tickers, datasets_to_download = self._validate_and_resolve_tickers(
tickers=tickers,
cids=cids,
xcats=xcats,
catalog_file=catalog_file,
include_delta_files=include_delta_files,
datasets=datasets,
)
if to_datetime is not None:
if isinstance(to_datetime, str):
validate_dq_timestamp(to_datetime, var_name="to_datetime")
local_files_df = self.list_downloaded_files(
include_last_modified_columns=False
)
if (not local_files_df.empty) and (
"file-timestamp" in local_files_df.columns
):
is_parquet = (
local_files_df["file-name"]
.astype(str)
.str.lower()
.str.endswith(".parquet")
)
is_metadata = (
local_files_df["file-name"]
.astype(str)
.str.contains("_METADATA", case=False, na=False)
)
is_catalog = (
local_files_df["dataset"].astype(str).eq(self.catalog_file_group_id)
)
data_df = local_files_df.loc[
is_parquet & (~is_metadata) & (~is_catalog)
]
oldest_local_ts = (
data_df["file-timestamp"].min()
if (not data_df.empty) and ("file-timestamp" in data_df.columns)
else None
)
if pd.isna(oldest_local_ts):
oldest_local_ts = None
if oldest_local_ts is not None:
to_cutoff = _normalize_file_timestamp_cutoff(to_datetime)
if to_cutoff < oldest_local_ts:
raise ValueError(
"`to_datetime` predates the oldest JPMaQS data file timestamp "
"found in the local cache "
f"({oldest_local_ts.strftime('%Y-%m-%dT%H:%M:%SZ')})."
)
effective_max_last_updated = max_last_updated
if to_datetime is not None and max_last_updated is None:
effective_max_last_updated = _normalize_last_updated_cutoff(to_datetime)
warn_if_no_full_snapshots = since_datetime is not None
return lazy_load_from_parquets(
files_dir=self.out_dir,
tickers=rqstd_tickers,
metrics=metrics,
start_date=start_date,
end_date=end_date,
min_last_updated=min_last_updated,
max_last_updated=effective_max_last_updated,
include_delta_files=include_delta_files,
delta_treatment=delta_treatment,
dataframe_format=dataframe_format,
dataframe_type=dataframe_type,
categorical_dataframe=categorical_dataframe,
datasets=datasets_to_download,
include_file_column=include_file_column,
catalog_file=catalog_file,
warn_if_no_full_snapshots=warn_if_no_full_snapshots,
since_datetime=since_datetime,
to_datetime=to_datetime,
)
def _get_effective_snapshot_switchover_ts(
self, datasets: List[str]
) -> Optional[pd.Timestamp]:
"""
Return the effective (per-request) earliest full-snapshot timestamp.
Notes
-----
JPMaQS can remove older full snapshots over time. For a given set of datasets
we define the "switchover" as the *latest* of the datasets' earliest currently
available full snapshots. If any dataset has no full snapshots at all, returns
None.
"""
if not datasets:
return None
# Datasets are passed around as file-group-ids; treat *_DELTA as updates to the
# base dataset. Exclude metadata-only datasets.
base_datasets = sorted(
{
str(d).replace("_DELTA", "")
for d in datasets
if isinstance(d, str) and d and ("_METADATA" not in d.upper())
}
)
base_datasets = [d for d in base_datasets if d != self.catalog_file_group_id]
if not base_datasets:
return None
selector = self.file_selector
selector.refresh(api_files_df=self.list_available_files_for_all_file_groups())
return selector.effective_snapshot_switchover_ts(
file_group_ids=base_datasets,
catalog_file_group_id=self.catalog_file_group_id,
)
[docs] def download(
self,
tickers: Optional[List[str]] = None,
cids: Optional[List[str]] = None,
xcats: Optional[List[str]] = None,
metrics: Optional[List[str]] = None,
start_date: Optional[str] = None,
end_date: Optional[str] = None,
min_last_updated: Optional[Union[str, pd.Timestamp]] = None,
max_last_updated: Optional[Union[str, pd.Timestamp]] = None,
include_file_column: bool = False,
dataframe_format: str = "qdf",
dataframe_type: str = "pandas",
categorical_dataframe: bool = True,
include_delta_files: bool = True,
include_metadata_files: bool = True,
delta_treatment: str = "latest",
show_progress: bool = True,
overwrite: bool = False,
since_datetime: Optional[str] = None,
to_datetime: Optional[str] = None,
skip_download: bool = False,
cleanup_old_files_n_days: Optional[int] = None,
delete_corrupt_files: bool = False,
) -> Union[pd.DataFrame, pl.DataFrame, pl.LazyFrame]:
"""
Download JPMaQS files into the local cache and load the requested timeseries.
This is the main "one-stop" method: it resolves tickers to the underlying JPMaQS
datasets, downloads the necessary snapshot/delta/metadata files into the local
cache (unless `skip_download=True`), and returns the filtered data.
For a "load-only" workflow (no snapshot/delta downloads), call `load_data()`
directly (or call this method with `skip_download=True`).
Parameters
----------
tickers : Optional[List[str]]
A list of tickers to filter datasets. Each ticker must be in the standard
format "CID_XCAT" used in JPMaQS.
cids : Optional[List[str]]
A list of cross-sectional identifiers (CIDs) to filter datasets.
xcats : Optional[List[str]]
A list of extended categories (XCATS) to filter datasets.
metrics : Optional[List[str]]
A list of JPMaQS metrics to filter the data. Available metrics are "value",
"grading", "eop_lag", "mop_lag", and "last_updated". The available metrics
are also defined in `macrosynergy.management.constants.JPMAQS_METRICS`. The default
is None, in which case all metrics are returned.
start_date : Optional[str]
The start date for the returned data in "YYYY-MM-DD" (or "YYYYMMDD") format.
If None, data is returned from the earliest available date.
end_date : Optional[str]
The end date for the returned data in "YYYY-MM-DD" (or "YYYYMMDD") format.
If None, data is returned up to the latest available date.
min_last_updated : Optional[Union[str, pd.Timestamp]]
If provided, only data points with `last_updated` on or after this timestamp
are returned. Strings can be "YYYY-MM-DDThh:mm:ss", "YYYYMMDDhhmmss", or
ISO 8601 format.
max_last_updated : Optional[Union[str, pd.Timestamp]]
If provided, only data points with `last_updated` on or before this timestamp
are returned. Strings can be "YYYY-MM-DDThh:mm:ss", "YYYYMMDDhhmmss", or
ISO 8601 format.
include_file_column : bool
If True, includes a column indicating the source file for each data point.
Default is False.
dataframe_format : str
The output schema. Options are:
- "qdf": quantamental schema with `cid` and `xcat` columns.
- "tickers": ticker schema with a single `ticker` column (instead of `cid`/`xcat`).
Note: if you want a wide matrix (date x ticker), pivot the returned data
using pandas/Polars. Default is "qdf".
dataframe_type : str
The type of DataFrame to return. Options are "pandas" for a pandas DataFrame,
"polars" for a polars DataFrame, or "polars-lazy" for a polars LazyFrame.
Default is "pandas".
categorical_dataframe : bool
If True and `dataframe_type` is "pandas", the returned DataFrame will use
categorical dtypes for object columns. Default is True.
include_delta_files : bool
If True, delta files will be downloaded (and applied when loading, via
`delta_treatment`). Default is True.
include_metadata_files : bool
If True, metadata files will be included in the download. Default is True.
delta_treatment : str
Specifies how to treat new or updated entries across files from different
dates (based on `last_updated`). Options are:
- "latest": keep the latest value per series/date (default).
- "earliest": keep the earliest value per series/date.
- "all": keep all entries.
show_progress : bool
If True, displays a progress bar during downloads. Default is True.
overwrite : bool
If True, overwrites files if they already exist. Default is False.
since_datetime : Optional[str]
File-vintage window start (inclusive) for selecting which snapshot/delta/metadata
files to download. This is based on the file timestamp (`file-datetime`) rather
than the HTTP metadata field `last-modified`. Defaults to the start of the
current day (UTC).
to_datetime : Optional[str]
File-vintage window end (inclusive) for selecting which snapshot/delta/metadata
files to download. Note: `since_datetime` and `to_datetime` only affect which
files are downloaded; the returned timeseries is controlled by
`start_date`/`end_date`.
Note for historical ("delta-only") vintages:
- If `to_datetime` falls within a month where only monthly delta files exist,
the selector may still include the covering month-end delta file even if its
file timestamp is after `to_datetime`. Row-level filtering is then enforced
via `max_last_updated` during the load step.
skip_download : bool
If True, do not download snapshot/delta/metadata files and only load from the
local cache. In this mode, the client will use the most recent *local* catalog
parquet file (at or before `to_datetime` if provided) and will not make any
network requests. If no local catalog is available, this method raises a
ValueError. Default is False.
cleanup_old_files_n_days : Optional[int]
If set to an integer value, deletes files older than this number of days
from the local cache after the download is complete. This integer value is
passed to `cleanup_old_files()`. If None, no cleanup is performed.
Default is None.
Returns
-------
Union[pd.DataFrame, pl.DataFrame, pl.LazyFrame]
A DataFrame containing the requested data.
"""
def _empty_for_type(df_type: str):
if df_type == "pandas":
return pd.DataFrame()
if df_type == "polars":
return pl.DataFrame()
if df_type == "polars-lazy":
return pl.DataFrame().lazy()
return pd.DataFrame()
fs = None
if skip_download:
existing = self.list_downloaded_files(include_last_modified_columns=False)
fs = FileSelector(
api_files_df=None,
local_files_df=existing,
file_name_col="file-name",
)
else:
fs = self.file_selector
most_recent_local_catalog_path = fs._most_recent_local_catalog(
to_datetime=to_datetime
)
if skip_download and most_recent_local_catalog_path is None:
raise ValueError(
"Cannot skip download when no local catalog file is available. "
"Please run this method once with `skip_download=False` to populate the cache."
)
catalog_file = None
if skip_download:
catalog_file = str(most_recent_local_catalog_path)
else:
try:
catalog_file = self.download_catalog_file()
except DownloadError as e:
# If the API has no catalog (or catalog download fails), fall back to a
# local cache if possible.
catalog_file = most_recent_local_catalog_path
if catalog_file:
logger.warning(
"Failed to download the JPMaQS catalog file; falling back to the "
f"most recent local catalog at '{catalog_file}'. Error: {e}"
)
else:
logger.warning(
f"Failed to download the JPMaQS catalog file and no local catalog exists. "
f"Returning an empty DataFrame. Error: {e}"
)
return _empty_for_type(dataframe_type)
rqstd_tickers, datasets_to_download = self._validate_and_resolve_tickers(
tickers=tickers,
cids=cids,
xcats=xcats,
catalog_file=catalog_file,
include_delta_files=include_delta_files,
)
if "UnknownTheme" in datasets_to_download:
logger.warning(
"Some tickers map to unknown catalog themes. "
"These will be ignored for download selection until "
"`JPMAQS_DATASET_THEME_MAPPING` is updated."
)
datasets_to_download = [
d for d in datasets_to_download if d != "UnknownTheme"
]
if not skip_download:
download_since_datetime = since_datetime
if download_since_datetime is None:
if to_datetime is not None:
to_dt = pd_to_datetime_compat(to_datetime)
download_since_datetime = (to_dt - pd.offsets.BDay(1)).strftime(
"%Y%m%d"
)
else:
download_since_datetime = (
get_current_or_last_business_day().strftime("%Y%m%d")
)
if (to_datetime is not None) and (not include_delta_files):
validate_dq_timestamp(to_datetime, var_name="to_datetime")
to_ts = _normalize_file_timestamp_cutoff(to_datetime)
switchover_ts = self._get_effective_snapshot_switchover_ts(
datasets=datasets_to_download
)
if switchover_ts is None or to_ts < switchover_ts:
raise ValueError(
"The requested vintage predates the earliest available full "
"snapshots, so `include_delta_files` must be True."
)
self.download_full_snapshot(
since_datetime=download_since_datetime,
to_datetime=to_datetime,
file_group_ids=datasets_to_download,
overwrite=overwrite,
show_progress=show_progress,
include_full_snapshots=True,
include_delta=include_delta_files,
include_metadata=include_metadata_files,
delete_corrupt_files=delete_corrupt_files,
_selection_min_last_updated=min_last_updated,
_selection_max_last_updated=max_last_updated,
)
if not isinstance(cleanup_old_files_n_days, (type(None), int)):
raise ValueError(
"`cleanup_old_files_n_days` must be an integer or None."
)
if isinstance(cleanup_old_files_n_days, int):
cleanup_old_files_n_days = abs(cleanup_old_files_n_days)
self.cleanup_old_files(days_to_keep=cleanup_old_files_n_days)
if skip_download and isinstance(cleanup_old_files_n_days, int):
if cleanup_old_files_n_days > 0:
logger.warning(
"`cleanup_old_files_n_days` is ignored when `skip_download=True`."
)
load_since_datetime = since_datetime
return self.load_data(
tickers=rqstd_tickers,
metrics=metrics,
start_date=start_date,
end_date=end_date,
min_last_updated=min_last_updated,
max_last_updated=max_last_updated,
include_file_column=include_file_column,
dataframe_format=dataframe_format,
dataframe_type=dataframe_type,
categorical_dataframe=categorical_dataframe,
include_delta_files=include_delta_files,
delta_treatment=delta_treatment,
since_datetime=load_since_datetime,
to_datetime=to_datetime,
catalog_file=catalog_file,
datasets=datasets_to_download,
)
[docs] def download_as_of(
self,
tickers: Optional[List[str]] = None,
cids: Optional[List[str]] = None,
xcats: Optional[List[str]] = None,
metrics: Optional[List[str]] = None,
as_of_datetime: str = None,
start_date: Optional[str] = None,
end_date: Optional[str] = None,
include_file_column: bool = False,
dataframe_format: str = "qdf",
dataframe_type: str = "pandas",
categorical_dataframe: bool = True,
include_delta_files: bool = True,
delta_treatment: str = "latest",
show_progress: bool = True,
overwrite: bool = False,
skip_download: bool = False,
cleanup_old_files_n_days: Optional[int] = None,
**kwargs,
) -> Union[pd.DataFrame, pl.DataFrame, pl.LazyFrame]:
"""
Return data "as of" a specific point in time.
This is a very lightweight wrapper around `download()`: it only translates the
intent ("as of") into the correct `download()` arguments:
- `to_datetime`: the file-vintage cutoff (which files can be used)
- `max_last_updated`: the row-level vintage cutoff (which updates are allowed)
See func:`macrosynergy.download.dataquery_file_api.dataquery_file_api.DataQueryFileAPIClient.download()`
for details on the other parameters and return value.
Parameters
----------
as_of_datetime : str
The point in time to view the data as of. This can be a date-only string
(e.g. "2023-12-31" or "20231231") or a datetime string
(e.g. "2023-12-31T15:30:00Z" or "20231231153000"). Date-only strings are
interpreted as end-of-day.
"""
if as_of_datetime is None:
raise ValueError("`as_of_datetime` must be provided.")
validate_dq_timestamp(as_of_datetime, var_name="as_of_datetime")
as_of_ts = pd_to_datetime_compat(as_of_datetime)
as_of_day = as_of_ts.normalize()
now_ts = pd_to_datetime_compat(pd.Timestamp.utcnow()).tz_convert("UTC")
today_utc = now_ts.normalize()
if _is_date_only_string(as_of_datetime):
if as_of_day > today_utc:
raise ValueError(
"`as_of_datetime` is in the future (UTC). "
f"Requested: {as_of_day.date()}, today (UTC): {today_utc.date()}."
)
to_datetime_str = as_of_day.strftime("%Y%m%d")
max_last_updated_str = (
as_of_day + pd.DateOffset(days=1) - pd.Timedelta(seconds=1)
).strftime("%Y-%m-%dT%H:%M:%SZ")
else:
if as_of_ts > now_ts:
raise ValueError(
"`as_of_datetime` is in the future (UTC). "
f"Requested: {as_of_ts.strftime('%Y-%m-%dT%H:%M:%SZ')}, "
f"now (UTC): {now_ts.strftime('%Y-%m-%dT%H:%M:%SZ')}."
)
to_datetime_str = as_of_ts.strftime("%Y-%m-%dT%H:%M:%SZ")
max_last_updated_str = to_datetime_str
selector = self.file_selector
if selector.api_files_df.empty:
selector.refresh(
api_files_df=self.list_available_files_for_all_file_groups()
)
oldest_ts = selector.oldest_api_file_timestamp()
if oldest_ts is not None:
to_cutoff = _normalize_file_timestamp_cutoff(to_datetime_str)
if to_cutoff < oldest_ts:
raise ValueError(
"`as_of_datetime` predates the oldest available JPMaQS file timestamp "
"reported by the API "
f"({oldest_ts.strftime('%Y-%m-%dT%H:%M:%SZ')})."
)
return self.download(
tickers=tickers,
cids=cids,
xcats=xcats,
metrics=metrics,
start_date=start_date,
end_date=end_date,
max_last_updated=max_last_updated_str,
include_file_column=include_file_column,
dataframe_format=dataframe_format,
dataframe_type=dataframe_type,
categorical_dataframe=categorical_dataframe,
include_delta_files=include_delta_files,
delta_treatment=delta_treatment,
show_progress=show_progress,
overwrite=overwrite,
since_datetime=None,
to_datetime=to_datetime_str,
skip_download=skip_download,
cleanup_old_files_n_days=cleanup_old_files_n_days,
include_metadata_files=False,
**kwargs,
)
def _construct_all_tickers_list(
tickers: Optional[List[str]] = None,
cids: Optional[List[str]] = None,
xcats: Optional[List[str]] = None,
) -> List[str]:
for param, name in zip(
[tickers, cids, xcats],
["tickers", "cids", "xcats"],
):
if param is not None:
if not isinstance(param, list) or not all(
isinstance(x, str) for x in param
):
raise ValueError(f"`{name}` must be a list of strings.")
if not any(bool(x) for x in [tickers, cids, xcats]):
raise ValueError("At least one of `tickers`, `cids`, or `xcats` must be set.")
if tickers is None:
tickers = []
if bool(cids) ^ bool(xcats):
raise ValueError("Either both `cids` and `xcats` must be set, or neither.")
if cids is None:
cids, xcats = [], []
tickers = sorted(set(tickers + [f"{c}_{x}" for c in cids for x in xcats]))
return tickers
[docs]def get_client_id_secret() -> Optional[Tuple[str, str]]:
"""Retrieve client ID and secret from environment variables."""
pairs = [
("DQ_CLIENT_ID", "DQ_CLIENT_SECRET"),
("DATAQUERY_CLIENT_ID", "DATAQUERY_CLIENT_SECRET"),
]
for client_id_env, client_secret_env in pairs:
client_id = os.getenv(client_id_env)
client_secret = os.getenv(client_secret_env)
if client_id and client_secret:
logger.info(
f"Using {client_id_env} and {client_secret_env} from environment"
)
return client_id, client_secret
return None, None
def _check_individual_file_parquet_columns(
file_path: Path,
) -> bool:
assert isinstance(file_path, Path)
base_name = file_path.name.upper()
if not base_name.startswith("JPMAQS_") or not base_name.endswith(".PARQUET"):
logger.warning(f"File {file_path} is not a recognized JPMAQS parquet file.")
return False
expected_cols = {}
if "_METADATA" in base_name:
expected_cols = JPMaQSParquetExpectedColumns.METADATA.value
else:
expected_cols = JPMaQSParquetExpectedColumns.TICKER.value
schema = {}
try:
lf = pl.scan_parquet(file_path)
if PYTHON_3_8_OR_LATER:
schema = lf.collect_schema()
else:
schema = lf.schema
schema = dict(schema)
if lf.head(1).collect().is_empty():
return False
except Exception as e:
logger.warning(f"Failed to read parquet file {file_path}: {e}")
return False
return schema == expected_cols
def _delete_corrupt_files(
files: List[Path],
extensions: List[str] = ["parquet", "json"],
root_dir: Path = None,
allow_empty: bool = False,
delete: bool = True,
) -> List[Path]:
"""Check files for corruption and optionally delete them.
Parameters
----------
delete : bool
If True, corrupt files are deleted from disk. If False, corrupt files
are only logged as warnings (no files are removed).
"""
corrupt_files = []
for file_path in map(Path, files):
if not file_path.exists():
continue
if file_path.suffix.lower() not in [
f".{ext.strip('.').lower()}" for ext in extensions
]:
continue
try:
if file_path.suffix.lower() == ".parquet":
if not _check_individual_file_parquet_columns(
file_path=file_path,
):
raise ValueError("File is corrupt or has invalid schema")
elif file_path.suffix.lower() == ".json":
with open(file_path, "r", encoding="utf-8") as f:
js = json.load(f)
if not allow_empty and not js:
raise ValueError("File is empty")
else:
continue
except KeyboardInterrupt:
raise
except Exception:
if delete:
logger.warning(f"Deleting corrupt file: {file_path}")
file_path.unlink()
else:
logger.warning(f"Corrupt file detected (not deleted): {file_path}")
corrupt_files.append(file_path)
if delete and root_dir is not None and root_dir.exists() and root_dir.is_dir():
for dirpath, _, _ in os.walk(root_dir, topdown=False):
dir_path = Path(dirpath)
if not any(dir_path.iterdir()):
try:
dir_path.rmdir()
logger.info(f"Removed empty directory: {dir_path}")
except Exception:
logger.warning(f"Failed to remove directory: {dir_path}")
return sorted(map(str, corrupt_files))
if __name__ == "__main__":
print("Current time UTC:", pd.Timestamp.utcnow().isoformat())
# start = time.time()
# since_datetime = pd.Timestamp.now() - pd.offsets.BDay(7)
# print(
# f"Downloading full-snapshots, delta-files, and metadata files published since {since_datetime}"
# )
# since_datetime = since_datetime.strftime("%Y%m%d")
# with DataQueryFileAPIClient(out_dir="./data/jpmaqs-data/") as dq:
# dq.download_catalog_file()
# dq.download_full_snapshot(since_datetime=since_datetime)
# print(dq.get_revisions_notifications().head())
# print(dq.get_missing_data_notifications().head())
# end = time.time()
# print(f"Download completed in {end - start:.2f} seconds")
test_cids = ["AUD", "BRL", "CAD", "CHF", "CNY", "CZK", "EUR", "GBP", "USD"]
test_xcats = ["RIR_NSA", "FXXR_NSA", "FXXR_VT10", "DU05YXR_NSA", "DU05YXR_VT10"]
tickers = [f"{c}_{x}" for c in test_cids for x in test_xcats]
# with DataQueryFileAPIClient(out_dir="./data/jpmaqs-data/") as dq:
# df = dq.download(
# tickers=tickers,
# include_file_column=True,
# )
# print(df.head())
with DataQueryFileAPIClient(out_dir="./data/jpmaqs-data/") as dq:
dfx = dq.list_available_files_for_all_file_groups(
include_delta=False, include_metadata=False
)
dq.download_delta_files(since_datetime="20220101")
# with DataQueryFileAPIClient(out_dir="./data/jpmaqs-data/") as dq:
df = dq.download_as_of(
tickers=tickers,
as_of_datetime="2026-01-13",
include_file_column=True,
)
df
# with DataQueryFileAPIClient(out_dir="./data/jpmaqs-data/") as dq:
# df = dq.download_as_of(tickers=tickers, as_of_datetime="2025-10-08T12:16:14Z")
# assert df["real_date"].max() <= pd.Timestamp("2025-11-12")
# assert df["last_updated"].max() <= pd.Timestamp("2025-10-08T12:16:14")
# print(df.head())
# with DataQueryFileAPIClient(out_dir="./data/jpmaqs-data/") as dq:
# pl_df: pl.DataFrame = dq.download(
# cids=test_cids,
# xcats=test_xcats,
# dataframe_format="tickers",
# dataframe_type="polars",
# )
# print(pl_df.head())