# Source code for macrosynergy.download.dataquery_file_api

"""
Client for downloading JPMaQS data files from the JPMorgan DataQuery File API.

This module provides the `DataQueryFileAPIClient`, a high-level wrapper for the
JPMorgan DataQuery File API.


.. note::
    This functionality is currently in BETA and is subject to significant changes
    without deprecation cycles.

Consumption & Examples
----------------------

Before using the client, ensure your API credentials are set as environment variables:

.. code-block:: bash

    export DQ_CLIENT_ID="your_client_id"
    export DQ_CLIENT_SECRET="your_client_secret"

**Example 1: Initialize the client and list all available JPMaQS files.**

.. code-block:: python

    from macrosynergy.download import DataQueryFileAPIClient
    import pandas as pd

    client = DataQueryFileAPIClient()

    # Fetch a DataFrame of all available files for the JPMaQS group
    available_files_df = client.list_available_files_for_all_file_groups()
    print("Available JPMaQS files:")
    print(available_files_df.head())

**Example 2: Download all new or updated files for the current day.**

This is the recommended way to get a daily snapshot of all JPMaQS data,
including full datasets, deltas, and metadata.

.. code-block:: python

    from macrosynergy.download import DataQueryFileAPIClient
    client = DataQueryFileAPIClient(out_dir="./jpmaqs_data")

    print(f"Downloading today's files to {client.out_dir}...")
    client.download_full_snapshot()
    print("Download complete.")

**Example 3: Download all new or updated files for the day, and load data from them
as a dataframe.**

Here, the client checks the locally available files and compares them against the latest files published on the API.
It automatically downloads new or updated files, and loads data for the specified `cids`, `xcats`,
`tickers`, and `start_date`/`end_date` as appropriate.
The resulting dataframe is returned to the user in the chosen dataframe format
(quantamental format/tickers format) and dataframe type (`pandas`/`polars`).


.. code-block:: python

    from macrosynergy.download import DataQueryFileAPIClient

    cids = ['AUD', 'CAD', 'USD', 'JPY']
    xcats = ['EQXR_NSA', 'RIR_NSA']
    start_date = '2000-01-01'

    with DataQueryFileAPIClient(out_dir="./jpmaqs_data") as client:
        df = client.download(cids=cids, xcats=xcats, start_date=start_date)
        print(df.head())


.. code-block:: python

       real_date  cid     xcat  value  eop_lag  mop_lag  grading        last_updated
    0 2000-01-03  AUD  RIR_NSA  4.078      0.0     55.0     1.25 2024-07-25 07:27:22
    1 2000-01-04  AUD  RIR_NSA  3.778      0.0     56.0     1.25 2024-07-25 07:27:22
    2 2000-01-05  AUD  RIR_NSA  3.747      0.0     56.0     1.25 2024-07-25 07:27:22
    3 2000-01-06  AUD  RIR_NSA  3.710      0.0     56.0     1.25 2024-07-25 07:27:22
    4 2000-01-07  AUD  RIR_NSA  3.697      0.0     57.0     1.25 2024-07-25 07:27:22


**Example 4: Download all new or updated delta-files since a specific date/time.**

.. code-block:: python

    from macrosynergy.download import DataQueryFileAPIClient
    import pandas as pd

    client = DataQueryFileAPIClient(out_dir="./jpmaqs_data")
    since_datetime = pd.Timestamp.today() - pd.DateOffset(days=10)

    client.download_full_snapshot(
        since_datetime=since_datetime,
        include_full_snapshots=False,
        include_metadata=True,
        include_delta=True,
    )
    print("Download complete.")


**Example 5: Download a single, specific historical file.**

.. code-block:: python

    from macrosynergy.download import DataQueryFileAPIClient
    client = DataQueryFileAPIClient(out_dir="./jpmaqs_data")
    # This specific filename can be found using the list_available_files... methods
    target_filename = "JPMAQS_MACROECONOMIC_BALANCE_SHEETS_20250414.parquet"

    print(f"Downloading {target_filename}...")
    file_path = client.download_file(filename=target_filename)
    print(f"File downloaded to: {file_path}")

**Example 6: Check availability for a specific file-group.**

.. code-block:: python

    from macrosynergy.download import DataQueryFileAPIClient
    client = DataQueryFileAPIClient()
    file_group_id = "JPMAQS_MACROECONOMIC_BALANCE_SHEETS"

    available_files = client.list_available_files(file_group_id=file_group_id)

    # print the earliest file's details
    print(available_files.iloc[-1])

**Example 7: Load "notification" metadata (missing updates & revisions).**

JPMaQS publishes daily metadata notification JSON files that summarize:

- Missing updates ("Missing Updates")
- Additional info about missing updates ("Additional information on missing updates")
- Changed historical values ("Changed historical values")

The helpers below download the relevant metadata for the requested date (UTC, business-day
window) if needed, and return the notifications as pandas DataFrames.

.. code-block:: python

    from macrosynergy.download import DataQueryFileAPIClient

    with DataQueryFileAPIClient(out_dir="./jpmaqs_data") as client:
        missing_df = client.get_missing_data_notifications(date="2026-01-19")
        revisions_df = client.get_revisions_notifications(date="2026-01-19")

        print(missing_df.head())
        print(revisions_df.head())
"""

import os
import pandas as pd
import polars as pl

import functools
import time
import warnings
from pathlib import Path
from enum import Enum

import concurrent.futures as cf
import logging
import shutil
import traceback as tb
import uuid
from typing import Dict, Any, Optional, List, Tuple, Union, Sequence
from tqdm import tqdm
import json

import requests
from macrosynergy.compat import PD_2_0_OR_LATER, PYTHON_3_8_OR_LATER
from macrosynergy.management.constants import JPMAQS_METRICS
from macrosynergy.download.dataquery import JPMAQS_GROUP_ID
from macrosynergy.download.fusion_interface import (
    request_wrapper,
    request_wrapper_stream_bytes_to_disk,
    _wait_for_api_call,
    convert_ticker_based_parquet_file_to_qdf,
    cache_decorator,
)
from macrosynergy.download.dataquery import OAUTH_TOKEN_URL
from macrosynergy.download.exceptions import DownloadError, InvalidResponseError
from macrosynergy.download.jpm_oauth import JPMorganOAuth

# Primary and fallback roots of the DataQuery File API (v2). The fallback is only
# used when the primary host cannot be reached (see `_resolve_base_url`).
DQ_FILE_API_BASE_URL: str = (
    "https://api-dataquery.jpmchase.com" "/research/dataquery-authe/api/v2"
)
DQ_FILE_API_FALLBACK_BASE_URL: str = (
    "https://api-strm-gw01.jpmchase.com" "/research/dataquery-authe/api/v2"
)

# OAuth resource/scope requested for File API access tokens.
DQ_FILE_API_SCOPE: str = "JPMC:URI:RS-06785-DataQueryExternalApi-PROD"

# Request timeouts in seconds; the headers timeout is one tenth of the full timeout.
DQ_FILE_API_TIMEOUT: float = 300.0
DQ_FILE_API_HEADERS_TIMEOUT: float = DQ_FILE_API_TIMEOUT / 10.0

# Rate limiting: 25 transactions per second, i.e. 1/25 s between calls.
DQ_FILE_API_DELAY_PARAM: float = 0.04
DQ_FILE_API_DELAY_MARGIN: float = 1.1  # a 10% safety margin on top of the delay

# Transfer sizing for segmented / streamed downloads.
DQ_FILE_API_SEGMENT_SIZE_MB: float = 8.0  # megabytes per segment
DQ_FILE_API_STREAM_CHUNK_SIZE: int = 8 * 1024  # bytes per streamed chunk

# Catalog "Theme" value -> JPMaQS dataset (file-group) identifier.
JPMAQS_DATASET_THEME_MAPPING = {
    "Economic surprises": "JPMAQS_ECONOMIC_SURPRISES",
    "Financial conditions": "JPMAQS_FINANCIAL_CONDITIONS",
    "Generic returns": "JPMAQS_GENERIC_RETURNS",
    "Macroeconomic balance sheets": "JPMAQS_MACROECONOMIC_BALANCE_SHEETS",
    "Macroeconomic trends": "JPMAQS_MACROECONOMIC_TRENDS",
    "Shocks and risk measures": "JPMAQS_SHOCKS_RISK_MEASURES",
    "Stylized trading factors": "JPMAQS_STYLIZED_TRADING_FACTORS",
}


# Default lower bound (YYYYMMDD) used when listing available files.
JPMAQS_EARLIEST_FILE_DATE = "20220101"

logger = logging.getLogger(__name__)


class DataQueryFileAPIOauth(JPMorganOAuth):
    """
    OAuth helper preconfigured for the JPMorgan DataQuery File API.

    A thin subclass of ``JPMorganOAuth`` whose defaults target the DataQuery File
    API: the File API scope, the shared OAuth token URL and the primary File API
    base URL. All arguments are forwarded unchanged to ``JPMorganOAuth``.

    Parameters
    ----------
    client_id : str
        OAuth client ID.
    client_secret : str
        OAuth client secret.
    resource : str
        Resource/scope requested for the token. Defaults to ``DQ_FILE_API_SCOPE``.
    auth_url : str
        Token endpoint. Defaults to ``OAUTH_TOKEN_URL``.
    root_url : str
        API root URL. Defaults to ``DQ_FILE_API_BASE_URL``.
    application_name : str
        Application name passed to the base class. Defaults to "DataQueryFileAPI".
    proxies : Optional[Dict[str, str]]
        Optional proxies passed to the base class.
    verify : bool
        Whether SSL certificates are verified. Defaults to True.
    **kwargs
        Additional keyword arguments passed through to ``JPMorganOAuth``.
    """

    def __init__(
        self,
        client_id: str,
        client_secret: str,
        resource: str = DQ_FILE_API_SCOPE,
        auth_url: str = OAUTH_TOKEN_URL,
        root_url: str = DQ_FILE_API_BASE_URL,
        application_name: str = "DataQueryFileAPI",
        proxies: Optional[Dict[str, str]] = None,
        verify: bool = True,
        **kwargs,
    ):
        base_settings: Dict[str, Any] = {
            "client_id": client_id,
            "client_secret": client_secret,
            "resource": resource,
            "application_name": application_name,
            "auth_url": auth_url,
            "root_url": root_url,
            "proxies": proxies,
            "verify": verify,
        }
        super().__init__(**base_settings, **kwargs)
def _resolve_base_url(
    primary: str,
    fallback: str,
    timeout: float = 10.0,
    verify: bool = True,
    proxies: Optional[Dict[str, str]] = None,
) -> str:
    """
    Pick the DataQuery File API base URL that answers a ``HEAD`` probe.

    *primary* is probed first and returned as soon as it responds (any HTTP status
    counts as reachable; only ``requests`` exceptions count as failures). If it
    fails, *fallback* is probed; when the fallback responds, a ``UserWarning`` is
    issued and a warning is logged before returning it. If neither responds,
    *primary* is returned so that later requests surface the connection error.
    ``DataQueryFileAPIClient`` calls this from its constructor, so the probe runs
    once per client instance.
    """

    def _responds(candidate: str) -> bool:
        try:
            requests.head(candidate, timeout=timeout, verify=verify, proxies=proxies)
        except requests.exceptions.RequestException:
            return False
        return True

    if _responds(primary):
        return primary

    logger.debug(
        "Primary DataQuery File API URL not reachable (%s), " "trying fallback...",
        primary,
    )

    if not _responds(fallback):
        # Neither host answered: keep the primary and let normal error handling
        # report the failure on the first real request.
        return primary

    warnings.warn(
        f"The primary DataQuery File API URL is not reachable: {primary}\n"
        f"Falling back to: {fallback}\n"
        "Please whitelist/allow the primary URL in your "
        "network/firewall configuration.",
        UserWarning,
        stacklevel=2,
    )
    logger.warning(
        "DataQuery File API URL fallback active: using %s instead of %s.",
        fallback,
        primary,
    )
    return fallback
class DataQueryFileAPIClient:
    """
    A client for accessing JPMaQS product files via the JPMorgan DataQuery File API.

    This client provides an alternative distribution channel to the Fusion API for
    JPMaQS data. It is designed to list and download JPMaQS data files, which are
    available as full snapshots, daily deltas, and metadata files. The client
    handles authentication, API requests, and file downloads, including large file
    downloads using a segmented, concurrent approach. It can be used as a context
    manager; exceptions raised inside the ``with`` block are logged and re-raised.

    Parameters
    ----------
    client_id : Optional[str]
        Client ID for authentication. If not provided, it will be sourced from
        environment variables (`DQ_CLIENT_ID` or `DATAQUERY_CLIENT_ID`).
    client_secret : Optional[str]
        Client Secret for authentication. If not provided, it will be sourced from
        environment variables (`DQ_CLIENT_SECRET` or `DATAQUERY_CLIENT_SECRET`).
    out_dir : Optional[str]
        Default output directory for downloads. Can be overridden in download
        methods. Defaults to "./jpmaqs-download".
    base_url : str
        The base URL for the DataQuery File API. Defaults to `DQ_FILE_API_BASE_URL`.
        If it is unreachable when the client is constructed but
        `DQ_FILE_API_FALLBACK_BASE_URL` is reachable, the fallback is used.
    scope : str
        The API scope for authentication. Defaults to `DQ_FILE_API_SCOPE`.
    proxies : Optional[Dict[str, str]]
        Optional proxies to use for HTTP requests, including the OAuth token
        requests. Defaults to None.
    verify_ssl : bool
        If True, verifies SSL certificates for all requests. Defaults to True.

    Raises
    ------
    ValueError
        If credentials are neither passed in nor found in the environment.
    """

    def __init__(
        self,
        client_id: Optional[str] = None,
        client_secret: Optional[str] = None,
        out_dir: Optional[str] = None,
        base_url: str = DQ_FILE_API_BASE_URL,
        scope: str = DQ_FILE_API_SCOPE,
        proxies: Optional[Dict[str, str]] = None,
        verify_ssl: bool = True,
    ):
        if not (bool(client_id) and bool(client_secret)):
            client_id, client_secret = get_client_id_secret()
        if not (bool(client_id) and bool(client_secret)):
            raise ValueError(
                "Client ID and Client Secret must be provided either as arguments or "
                "via environment variables DQ_CLIENT_ID & DQ_CLIENT_SECRET or "
                "DATAQUERY_CLIENT_ID & DATAQUERY_CLIENT_SECRET"
            )
        self.client_id = client_id
        self.client_secret = client_secret
        self.out_dir = out_dir or "./jpmaqs-download"
        # Probe (once per instance) whether the primary or fallback host is reachable.
        self.base_url = _resolve_base_url(
            primary=base_url,
            fallback=DQ_FILE_API_FALLBACK_BASE_URL,
            verify=verify_ssl,
            proxies=proxies,
        ).rstrip("/")
        self.scope = scope
        self.proxies = proxies
        self.verify_ssl = verify_ssl
        self.catalog_file_group_id = "JPMAQS_METADATA_CATALOG"
        # Forward the proxies so token requests take the same network route as the
        # API calls made in `_get` / `download_file`.
        self.oauth = DataQueryFileAPIOauth(
            client_id=self.client_id,
            client_secret=self.client_secret,
            resource=self.scope,
            proxies=self.proxies,
            verify=self.verify_ssl,
        )

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # Log any exception raised in the `with` block, but never suppress it.
        if exc_type is not None:
            logger.error("".join(tb.format_exception(exc_type, exc_value, traceback)))
        return False

    def _get_save_dir(self, out_dir: Optional[str] = None) -> str:
        """Return the download root, appending a "jpmaqs-download" folder if absent."""
        base_dir = Path(out_dir or self.out_dir)
        if base_dir.name != "jpmaqs-download":
            return str(base_dir / "jpmaqs-download")
        return str(base_dir)

    def _get(
        self, endpoint: str, params: Optional[Dict[str, Any]] = None, retries: int = 3
    ) -> Dict[str, Any]:
        """
        Executes a GET request to a specified endpoint with retry logic.

        Parameters
        ----------
        endpoint : str
            The API endpoint to call (appended to `self.base_url`).
        params : Optional[Dict[str, Any]]
            A dictionary of query parameters for the request.
        retries : int
            Maximum number of attempts. Values below 1 are treated as a single
            attempt. Attempts are separated by an exponential back-off (1s, 2s, ...).

        Returns
        -------
        Dict[str, Any]
            The JSON response from the API as a dictionary.

        Raises
        ------
        Exception
            Whatever the final attempt raised, re-raised unchanged.
        """
        url = f"{self.base_url}{endpoint}"
        n_attempts = max(1, int(retries))
        for attempt in range(n_attempts):
            try:
                # Fetch headers on every attempt so a refreshed token is used on retry.
                return request_wrapper(
                    method="GET",
                    url=url,
                    headers=self.oauth.get_headers(),
                    params=params or {},
                    proxies=self.proxies,
                    as_json=True,
                    api_delay=DQ_FILE_API_DELAY_PARAM,
                    verify_ssl=self.verify_ssl,
                )
            except Exception as e:
                logger.error(f"Error occurred during GET request: {e}")
                if attempt == n_attempts - 1:
                    raise
                logger.info(f"Retrying... ({attempt + 1}/{n_attempts})")
                time.sleep(2**attempt)
[docs] def list_groups(self) -> pd.DataFrame: """ Lists all available data provider groups. Returns ------- pd.DataFrame A DataFrame containing details of available groups. """ endpoint = "/groups" payload = self._get(endpoint, {}) return pd.json_normalize(payload, record_path=["groups"])
[docs] def search_groups(self, keywords: str) -> pd.DataFrame: """ Searches for data provider groups that match the given keywords. Parameters ---------- keywords : str Keywords to search for in group names and descriptions. Returns ------- pd.DataFrame A DataFrame of groups matching the search criteria. """ endpoint = "/groups/search" payload = self._get(endpoint, {"keywords": keywords}) return pd.json_normalize(payload, record_path=["groups"])
[docs] @functools.lru_cache(maxsize=None) def list_group_files( self, group_id: str = JPMAQS_GROUP_ID, include_full_snapshots: bool = True, include_delta: bool = True, include_metadata: bool = True, ) -> pd.DataFrame: """ Lists all file groups (datasets) for a specific data provider. Parameters ---------- group_id : str The identifier for the data provider group, defaults to the JPMaQS group. include_full_snapshots : bool If True, include full snapshot file groups in the result. include_delta : bool If True, include delta file groups in the result. include_metadata : bool If True, include metadata file groups in the result. Returns ------- pd.DataFrame A DataFrame listing the available file groups. """ if not any([include_full_snapshots, include_delta, include_metadata]): raise ValueError( "At least one of `include_full_snapshots`, `include_delta`, or " "`include_metadata` must be True" ) endpoint = "/group/files" payload = self._get(endpoint, {"group-id": group_id}) df = pd.json_normalize(payload, record_path=["file-group-ids"]) isdeltafile = df["file-group-id"].str.endswith("_DELTA") ismetadata = df["file-group-id"].str.contains("_METADATA") isfullsnapshot = ~(isdeltafile | ismetadata) mask = pd.Series(False, index=df.index) if include_full_snapshots: mask |= isfullsnapshot if include_delta: mask |= isdeltafile if include_metadata: mask |= ismetadata df = df[mask] df = df.sort_values(by=["item"]).reset_index(drop=True) return df
[docs] @cache_decorator(ttl=60) def list_available_files( self, file_group_id: Optional[str] = None, group_id: str = JPMAQS_GROUP_ID, start_date: str = JPMAQS_EARLIEST_FILE_DATE, end_date: str = None, convert_metadata_timestamps: bool = True, include_unavailable: bool = False, ) -> pd.DataFrame: """ Lists all available files for a specific file group within a date range. Parameters ---------- file_group_id : Optional[str] The identifier for the file group (e.g. "JPMAQS_MACROECONOMIC_BALANCE_SHEETS"). If None, returns all files for the group_id. Defaults to None. group_id : str The identifier for the data provider group. start_date : str The start date for the search in "YYYYMMDD" format. end_date : str The end date for the search in "YYYYMMDD" format. Defaults to today. convert_metadata_timestamps : bool If True, convert timestamp columns to datetime objects. include_unavailable : bool If True, includes files that are listed but not currently available. Returns ------- pd.DataFrame A DataFrame of available files with their details. 
""" if end_date is None: end_date = pd.Timestamp.utcnow().strftime("%Y%m%d") endpoint = "/group/files/available-files" params = { "group-id": group_id, "start-date": start_date, "end-date": end_date, } if file_group_id is not None: params["file-group-id"] = file_group_id _wait_for_api_call(1) payload = self._get(endpoint, params) df = pd.json_normalize(payload, record_path=["available-files"]) if "file-datetime" not in df.columns: raise InvalidResponseError( f'Missing "file-datetime" in response from {endpoint} with params {params}' ) if not include_unavailable: df = df[df["is-available"]].copy() df.loc[:, "file-datetime"] = df["file-datetime"].astype(str) # Sort by real timestamp while leaving the column as string df["_ts"] = pd_to_datetime_compat(df["file-datetime"], utc=True) df = ( df.sort_values("_ts", ascending=False) .drop(columns="_ts") .reset_index(drop=True) ) if convert_metadata_timestamps: for col in ["file-datetime", "last-modified"]: if col not in df.columns: raise InvalidResponseError(f'Missing "{col}" in response') df[col] = pd_to_datetime_compat(df[col], utc=True) return df
[docs] @cache_decorator(ttl=60) def list_available_files_for_all_file_groups( self, group_id: str = JPMAQS_GROUP_ID, start_date: str = JPMAQS_EARLIEST_FILE_DATE, end_date: str = None, include_full_snapshots: bool = True, include_delta: bool = True, include_metadata: bool = True, convert_metadata_timestamps: bool = True, include_unavailable: bool = False, ) -> pd.DataFrame: """ Fetches and consolidates available files for all relevant file groups. This method concurrently queries for available files across all specified file group types (full snapshots, deltas, metadata) for a given provider. Parameters ---------- group_id : str The identifier for the data provider group. start_date : str The start date for the search in "YYYYMMDD" format. end_date : str The end date for the search in "YYYYMMDD" format. Defaults to today. include_full_snapshots : bool If True, query for full snapshot file groups. include_delta : bool If True, query for delta file groups. include_metadata : bool If True, query for metadata file groups. convert_metadata_timestamps : bool If True, convert timestamp columns to datetime objects. include_unavailable : bool If True, include files that are listed but not currently available. Returns ------- pd.DataFrame A consolidated DataFrame of all available files. 
""" files_df = self.list_available_files( file_group_id=None, group_id=group_id, start_date=start_date, end_date=end_date, convert_metadata_timestamps=convert_metadata_timestamps, include_unavailable=include_unavailable, ) if files_df.empty: return files_df if not any([include_full_snapshots, include_delta, include_metadata]): raise ValueError( "At least one of `include_full_snapshots`, `include_delta`, or " "`include_metadata` must be True" ) if "file-name" not in files_df.columns: raise InvalidResponseError('Missing "file-name" in response') delta_mask = ( files_df["file-name"] .astype(str) .str.contains("_DELTA_", case=False, na=False) ) metadata_mask = ( files_df["file-name"] .astype(str) .str.contains("_METADATA_", case=False, na=False) ) full_snapshot_mask = ~(delta_mask | metadata_mask) mask = pd.Series(False, index=files_df.index) if include_full_snapshots: mask |= full_snapshot_mask if include_delta: mask |= delta_mask if include_metadata: mask |= metadata_mask return files_df.loc[mask].copy()
[docs] def filter_available_files_by_datetime( self, since_datetime: Optional[str] = None, to_datetime: Optional[str] = None, include_full_snapshots: bool = True, include_delta: bool = True, include_metadata: bool = True, include_unavailable: bool = False, ) -> pd.DataFrame: """ Retrieves files whose 'last-modified' timestamp falls within a datetime window. Parameters ---------- since_datetime : Optional[str] The start of the time window (inclusive). Format "YYYYMMDD" or "YYYYMMDDTHHMMSS". Defaults to the start of the current day (UTC). to_datetime : Optional[str] The end of the time window (inclusive). Format "YYYYMMDD" or "YYYYMMDDTHHMMSS". Defaults to the current timestamp (UTC). include_full_snapshots : bool If True, include full snapshot files in the search. include_delta : bool If True, include delta files in the search. include_metadata : bool If True, include metadata files in the search. include_unavailable : bool If True, include files that are not currently available for download. Returns ------- pd.DataFrame A DataFrame of files modified within the specified time window. """ if since_datetime is None: since_datetime = pd.Timestamp.utcnow().strftime("%Y%m%d") if to_datetime is None: to_datetime = pd.Timestamp.utcnow().strftime("%Y%m%dT%H%M%S") validate_dq_timestamp(since_datetime, var_name="since_datetime") validate_dq_timestamp(to_datetime, var_name="to_datetime") since_ts = pd_to_datetime_compat(since_datetime, utc=True) to_ts = pd_to_datetime_compat(to_datetime, utc=True) if "T" not in str(since_datetime): since_ts = since_ts.normalize() if "T" not in str(to_datetime): to_ts = ( to_ts.normalize() + pd.DateOffset(days=1) - pd.Timedelta(nanoseconds=1) ) filter_date = since_ts.normalize() if since_ts > to_ts: logger.warning( f"`since_datetime` ({since_ts}) is after `to_datetime` ({to_ts}). Swapping values." 
) since_ts, to_ts = to_ts, since_ts # Using DQ's internal filtering does not work as expected for JPMaQS end users, # hence filtering is done locally instead of passing API parameters. files_df = self.list_available_files_for_all_file_groups( include_full_snapshots=include_full_snapshots, include_delta=include_delta, include_metadata=include_metadata, include_unavailable=include_unavailable, ) files_df = files_df[files_df["file-datetime"] >= filter_date] files_df = files_df[files_df["last-modified"].between(since_ts, to_ts)] files_df = files_df.sort_values( by=["file-datetime", "last-modified"], ascending=[False, False], ).reset_index(drop=True) return files_df
[docs] def check_file_availability( self, file_group_id: str = None, file_datetime: str = None, filename: Optional[str] = None, ) -> pd.DataFrame: """ Checks if a specific file is available for download. Provide either (`file_group_id` and `file_datetime`) or `filename`. Parameters ---------- file_group_id : str The identifier for the file group. file_datetime : str The file's timestamp identifier. filename : Optional[str] The full name of the file (e.g., "JPMAQS_GENERIC_RETURNS_20250501.parquet"). Returns ------- pd.DataFrame A DataFrame with the file's availability status. """ if not ((bool(file_group_id) and bool(file_datetime)) ^ bool(filename)): raise ValueError( "One of `file_group_id` & `file_datetime`, or `filename` must be provided." ) endpoint = "/group/file/availability" params = {"file-group-id": file_group_id, "file-datetime": file_datetime} payload = self._get(endpoint, params) return pd.json_normalize(payload)
[docs] def download_file( self, file_group_id: str = None, file_datetime: str = None, filename: Optional[str] = None, out_dir: Optional[str] = None, overwrite: bool = False, qdf: bool = False, as_csv: bool = False, keep_raw_data: bool = False, chunk_size: Optional[int] = None, timeout: Optional[float] = DQ_FILE_API_TIMEOUT, max_retries: int = 3, ) -> str: """ Downloads a single Parquet file to a specified directory. This method can be called with either (`file_group_id` and `file_datetime`) or a `filename`. For large files, it automatically uses the `SegmentedFileDownloader` for a robust, multi-part download. Parameters ---------- file_group_id : str The identifier of the file group to download from. file_datetime : str The timestamp of the file to download. filename : Optional[str] The full filename to download. Overrides `file_group_id` and `file_datetime`. out_dir : str The directory where the file will be saved. overwrite : bool If True, overwrites the file if it already exists. Default is False. qdf : bool If True, converts the DataFrame to a QuantamentalDataFrame. If False, files are saved as-is in the ticker-based Parquet format. Default is False. as_csv : bool If True, saves the downloaded datasets as CSV files. Default is False, with Parquet as the default format. keep_raw_data : bool If True, keeps the raw data files after conversion. Default is False. chunk_size : Optional[int] The chunk size for streaming downloads (in bytes). timeout : Optional[float] The timeout for the download request in seconds. max_retries : int The number of retries for the entire file download. Returns ------- str The full path to the downloaded file. """ out_dir = self._get_save_dir(out_dir) if not ((bool(file_group_id) and bool(file_datetime)) ^ bool(filename)): raise ValueError( "One of `file_group_id` & `file_datetime`, or `filename` must be provided." 
) if not file_group_id: try: file_group_id, file_datetime_with_ext = filename.rsplit("_", 1) file_datetime = file_datetime_with_ext.split(".")[0] except ValueError: raise ValueError(f"Invalid filename format: {filename}") endpoint = "/group/file/download" url = f"{self.base_url}{endpoint}" headers = self.oauth.get_headers() params = {"file-group-id": file_group_id, "file-datetime": file_datetime} file_name = filename or f"{file_group_id}_{file_datetime}.parquet" file_date = pd_to_datetime_compat(file_datetime).strftime("%Y-%m-%d") file_path = Path(out_dir) / Path(file_date) / Path(file_name) file_path.parent.mkdir(parents=True, exist_ok=True) if file_path.exists(): if not overwrite: logger.warning(f"File {file_path} already exists. Skipping download.") return str(file_path) logger.warning(f"File {file_path} already exists. It will be overwritten.") file_path.unlink() logger.info(f"Starting download of {file_name}...") start = time.time() download_args = dict( filename=str(file_path), url=url, headers=headers, params=params, proxies=self.proxies, chunk_size=chunk_size, timeout=timeout, api_delay=DQ_FILE_API_DELAY_PARAM, verify_ssl=self.verify_ssl, ) is_small_file = any(x in file_group_id.lower() for x in ["delta", "metadata"]) if "_DELTA" in file_group_id: is_small_file = file_datetime not in large_delta_file_datetimes() is_catalog_file = file_group_id == self.catalog_file_group_id if is_small_file: request_wrapper_stream_bytes_to_disk(**download_args) else: SegmentedFileDownloader( **download_args, max_file_retries=max_retries, start_download=True, ) time_taken = time.time() - start logger.info( f"Downloaded {file_name} in {time_taken:.2f} seconds to {file_path}" ) if not (qdf or as_csv) or is_catalog_file or not file_path.suffix == ".parquet": return str(file_path) convert_args = dict( filename=str(file_path), as_csv=as_csv, qdf=qdf, keep_raw_data=keep_raw_data, ) if PYTHON_3_8_OR_LATER: convert_ticker_based_parquet_file_to_qdf_pl(**convert_args) else: 
convert_ticker_based_parquet_file_to_qdf(**convert_args) if qdf: msg_str = ( f"Successfully converted {filename} to Quantamental Data Format (QDF)" ) if as_csv: msg_str += " and saved as CSV" logger.info(msg_str) return str(file_path)
[docs] def delete_corrupt_files( self, out_dir: Optional[str] = None, files: Optional[List[str]] = None, ) -> List[str]: """ Deletes corrupt files from the provided list based on file integrity checks. Parameters ---------- out_dir : Optional[str] The directory to scan for corrupt files. If None, uses the client's default output directory. files : Optional[List[str]] A list of file paths to check for corruption. If None, scans all downloaded files in the specified output directory. Returns ------- List[str] A list of file paths that were identified as corrupt and deleted. """ out_dir = self._get_save_dir(out_dir) avail_files = self.list_downloaded_files(out_dir=out_dir) if avail_files.empty: return [] if files is not None: if not all(isinstance(f, str) for f in files): raise ValueError( "All items in `files` must be strings representing file paths." ) avail_files = avail_files[avail_files["file-name"].isin(files)] files = sorted(set(map(str, avail_files["path"]))) extensions = sorted(set(Path(f).suffix.rsplit(".", 1)[-1] for f in files)) return _delete_corrupt_files(files=files, extensions=extensions)
[docs] def download_multiple_files( self, filenames: List[str], out_dir: Optional[str] = None, overwrite: bool = False, qdf: bool = False, as_csv: bool = False, keep_raw_data: bool = False, max_retries: int = 3, n_jobs: int = None, chunk_size: Optional[int] = None, timeout: Optional[float] = DQ_FILE_API_TIMEOUT, show_progress: bool = True, ) -> None: """ Downloads a list of files concurrently with progress indication. Parameters ---------- filenames : List[str] A list of full filenames to be downloaded. out_dir : str The directory to save the downloaded files. overwrite : bool If True, overwrites files if they already exist. Default is False. qdf : bool If True, converts the DataFrame to a QuantamentalDataFrame. If False, files are saved as-is in the ticker-based Parquet format. Default is False. as_csv : bool If True, saves the DataFrame as a CSV file. Default is False. keep_raw_data : bool If True, keeps the raw data files after conversion. Default is False. max_retries : int The number of times to retry downloading the entire list of failed files. n_jobs : int The number of concurrent download jobs. If -1, it uses all available cores. chunk_size : Optional[int] The chunk size for streaming downloads (in bytes). timeout : Optional[float] The timeout for each download request in seconds. show_progress : bool If True, displays a progress bar for the downloads. 
""" out_dir = self._get_save_dir(out_dir) Path(out_dir).mkdir(parents=True, exist_ok=True) start_time = time.time() logger.info(f"Starting download of {len(filenames)} files.") failed_files = [] if n_jobs == -1: n_jobs = None with cf.ThreadPoolExecutor(max_workers=n_jobs) as executor: futures = {} for filename in tqdm( filenames, desc="Requesting files", disable=not show_progress, ): futures[ executor.submit( self.download_file, filename=filename, out_dir=out_dir, overwrite=overwrite, qdf=qdf, as_csv=as_csv, keep_raw_data=keep_raw_data, chunk_size=chunk_size, timeout=timeout, ) ] = filename time.sleep(DQ_FILE_API_DELAY_PARAM) for future in tqdm( cf.as_completed(futures), total=len(futures), desc="Downloading files", disable=not show_progress, ): fname = futures[future] try: future.result() except KeyboardInterrupt: executor.shutdown(wait=False, cancel_futures=True) raise except Exception as e: logger.error(f"Failed to download {fname}: {e}") failed_files.append(fname) found_corrupt_files = self.delete_corrupt_files( out_dir=out_dir, files=filenames ) failed_files = sorted(set(failed_files + found_corrupt_files)) if not failed_files: total_time = time.time() - start_time logger.info( f"Successfully downloaded {len(filenames)} files in {total_time:.2f} seconds." ) return # All downloads scuccessful log_msg = f"Failed to download {len(failed_files)} files" if max_retries > 0: log_msg += f"; retrying {max_retries} more times" else: log_msg += "; no retries left" logger.warning(log_msg) if max_retries == 0: logger.error(f"Files failed after retries: {failed_files}") raise DownloadError(f"Files failed after retries: {failed_files}") return self.download_multiple_files( filenames=failed_files, out_dir=out_dir, max_retries=max_retries - 1, n_jobs=n_jobs, chunk_size=chunk_size, timeout=timeout, show_progress=show_progress, )
[docs] def download_catalog_file( self, out_dir: Optional[str] = None, add_dataset_column: bool = False, as_csv: bool = False, overwrite: bool = False, keep_raw_data: bool = False, timeout: Optional[float] = DQ_FILE_API_TIMEOUT, ) -> str: out_dir = self._get_save_dir(out_dir) available_catalogs = self.list_available_files(self.catalog_file_group_id) if available_catalogs.empty: raise DownloadError("No catalog files available for download.") latest_catalog = available_catalogs.sort_values( by=["file-datetime", "last-modified"], ascending=False ).iloc[0] latest_filename = latest_catalog["file-name"] logger.info(f"Latest catalog file identified: {latest_filename}") # check if file already exists file_path = None existing_files = self.list_downloaded_files(out_dir=out_dir) if not overwrite and not existing_files.empty: if latest_filename in sorted(existing_files["file-name"]): file_path = existing_files[ existing_files["file-name"] == latest_filename ]["path"].values[0] if file_path is None: file_path = self.download_file( filename=latest_filename, out_dir=out_dir, overwrite=overwrite, timeout=timeout, ) if not (add_dataset_column or as_csv): return file_path df = pd.read_parquet(file_path) if add_dataset_column: df.loc[:, "Dataset"] = ( df["Theme"].map(JPMAQS_DATASET_THEME_MAPPING).fillna("Unknown") ) if as_csv: csv_file_path = Path(file_path).with_suffix(".csv") df.to_csv(csv_file_path, index=False) if not keep_raw_data: Path(file_path).unlink() file_path = str(csv_file_path) else: df.to_parquet(file_path, index=False) return file_path
[docs] def get_datasets_for_indicators( self, tickers: Optional[List[str]] = None, cids: Optional[List[str]] = None, xcats: Optional[List[str]] = None, case_sensitive: bool = False, out_dir: Optional[str] = None, ) -> List[str]: for param, name in zip( [tickers, cids, xcats], ["tickers", "cids", "xcats"], ): if param is not None: if not isinstance(param, list) or not all( isinstance(x, str) for x in param ): raise ValueError(f"`{name}` must be a list of strings.") if not any(bool(x) for x in [tickers, cids, xcats]): raise ValueError( "At least one of `tickers`, `cids`, or `xcats` must be set." ) if tickers is None: tickers = [] if bool(cids) ^ bool(xcats): raise ValueError("Either both `cids` and `xcats` must be set, or neither.") if cids is None: cids, xcats = [], [] tickers = sorted(set(tickers + [f"{c}_{x}" for c in cids for x in xcats])) if not tickers or not any(t.strip() for t in tickers): raise ValueError("No valid tickers to search for.") catalog_file = self.download_catalog_file( out_dir=out_dir, add_dataset_column=True, as_csv=False, ) catalog_df = pd.read_parquet(catalog_file) if case_sensitive: catalog_df = catalog_df[catalog_df["Ticker"].isin(tickers)] else: catalog_df = catalog_df[ catalog_df["Ticker"].str.lower().isin(t.lower() for t in tickers) ] datasets_to_keep = sorted(set(catalog_df["Dataset"])) return datasets_to_keep
[docs] def list_downloaded_files( self, out_dir: Optional[str] = None, ) -> pd.DataFrame: out_dir = self._get_save_dir() col_order = [ "filename", "file-datetime", "dataset", "filetype", "file-timestamp", "path", ] dfs = [ _downloaded_files_df(out_dir, file_format=fmt, include_metadata_files=True) for fmt in ["parquet", "csv", "json"] ] dfs = [_ for _ in dfs if _ is not _.empty] if not dfs: return pd.DataFrame(columns=col_order) files_df = pd.concat(dfs).reset_index(drop=True) if files_df.empty: return files_df files_df = files_df[col_order].rename(columns={"filename": "file-name"}) return files_df
def _load_metadata_jsons( self, date: Optional[Union[pd.Timestamp, str]] = None, normalize_headers: bool = True, skip_download: bool = False, ) -> Dict[str, pd.DataFrame]: """Load JPMaQS metadata notification JSONs for a date.""" date: pd.Timestamp = ( pd_to_datetime_compat(date) if date is not None else pd.Timestamp.utcnow() ).normalize() if date > pd.Timestamp.utcnow().normalize(): today_utc = pd.Timestamp.utcnow().normalize() raise ValueError( "Provided `date` is in the future (UTC). " f"Requested: {date.date()}, today (UTC): {today_utc.date()}." ) if not skip_download: to_dt = date + pd.offsets.BDay(1) - pd.Timedelta(seconds=1) self.download_full_snapshot( since_datetime=date, to_datetime=to_dt, include_full_snapshots=False, include_delta=False, include_metadata=True, ) df = self.list_downloaded_files() df: pd.DataFrame = df[ (df["dataset"] == "JPMAQS_METADATA_NOTIFICATIONS") & df["file-name"].str.lower().str.endswith(".json") ] date = date.normalize() df = df[df["file-timestamp"].dt.normalize() == date] if df.empty: logger.warning(f"No notification files found for date: {date.date()}") return {} json_contentts: Dict[str, pd.DataFrame] = {} err_str = 'Invalid notification file (missing "sub_title"): ' title_err_str = "Unexpected notification title in file: " expected_titles = [ "Missing Updates", "Changed historical values", "Additional information on missing updates", ] canonical_title_map = {t.upper(): t for t in expected_titles} for jp in df["path"].apply(str).tolist(): _json = {} with open(jp, "r", encoding="utf-8") as f: _json: Dict[str, dict] = json.load(f) if _json.get("metadata", {}).get("sub_title", None) is None: logger.warning(err_str + jp) continue j_title: str = _json["metadata"]["sub_title"] if j_title.upper() not in map(str.upper, expected_titles): logger.warning(title_err_str + jp) continue canonical_title = canonical_title_map[j_title.upper()] json_contentts[canonical_title] = pd.json_normalize( _json, record_path=["data"] ) if 
normalize_headers: for key in json_contentts: new_cols = [ _col.replace(" ", "_") .replace("-", "_") .replace("(%)", "pct") .lower() for _col in json_contentts[key].columns ] json_contentts[key].columns = new_cols return json_contentts
def get_revisions_notifications(
    self,
    date: Optional[Union[pd.Timestamp, str]] = None,
    normalize_headers: bool = True,
) -> pd.DataFrame:
    """
    Return the "Changed historical values" notification table for a date.

    The day's JPMaQS metadata notification JSON(s) are loaded through
    ``_load_metadata_jsons``. The historical-revisions table is returned, or
    an empty DataFrame (with a warning) when that notification is absent.

    Parameters
    ----------
    date : Optional[Union[pd.Timestamp, str]]
        Target date (UTC). Strings can be "YYYY-MM-DD", "YYYYMMDD", or ISO 8601.
        Defaults to today (UTC).
    normalize_headers : bool
        If True, column names are lowercased, spaces and ``-`` become ``_``,
        and "(%)" becomes "pct". Defaults to True.

    Returns
    -------
    pd.DataFrame
        Revision notifications; empty if none are found.
    """
    wanted_title = "Changed historical values"
    tables = self._load_metadata_jsons(
        date=date, normalize_headers=normalize_headers
    )
    if wanted_title in tables:
        return tables[wanted_title]
    logger.warning("No `Changed historical values` notifications found.")
    return pd.DataFrame()
def get_missing_data_notifications(
    self,
    date: Optional[Union[pd.Timestamp, str]] = None,
    normalize_headers: bool = True,
) -> pd.DataFrame:
    """
    Return missing-update notifications, enriched with any additional information.

    The day's JPMaQS metadata notification JSON(s) are loaded, and the
    "Missing Updates" table is left-joined with "Additional information on
    missing updates" on the ticker column, sorted by ticker.

    If only one of the two tables is available, that table is returned as-is.
    If neither is available, an empty DataFrame is returned.

    Parameters
    ----------
    date : Optional[Union[pd.Timestamp, str]]
        Target date (UTC). Strings can be "YYYY-MM-DD", "YYYYMMDD", or ISO 8601.
        Defaults to today (UTC).
    normalize_headers : bool
        If True, column names are lowercased, spaces and ``-`` become ``_``,
        and "(%)" becomes "pct". Defaults to True.

    Returns
    -------
    pd.DataFrame
        Missing-update notifications, optionally enriched.

    Raises
    ------
    KeyError
        If both tables are present but share no "Ticker"/"ticker" column.
    """
    tables = self._load_metadata_jsons(
        date=date, normalize_headers=normalize_headers
    )
    missing = tables.get("Missing Updates", pd.DataFrame())
    extra = tables.get("Additional information on missing updates", pd.DataFrame())

    if missing.empty and extra.empty:
        logger.warning("No `Missing Updates` or related notifications found.")
        return pd.DataFrame()
    if extra.empty:
        logger.warning(
            "No `Additional information on missing updates` notifications found."
        )
        return missing
    if missing.empty:
        logger.warning("No `Missing Updates` notifications found.")
        return extra

    # The two tables may spell the ticker column differently; align on "ticker".
    missing_cols, extra_cols = set(missing.columns), set(extra.columns)
    if "Ticker" in missing_cols and "ticker" in extra_cols:
        missing = missing.rename(columns={"Ticker": "ticker"})
    elif "ticker" in missing_cols and "Ticker" in extra_cols:
        extra = extra.rename(columns={"Ticker": "ticker"})

    join_key = next(
        (
            key
            for key in ("Ticker", "ticker")
            if key in missing.columns and key in extra.columns
        ),
        None,
    )
    if join_key is None:
        raise KeyError(
            'Expected a common join key ("Ticker" or "ticker") in notification data.'
        )

    merged = missing.merge(extra, how="left", on=join_key)
    return merged.sort_values(by=join_key, ascending=True).reset_index(drop=True)
def download_full_snapshot(
    self,
    out_dir: Optional[str] = None,
    since_datetime: Optional[str] = None,
    to_datetime: Optional[str] = None,
    file_datetime: Optional[str] = None,
    overwrite: bool = False,
    qdf: bool = False,
    as_csv: bool = False,
    keep_raw_data: bool = False,
    chunk_size: Optional[int] = None,
    timeout: Optional[float] = DQ_FILE_API_TIMEOUT,
    include_full_snapshots: bool = True,
    include_delta: bool = True,
    include_metadata: bool = True,
    file_group_ids: Optional[List[str]] = None,
    show_progress: bool = True,
) -> None:
    """
    Downloads a complete snapshot of files based on specified criteria.

    This method fetches a list of files modified within a given time window and
    then downloads them. It can be customized to download only specific file
    types or from a specific list of file groups. Unless ``overwrite`` is set,
    files whose names already appear in the local listing are skipped. Files are
    downloaded in this order: full snapshots first, then deltas, then metadata.

    Parameters
    ----------
    out_dir : str
        The directory where files will be saved (created if missing).
    since_datetime : Optional[str]
        Download files modified since this timestamp (inclusive). Defaults to the
        start of the current day (UTC) if `file_datetime` is not set.
    to_datetime : Optional[str]
        Download files modified up to this timestamp (inclusive).
    file_datetime : Optional[str]
        A specific file date to check for. Overrides `since_datetime`.
    overwrite : bool
        If True, overwrites files if they already exist. Default is False.
    qdf : bool
        If True, converts the DataFrame to a QuantamentalDataFrame. If False,
        files are saved as-is in the ticker-based Parquet format. Default is False.
    as_csv : bool
        If True, saves the downloaded datasets as CSV files. Default is False,
        with Parquet as the default format.
    keep_raw_data : bool
        If True, keeps the raw data files after conversion. Default is False.
    chunk_size : Optional[int]
        The chunk size for streaming downloads (in bytes).
    timeout : Optional[float]
        The timeout for each download request in seconds.
    include_full_snapshots : bool
        If True, download full snapshot files.
    include_delta : bool
        If True, download delta files.
    include_metadata : bool
        If True, download metadata files.
    file_group_ids : Optional[List[str]]
        A specific list of file groups to download from. If provided, only files
        from these groups will be downloaded.
    show_progress : bool
        If True, displays a progress bar for downloads.

    Raises
    ------
    ValueError
        If the effective timestamp cannot be parsed, or `file_group_ids` is not a
        list of strings. Both are checked before any directory is created or any
        remote listing is requested.
    """
    # Validate purely local inputs before any filesystem or API side effect.
    if file_group_ids is not None and (
        not isinstance(file_group_ids, list)
        or not all(isinstance(x, str) for x in file_group_ids)
    ):
        raise ValueError("`file_group_ids` must be a list of strings.")

    if file_datetime is None and since_datetime is None:
        since_datetime = pd.Timestamp.utcnow().strftime("%Y%m%d")
    effective_ts = file_datetime or since_datetime
    validate_dq_timestamp(
        effective_ts,
        var_name="file_datetime" if file_datetime else "since_datetime",
    )

    out_dir = self._get_save_dir(out_dir)
    Path(out_dir).mkdir(parents=True, exist_ok=True)
    start_time = time.time()
    logger.info(
        f"Starting snapshot download to '{out_dir}' for files since {effective_ts}."
    )

    # NOTE(review): `file_datetime` is validated and logged but is not forwarded
    # to `filter_available_files_by_datetime`; confirm whether that is intended.
    files_df = self.filter_available_files_by_datetime(
        since_datetime=since_datetime,
        to_datetime=to_datetime,
        include_full_snapshots=include_full_snapshots,
        include_delta=include_delta,
        include_metadata=include_metadata,
    )
    if file_group_ids is not None and not files_df.empty:
        files_df = files_df[files_df["file-group-id"].isin(file_group_ids)].copy()

    if not overwrite and not files_df.empty:
        downloaded_files_df = self.list_downloaded_files(out_dir=out_dir)
        if not downloaded_files_df.empty:
            already_local = files_df["file-name"].isin(downloaded_files_df["file-name"])
            files_df = files_df[~already_local].copy()

    num_files_to_download = len(files_df)
    logger.info(f"Found {num_files_to_download} new files to download.")
    if not num_files_to_download:
        logger.info("No new files to download.")
        return

    # Lower priority value downloads first: full files (1), deltas (2), metadata (3).
    files_df["download-priority"] = (
        files_df["file-name"]
        .str.lower()
        .apply(lambda x: (3 if "_metadata" in x else (2 if "_delta" in x else 1)))
    )
    download_order = files_df.sort_values(
        by=["download-priority", "file-datetime", "file-name"],
    )["file-name"].tolist()

    self.download_multiple_files(
        filenames=download_order,
        out_dir=out_dir,
        overwrite=overwrite,
        qdf=qdf,
        as_csv=as_csv,
        keep_raw_data=keep_raw_data,
        chunk_size=chunk_size,
        timeout=timeout,
        show_progress=show_progress,
    )

    total_time = time.time() - start_time
    logger.info(f"Snapshot download completed in {total_time:.2f} seconds.")
def download(
    self,
    tickers: Optional[List[str]] = None,
    cids: Optional[List[str]] = None,
    xcats: Optional[List[str]] = None,
    metrics: Optional[List[str]] = None,
    start_date: Optional[str] = None,
    end_date: Optional[str] = None,
    dataframe_format: str = "qdf",
    dataframe_type: str = "pandas",
    categorical_dataframe: bool = True,
    include_delta_files: bool = False,
    show_progress: bool = True,
    out_dir: Optional[str] = None,
    overwrite: bool = False,
    qdf: bool = False,
    keep_raw_data: bool = False,
    as_csv: bool = False,
) -> Union[pd.DataFrame, pl.DataFrame, pl.LazyFrame]:
    """
    A method to download data and load it as a DataFrame based on specified
    indicators, and specified date range.

    The catalog is used to find the datasets containing the requested
    indicators. Today's files for those datasets are then downloaded into
    ``out_dir``, and the data is loaded from the Parquet files found there.

    Parameters
    ----------
    tickers : Optional[List[str]]
        A list of tickers to filter datasets. Each ticker must be in the standard
        format "CID_XCAT" used in JPMaQS.
    cids : Optional[List[str]]
        A list of cross-sectional identifiers (CIDs) to filter datasets.
    xcats : Optional[List[str]]
        A list of extended categories (XCATS) to filter datasets.
    metrics : Optional[List[str]]
        A list of JPMaQS metrics to filter the data. Available metrics are "value",
        "grading", "eop_lag", "mop_lag", and "last_updated". The available metrics
        are also defined in `macrosynergy.constants.JPMAQS_METRICS`. The default is
        None, in which case all metrics are returned.
    start_date : Optional[str]
        The start date for the returned data in the ISO format "YYYY-MM-DD". If
        None, data is returned from the earliest available date.
    end_date : Optional[str]
        The end date for the returned data in the ISO format "YYYY-MM-DD". If None,
        data is returned up to the latest available date.
    dataframe_format : str
        The format of the returned DataFrame. Options are "qdf" for
        QuantamentalDataFrame or "tickers" for a standard DataFrame with tickers as
        columns. Default is "qdf".
    dataframe_type : str
        The type of DataFrame to return. Options are "pandas" for a pandas
        DataFrame, "polars" for a polars DataFrame, or "polars-lazy" for a polars
        LazyFrame. Default is "pandas".
    categorical_dataframe : bool
        If True and `dataframe_type` is "pandas", the returned DataFrame will use
        categorical dtypes for object columns. Default is True.
    include_delta_files : bool
        If True, delta files will be included in the download. Default is False.
        Currently not implemented and raises NotImplementedError.
    show_progress : bool
        If True, displays a progress bar during downloads. Default is True.
    out_dir : Optional[str]
        The output directory for downloaded files (data and catalog). The default
        directory being used by the DataQueryFileAPI instance is used if None.
    overwrite : bool
        If True, overwrites files if they already exist. Default is False.
    qdf : bool
        If True, each downloaded dataframe will be saved as a QuantamentalDataFrame,
        otherwise files are saved as-is in the ticker-based Parquet format. Default
        is False.
    keep_raw_data : bool
        If True, keeps the raw data files after conversion. Default is False.
    as_csv : bool
        If True, saves the downloaded datasets as CSV files. Default is False, with
        Parquet as the default format.

    Returns
    -------
    Union[pd.DataFrame, pl.DataFrame, pl.LazyFrame]
        A DataFrame containing the requested data.
    """
    if include_delta_files:
        raise NotImplementedError(
            "Downloading delta files is not implemented in this method."
        )
    out_dir = self._get_save_dir(out_dir)

    # Store or reuse the catalog in the same directory as the data files,
    # instead of silently falling back to the client's default directory.
    datasets_to_download = self.get_datasets_for_indicators(
        tickers=tickers, cids=cids, xcats=xcats, out_dir=out_dir
    )

    self.download_full_snapshot(
        out_dir=out_dir,
        since_datetime=pd.Timestamp.utcnow().strftime("%Y%m%d"),
        file_group_ids=datasets_to_download,
        overwrite=overwrite,
        qdf=qdf,
        as_csv=as_csv,
        keep_raw_data=keep_raw_data,
        show_progress=show_progress,
        include_full_snapshots=True,
        include_delta=include_delta_files,
        include_metadata=False,
    )

    return lazy_load_from_parquets(
        files_dir=out_dir,
        tickers=tickers,
        cids=cids,
        xcats=xcats,
        metrics=metrics,
        start_date=start_date,
        end_date=end_date,
        dataframe_format=dataframe_format,
        dataframe_type=dataframe_type,
        categorical_dataframe=categorical_dataframe,
        datasets=datasets_to_download,
    )
def _pd_to_datetime_compat(ts: str, utc: bool): formats = [ "%Y%m%d", "%Y%m%dT%H%M%S", "%Y-%m-%d", "%Y-%m-%dT%H:%M:%S", # ISO with timezone information "%Y-%m-%dT%H:%M:%SZ", # UTC with Z (e.g. 2025-09-16T12:34:56Z) "%Y-%m-%dT%H:%M:%S%z", # With numeric offset (e.g. 2025-09-16T12:34:56+02:00 or +0200) ] formats_str = f"[{', '.join(formats).replace('%', '').upper()}]" for fmt in formats: try: return pd.to_datetime(ts, format=fmt, utc=utc) except (ValueError, TypeError): continue raise ValueError( f"Timestamp '{ts}' does not match expected formats. Use one of {formats_str}." )
def pd_to_datetime_compat(
    ts: Union[str, pd.Series],
    format: str = "mixed",
    utc: bool = True,
):
    """
    Version-tolerant wrapper around ``pandas.to_datetime``.

    When ``PD_2_0_OR_LATER`` is True, this is exactly
    ``pd.to_datetime(ts, format=format, utc=utc)``. Otherwise each value is
    parsed by ``_pd_to_datetime_compat`` (element-wise for a Series), and
    ``format`` is ignored.

    Parameters
    ----------
    ts : Union[str, pd.Series]
        A timestamp string or a Series of them.
    format : str
        Format passed to ``pd.to_datetime`` on pandas >= 2.0.
    utc : bool
        Whether to return UTC-aware timestamps.

    Returns
    -------
    pd.Timestamp or pd.Series
    """
    if not PD_2_0_OR_LATER:
        if isinstance(ts, pd.Series):
            return ts.apply(lambda value: _pd_to_datetime_compat(value, utc=utc))
        return _pd_to_datetime_compat(ts, utc=utc)
    return pd.to_datetime(ts, format=format, utc=utc)
def validate_dq_timestamp(
    ts: str, var_name: Optional[str] = None, raise_error: bool = True
) -> bool:
    """
    Validate a timestamp string for DataQuery API.

    Parameters
    ----------
    ts : str
        Timestamp to check.
    var_name : Optional[str]
        Argument name used in the error message; "Timestamp" if not given.
    raise_error : bool
        If True, raise on an invalid timestamp; otherwise return False.

    Returns
    -------
    bool
        True if ``ts`` parses, False if it does not and ``raise_error`` is False.

    Raises
    ------
    ValueError
        If ``ts`` cannot be parsed and ``raise_error`` is True.
    """
    try:
        if not PD_2_0_OR_LATER:
            pd_to_datetime_compat(ts, utc=True)
        else:
            pd.to_datetime(ts, format="mixed", utc=True)
    except (ValueError, TypeError):
        if not raise_error:
            return False
        label = "Timestamp" if not var_name else f"`{var_name}`"
        raise ValueError(
            f"Invalid {label} format. Use YYYYMMDD, YYYYMMDDTHHMMSS, or a "
            "recognized timestamp format with timezone."
        )
    return True
def get_client_id_secret() -> Tuple[Optional[str], Optional[str]]:
    """
    Retrieve client ID and secret from environment variables.

    The pairs ``DQ_CLIENT_ID``/``DQ_CLIENT_SECRET`` and then
    ``DATAQUERY_CLIENT_ID``/``DATAQUERY_CLIENT_SECRET`` are checked in that
    order. The first pair where both variables are set and non-empty is used.

    Returns
    -------
    Tuple[Optional[str], Optional[str]]
        ``(client_id, client_secret)``, or ``(None, None)`` if no complete pair
        is set. A tuple is always returned, never a bare ``None``.
    """
    pairs = [
        ("DQ_CLIENT_ID", "DQ_CLIENT_SECRET"),
        ("DATAQUERY_CLIENT_ID", "DATAQUERY_CLIENT_SECRET"),
    ]
    for client_id_env, client_secret_env in pairs:
        client_id = os.getenv(client_id_env)
        client_secret = os.getenv(client_secret_env)
        if client_id and client_secret:
            logger.info(
                f"Using {client_id_env} and {client_secret_env} from environment"
            )
            return client_id, client_secret
    return None, None
def _month_end_dates(start, end, alias: str, legacy_alias: str) -> pd.DatetimeIndex:
    """Date range at a month-end frequency, trying the pandas >= 2.2 alias first."""
    try:
        return pd.date_range(start=start, end=end, freq=alias)
    except ValueError:
        # pandas < 2.2 does not know "ME"/"BME"; fall back to "M"/"BM"
        # (which pandas >= 2.2 deprecates).
        return pd.date_range(start=start, end=end, freq=legacy_alias)


@functools.lru_cache(maxsize=1)
def large_delta_file_datetimes(
    as_str: bool = True,
) -> Union[List[str], List[pd.Timestamp]]:
    """
    Plausible file datetimes for large delta files, which are typically generated
    at the end of each month and on business month ends, with timestamps of
    end-of-day (23:59:59).

    The result is the sorted union of calendar month-ends and business
    month-ends from ``JPMAQS_EARLIEST_FILE_DATE`` up to today, each set to
    23:59:59.

    Parameters
    ----------
    as_str : bool
        If True, return ``"%Y%m%dT%H%M%S"`` strings; otherwise ``pd.Timestamp``s.

    Returns
    -------
    Union[List[str], List[pd.Timestamp]]
        The cached list object itself (``lru_cache``, ``maxsize=1``). Callers
        should not mutate it. "Today" is evaluated only when the cache is filled.
    """
    start, end = JPMAQS_EARLIEST_FILE_DATE, pd.Timestamp.today()
    month_ends = list(_month_end_dates(start, end, "ME", "M"))
    business_month_ends = list(_month_end_dates(start, end, "BME", "BM"))

    all_dates = [
        d.normalize() + pd.Timedelta(hours=23, minutes=59, seconds=59)
        for d in sorted(set(month_ends + business_month_ends))
    ]
    if not as_str:
        return all_dates
    return [d.strftime("%Y%m%dT%H%M%S") for d in all_dates]
def _delete_corrupt_files( files: List[Path], extensions: List[str] = ["parquet", "json"], allow_empty: bool = False, ) -> List[Path]: """Deletes corrupt files based on their extensions.""" removed_files = [] for file_path in map(Path, files): if not file_path.exists(): continue if file_path.suffix.lower() not in [ f".{ext.strip('.').lower()}" for ext in extensions ]: continue try: if file_path.suffix.lower() == ".parquet": head = pl.scan_parquet(file_path).head().collect() if not allow_empty and head.is_empty(): raise ValueError("File is empty") elif file_path.suffix.lower() == ".json": with open(file_path, "r", encoding="utf-8") as f: js = json.load(f) if not allow_empty and not js: raise ValueError("File is empty") else: continue except KeyboardInterrupt: raise except Exception: logger.warning(f"Deleting corrupt file: {file_path}") file_path.unlink() removed_files.append(file_path) return sorted(map(str, removed_files))
class SegmentedFileDownloader:
    """
    A utility class to manage the multi-part, concurrent download of a single
    large file.

    The file size comes from a HEAD request. The byte range is split into
    ``segment_size_mb`` segments, and each segment is fetched with an HTTP
    ``Range`` request on a thread pool and written to a part file in a private
    temporary directory next to ``filename``. The parts are then concatenated
    into ``filename``. When used as a context manager, the temporary directory
    is removed on exit.
    """

    def __init__(
        self,
        filename: str,
        url: str,
        headers: Dict[str, str],
        params: Dict[str, str],
        proxies: Optional[Dict[str, str]] = None,
        chunk_size: int = DQ_FILE_API_STREAM_CHUNK_SIZE,
        segment_size_mb: int = DQ_FILE_API_SEGMENT_SIZE_MB,
        timeout: int = DQ_FILE_API_TIMEOUT,
        api_delay: float = DQ_FILE_API_DELAY_PARAM,
        api_delay_margin: float = DQ_FILE_API_DELAY_MARGIN,
        headers_timeout: int = DQ_FILE_API_HEADERS_TIMEOUT,
        max_concurrent_downloads: Optional[int] = None,
        max_file_retries: int = 3,
        verify_ssl: bool = True,
        start_download: bool = False,
        debug: bool = False,
    ):
        """
        Initializes the downloader with URL, headers, and download parameters.

        Parameters
        ----------
        filename : str
            Destination path of the assembled file. Its parent directory is
            created if needed.
        url : str
            Endpoint to download from.
        headers : Dict[str, str]
            Request headers. Each segment request uses a copy extended with ``Range``.
        params : Dict[str, str]
            Query parameters; must contain ``"file-group-id"`` and
            ``"file-datetime"``.
        proxies : Optional[Dict[str, str]]
            Passed to ``requests``.
        chunk_size : int
            Streaming read size (bytes) used when writing each segment.
        segment_size_mb : int
            Size of each byte-range segment in MiB.
        timeout : int
            Timeout (seconds) for each segment GET request.
        api_delay, api_delay_margin : float
            Their product is passed to ``_wait_for_api_call`` before each
            request and slept between whole-file retries.
        headers_timeout : int
            Timeout (seconds) for the HEAD request that fetches the file size.
        max_concurrent_downloads : Optional[int]
            ``max_workers`` for the thread pool (``None`` uses the executor default).
        max_file_retries : int
            Number of whole-file retries after a failed attempt.
        verify_ssl : bool
            Passed as ``verify`` to ``requests``.
        start_download : bool
            If True, download immediately. On failure the temporary files are
            removed and the error is re-raised.
        debug : bool
            If True, the first failure is re-raised without retrying.

        Raises
        ------
        ValueError
            If ``params`` lacks ``"file-group-id"`` or ``"file-datetime"``.
        """
        self.filename = Path(filename)
        self.url = url
        self.headers = headers
        self.params = params
        if not set(["file-group-id", "file-datetime"]).issubset(params):
            raise ValueError(
                "Missing required parameters: 'file-group-id' and 'file-datetime'"
            )
        self.file_id = params["file-group-id"] + "_" + params["file-datetime"]
        self.proxies = proxies
        self.out_dir = Path(self.filename.parent)
        self.out_dir.mkdir(parents=True, exist_ok=True)
        self.chunk_size = chunk_size
        self.segment_size_mb = segment_size_mb
        self.timeout = timeout
        self.api_delay = api_delay * api_delay_margin
        self.headers_timeout = headers_timeout
        self.max_concurrent_downloads = max_concurrent_downloads
        self.max_file_retries = max_file_retries
        self.verify_ssl = verify_ssl
        self.debug = debug
        # Unique per instance so concurrent downloads of the same file never clash.
        self.temp_dir = self.out_dir / f"_tmp_{self.filename.name}_{uuid.uuid4().hex}"

        if start_download:
            try:
                self.download()
            except Exception:
                self.cleanup()
                raise

    def __enter__(self):
        """Allows the downloader to be used as a context manager."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Ensures cleanup of temporary files upon exiting the context."""
        if exc_type is not None:
            # Format the exception handed to __exit__ explicitly instead of
            # relying on sys.exc_info(), so the logged traceback is the one
            # that ended the `with` block.
            logger.error(
                "".join(tb.format_exception(exc_type, exc_value, traceback))
            )
        self.cleanup()
        return False

    def log(self, msg: str, part_num: Optional[int] = None, level: int = logging.INFO):
        """Logs a message with downloader-specific context."""
        part_info = f"[part={part_num}]" if part_num is not None else ""
        logger.log(
            level, f"[SegmentedFileDownloader][file={self.file_id}]{part_info} {msg}"
        )

    def download(self, retries: Optional[int] = None) -> Path:
        """
        Orchestrates the entire file download process, including retries.

        Parameters
        ----------
        retries : Optional[int]
            Remaining whole-file retries; defaults to ``max_file_retries``.

        Returns
        -------
        Path
            Resolved path of the assembled file.

        Raises
        ------
        Exception
            The last error once retries are exhausted, or the first error when
            ``debug`` is True. ``KeyboardInterrupt`` is never retried.
        """
        if retries is None:
            retries = self.max_file_retries

        # Iterative retries avoid the nested "during handling of..." traceback
        # chains that a recursive retry inside an except block produces.
        while True:
            try:
                return self._download_once()
            except KeyboardInterrupt:
                raise
            except Exception as e:
                self.log(f"Download failed. Error: {e}", level=logging.ERROR)
                if self.debug:
                    raise
                if retries <= 0:
                    self.cleanup()
                    raise
                self.log(
                    f"Retrying download (attempt {self.max_file_retries - retries + 1}/{self.max_file_retries})..."
                )
                time.sleep(self.api_delay)
                self.cleanup()
                retries -= 1

    def _download_once(self) -> Path:
        """Run one full attempt: size lookup, concurrent segment downloads, assembly."""
        self.log("Starting segmented file download")
        start_time = time.time()
        if self.temp_dir.exists():
            shutil.rmtree(self.temp_dir)
        self.temp_dir.mkdir(exist_ok=True, parents=True)

        total_size = self._get_file_size()
        self.log(f"File size: {total_size / (1024*1024):.2f} MB")
        chunk_size = int(self.segment_size_mb * 1024 * 1024)
        chunks = range(0, total_size, chunk_size)
        self.log(f"Creating {len(chunks)} download tasks")

        self._download_chunks_concurrently(chunks, total_size)

        final_path = Path(self.filename).resolve()
        self._assemble_parts(final_path, len(chunks))

        duration = time.time() - start_time
        self.log(f"Download complete in {duration:.2f} seconds.")
        self.log(f"Saved to: {final_path}")
        return final_path

    def _get_file_size(self) -> int:
        """Fetches the total size of the file using a HEAD request."""
        self.log("Fetching file size...")
        _wait_for_api_call(self.api_delay)
        start_time = time.time()
        response = requests.head(
            self.url,
            params=self.params,
            headers=self.headers,
            proxies=self.proxies,
            verify=self.verify_ssl,
            # Without a timeout, requests can wait indefinitely on a stalled server.
            timeout=self.headers_timeout,
        )
        response.raise_for_status()
        duration = time.time() - start_time
        self.log(f"Received headers in {duration:.2f} seconds.")

        cl_header = response.headers.get("Content-Length")
        try:
            content_length = int(cl_header)
        except (ValueError, TypeError):
            raise ValueError(
                f"[SegmentedFileDownloader][file={self.file_id}] Invalid or missing Content-Length header: {cl_header}."
            )
        self.log(f"Content-Length: {content_length}")
        return content_length

    @staticmethod
    def _cancel_pending(executor: cf.ThreadPoolExecutor, futures: list) -> None:
        """
        Cancel not-yet-started segment downloads and stop the pool without waiting.

        Cancelling each future is equivalent to
        ``shutdown(cancel_futures=True)`` but also works on Python 3.8, where
        that keyword does not exist.
        """
        for future in futures:
            future.cancel()
        executor.shutdown(wait=False)

    def _download_chunks_concurrently(self, chunks: range, total_size: int):
        """Manages the parallel download of all file chunks; re-raises the first failure."""
        with cf.ThreadPoolExecutor(
            max_workers=self.max_concurrent_downloads
        ) as executor:
            futures = [
                executor.submit(
                    self._download_chunk,
                    i,
                    start,
                    # Range end is inclusive; the last segment stops at the final byte.
                    min(start + chunks.step - 1, total_size - 1),
                )
                for i, start in enumerate(chunks)
            ]
            try:
                for future in cf.as_completed(futures):
                    exc = future.exception()
                    if exc is not None:
                        self._cancel_pending(executor, futures)
                        raise exc
            except KeyboardInterrupt:
                self._cancel_pending(executor, futures)
                raise

    def _download_chunk(self, part_num: int, start_byte: int, end_byte: int) -> None:
        """Starts the download process for a single file chunk (one retry allowed)."""
        self._download_chunk_retry(part_num, start_byte, end_byte, retries=1)

    def _download_chunk_retry(
        self, part_num: int, start_byte: int, end_byte: int, retries: int
    ) -> None:
        """Downloads a specific byte range of the file with a retry mechanism."""
        self.log(f"Downloading bytes [{start_byte}-{end_byte}]", part_num=part_num)
        segment_headers = self.headers.copy()
        segment_headers["Range"] = f"bytes={start_byte}-{end_byte}"
        part_path = self.temp_dir / f"part_{part_num}"

        try:
            _wait_for_api_call(self.api_delay)
            with requests.get(
                headers=segment_headers,
                url=self.url,
                params=self.params,
                proxies=self.proxies,
                stream=True,
                timeout=self.timeout,
                verify=self.verify_ssl,
            ) as response:
                response.raise_for_status()
                with open(part_path, "wb") as f:
                    for chunk in response.iter_content(chunk_size=self.chunk_size):
                        f.write(chunk)
            self.log("Finished download.", part_num=part_num)
        except KeyboardInterrupt:
            raise
        except Exception as e:
            status = getattr(getattr(e, "response", None), "status_code", None)
            if (
                isinstance(e, requests.exceptions.HTTPError)
                and status is not None
                and 400 <= status < 500
            ):
                # Client errors (auth, missing file, bad range) will not succeed on retry.
                raise
            self.log(
                f"FAILED download. Error: {e}", part_num=part_num, level=logging.ERROR
            )
            if retries > 0:
                self.log("Retrying download...", part_num=part_num)
                self._download_chunk_retry(part_num, start_byte, end_byte, retries - 1)
            else:
                raise

    def _assemble_parts(self, final_path: Path, num_parts: int):
        """
        Combines the downloaded chunks, in order, into ``final_path``.

        The parts are first joined into a staging file inside the temporary
        directory, which is then moved onto ``final_path`` with ``os.replace``.
        A failure midway therefore never leaves a truncated file under the
        final name, where it could be mistaken for a complete download.
        """
        self.log(f"Assembling {num_parts} parts")
        staging_path = self.temp_dir / "assembled"
        with open(staging_path, "wb") as final_file:
            for i in range(num_parts):
                with open(self.temp_dir / f"part_{i}", "rb") as part_file:
                    shutil.copyfileobj(part_file, final_file)
        os.replace(staging_path, final_path)

        final_size = final_path.stat().st_size
        self.log(f"Assembled file size: {final_size / (1024*1024):.2f} MB")
        self.cleanup()

    def cleanup(self):
        """Removes the temporary directory and all downloaded parts."""
        if self.temp_dir.exists():
            shutil.rmtree(self.temp_dir)
            self.log("Cleaned up temporary files.")
def _atomic_sink(write, final_out: Path, sidecar: Path) -> None:
    """
    Run ``write(str(sidecar))``, then atomically move the sidecar onto ``final_out``.

    A stale sidecar from an earlier run is removed first. If writing or the
    move fails for any reason (``BaseException``, so ``KeyboardInterrupt`` is
    included), the sidecar is removed and the error is re-raised. In that case
    ``final_out`` is left untouched.
    """
    sidecar.unlink(missing_ok=True)
    try:
        write(str(sidecar))
        os.replace(sidecar, final_out)
    except BaseException:
        sidecar.unlink(missing_ok=True)
        raise


def _atomic_sink_csv(lf: pl.LazyFrame, final_out: Path, sidecar: Path) -> None:
    """Atomic sink for CSV files - ensures complete writes/cleans up on failure."""
    _atomic_sink(lambda path: lf.sink_csv(path), final_out, sidecar)


def _atomic_sink_parquet(
    lf: pl.LazyFrame, final_out: Path, sidecar: Path, *, compression: str
) -> None:
    """Atomic sink for Parquet files - ensures complete writes/cleans up on failure."""
    _atomic_sink(
        lambda path: lf.sink_parquet(path, compression=compression),
        final_out,
        sidecar,
    )


def _convert_ticker_based_parquet_file_to_qdf_pl(
    filename: str,
    compression: str = "zstd",
    as_csv: bool = False,
    qdf: bool = False,
    keep_raw_data: bool = False,
) -> None:
    """
    Convert a ticker-based JPMaQS Parquet file in place.

    Behaviour:

    - ``qdf=False, as_csv=False``: nothing is done.
    - ``qdf=False, as_csv=True``: the file is written as ``<stem>.csv``.
    - ``qdf=True``: ``ticker`` is split on its first ``_`` into ``cid`` and
      ``xcat``, and the available metric columns are kept. The result is
      written as Parquet (``compression``) or CSV.

    With ``keep_raw_data`` the QDF output is written as ``<stem>_qdf.<ext>``
    next to the original. Without it, the output replaces the original or sits
    beside it, and the source file is removed. All writes go through a
    ``.<name>.inprogress`` sidecar that is atomically renamed into place.

    Raises
    ------
    FileNotFoundError
        If ``filename`` is not an existing file.
    """
    src = Path(filename)
    if not src.is_file():
        raise FileNotFoundError(f"No such file: {filename}")
    base = src.with_suffix("")
    dirpath = src.parent

    if not qdf:
        if as_csv:
            # Passthrough: same rows/columns, CSV container.
            final_out = base.with_suffix(".csv")
            sidecar = dirpath / f".{final_out.name}.inprogress"
            _atomic_sink_csv(pl.scan_parquet(str(src)), final_out, sidecar)
            if not keep_raw_data:
                src.unlink(missing_ok=True)
        return

    lf = pl.scan_parquet(str(src))
    cid_expr, xcat_expr = _expr_split_ticker(pl.col("ticker"))
    lf = lf.with_columns(cid=cid_expr, xcat=xcat_expr)

    # Same schema-access guard as the rest of this module.
    if PYTHON_3_8_OR_LATER:
        available = set(lf.collect_schema().names())
    else:
        available = set(lf.schema.keys())
    wanted = ["real_date", "value", "grading", "eop_lag", "mop_lag", "last_updated"]
    lf = lf.select([c for c in wanted if c in available] + ["cid", "xcat"])

    if as_csv:
        final_out = (
            dirpath / f"{base.name}_qdf.csv"
            if keep_raw_data
            else base.with_suffix(".csv")
        )
        sidecar = dirpath / f".{final_out.name}.inprogress"
        _atomic_sink_csv(lf, final_out, sidecar)
        if not keep_raw_data:
            src.unlink(missing_ok=True)
        return

    # Parquet output: overwrite the source unless the raw file must be kept.
    final_out = dirpath / f"{base.name}_qdf.parquet" if keep_raw_data else src
    sidecar = dirpath / f".{final_out.name}.inprogress"
    _atomic_sink_parquet(lf, final_out, sidecar, compression=compression)
    if not keep_raw_data and final_out is not src:
        src.unlink(missing_ok=True)
def _remove_conversion_sidecars(source: Path) -> None:
    """Best-effort removal of every ``.inprogress`` sidecar a conversion of ``source`` may create."""
    try:
        stem = source.with_suffix("").name
        leftovers = (
            f".{source.with_suffix('.csv').name}.inprogress",
            f".{source.name}.inprogress",
            f".{stem}_qdf.csv.inprogress",
            f".{stem}_qdf.parquet.inprogress",
        )
        for leftover in leftovers:
            try:
                (source.parent / leftover).unlink()
            except FileNotFoundError:
                pass
    except Exception:
        # Cleanup must never mask the original conversion error.
        pass


def convert_ticker_based_parquet_file_to_qdf_pl(
    filename: str,
    compression: str = "zstd",
    as_csv: bool = False,
    qdf: bool = True,
    keep_raw_data: bool = False,
) -> None:
    """
    Public, error-logging wrapper around the ticker-to-QDF file conversion.

    On failure the error is logged and leftover ``.inprogress`` sidecars are
    removed. The original exception is then re-raised, unless the source file
    no longer exists, in which case a ``FileNotFoundError`` chained to it is
    raised instead.

    Parameters
    ----------
    filename : str
        Ticker-based Parquet file to convert.
    compression : str
        Parquet compression codec for QDF Parquet output.
    as_csv : bool
        Write CSV instead of Parquet.
    qdf : bool
        Convert to the cid/xcat (QDF) layout. Defaults to True here.
    keep_raw_data : bool
        Keep the original file next to the converted output.
    """
    try:
        _convert_ticker_based_parquet_file_to_qdf_pl(
            filename=filename,
            compression=compression,
            as_csv=as_csv,
            qdf=qdf,
            keep_raw_data=keep_raw_data,
        )
        return
    except Exception as err:
        logger.error(f"Error converting file {filename}: {err}")
        _remove_conversion_sidecars(Path(filename))
        if not Path(filename).is_file():
            raise FileNotFoundError(
                f"Conversion failed and file not found: {filename}"
            ) from err
        raise
def _check_lazy_load_inputs( files_dir: Union[str, Path], file_format: str, tickers: Optional[List[str]], cids: Optional[List[str]], xcats: Optional[List[str]], metrics: Optional[List[str]], start_date: Optional[Union[str, pd.Timestamp]], end_date: Optional[Union[str, pd.Timestamp]], dataframe_format: str, dataframe_type: str, categorical_dataframe: bool, datasets: Optional[List[str]] = None, ): files_dir = Path(files_dir) if not files_dir.is_dir(): raise FileNotFoundError(f"No such directory: {files_dir}") if file_format not in ["parquet", "csv"]: raise ValueError("`file_format` must be one of 'parquet' or 'csv'.") if file_format == "csv": raise NotImplementedError("CSV file format is not yet supported.") # check whether or not there are any parquet files in the glob directory -recursive if not _list_downloaded_files(files_dir, file_format): raise FileNotFoundError( f"No {file_format} files found in directory: {files_dir}" ) for param, name in [ (tickers, "tickers"), (cids, "cids"), (xcats, "xcats"), (metrics, "metrics"), (datasets, "datasets"), ]: if param is not None and ( not isinstance(param, list) or not all(isinstance(x, str) for x in param) ): raise ValueError(f"If provided, `{name}` must be a list of strings.") if bool(cids) ^ bool(xcats): raise ValueError( "Both `cids` and `xcats` must be provided together, or neither." ) for param, name in [ (start_date, "start_date"), (end_date, "end_date"), ]: if param is not None and not isinstance(param, (str, pd.Timestamp)): raise ValueError(f"`{name}` must be a string or pandas Timestamp.") if dataframe_format not in ["qdf", "wide", "tickers"]: raise ValueError("`dataframe_format` must be one of 'qdf', 'wide', 'tickers'.") if dataframe_type not in ["pandas", "polars", "polars-lazy"]: raise ValueError( "`dataframe_type` must be one of 'pandas', 'polars', 'polars-lazy'." 
) if not isinstance(categorical_dataframe, bool): raise ValueError("`categorical_dataframe` must be a boolean.") def _list_downloaded_files(files_dir: Path, file_format: str = "parquet") -> List[Path]: files_dir = Path(files_dir) assert files_dir.is_dir(), f"No such directory: {files_dir}" if file_format not in ["parquet", "csv", "json"]: raise ValueError("`file_format` must be one of 'parquet', 'csv', or 'json'.") files = sorted(files_dir.glob(f"**/*.{file_format}")) return files def _downloaded_files_df( files_dir: Path, file_format: str = "parquet", include_metadata_files: bool = False, ) -> pd.DataFrame: if not Path(files_dir).is_dir(): return pd.DataFrame(columns=["path", "filename", "filetype", "dataset"]) files_list = _list_downloaded_files(files_dir, file_format) df = pd.DataFrame({"path": files_list}) if df.empty: return df df["path"] = df["path"].apply(lambda x: Path(x).resolve()) df["filename"] = df["path"].apply(lambda x: Path(x).name) if not include_metadata_files: df = df[~df["filename"].str.contains("_METADATA")].copy() df["filetype"] = df["path"].apply(lambda x: Path(x).suffix.split(".")[-1]) df["dataset"] = df["filename"].apply( lambda x: str(x).split(".")[0].rsplit("_", 1)[0] ) df["file-datetime"] = df["filename"].apply( lambda x: str(x).split(".")[0].rsplit("_", 1)[-1] ) df["file-timestamp"] = df["file-datetime"].apply(lambda x: pd_to_datetime_compat(x)) df = df.reset_index(drop=True) return df def _filter_to_latest_files( files_df: pd.DataFrame, include_delta_files: bool = False, ) -> pd.DataFrame: if include_delta_files: raise NotImplementedError( "Filtering to latest files including delta files is not implemented." 
) if files_df.empty: return files_df if not include_delta_files: files_df = files_df[~files_df["filename"].str.contains("_DELTA")].copy() # Filter to rows where file-timestamp == per-dataset max latest_mask = files_df["file-timestamp"].eq( files_df.groupby("dataset")["file-timestamp"].transform("max") ) latest_files = ( files_df.loc[latest_mask] .sort_values(["dataset", "file-timestamp", "filename"]) .reset_index(drop=True) ) return latest_files
def lazy_load_from_parquets(
    files_dir: Union[str, Path],
    file_format: str = "parquet",
    tickers: Optional[List[str]] = None,
    cids: Optional[List[str]] = None,
    xcats: Optional[List[str]] = None,
    metrics: Optional[List[str]] = None,
    start_date: Optional[Union[str, pd.Timestamp]] = None,
    end_date: Optional[Union[str, pd.Timestamp]] = None,
    dataframe_format: str = "qdf",
    dataframe_type: str = "pandas",
    categorical_dataframe: bool = True,
    datasets: Optional[List[str]] = None,
    include_delta_files: bool = False,
    include_metadata_files: bool = False,
) -> Union[pd.DataFrame, pl.DataFrame, pl.LazyFrame]:
    """
    Load JPMaQS data from Parquet files already downloaded to ``files_dir``.

    Only the latest non-delta file per dataset is read (optionally restricted
    to ``datasets``). Rows are limited to ``tickers`` plus every
    ``f"{cid}_{xcat}"`` combination, and to ``[start_date, end_date]``.

    Parameters
    ----------
    files_dir : Union[str, Path]
        Directory containing the downloaded files.
    file_format : str
        Only "parquet" is supported.
    tickers, cids, xcats : Optional[List[str]]
        Indicators to load. ``cids`` and ``xcats`` must be given together.
        The caller's lists are not modified.
    metrics : Optional[List[str]]
        Metric columns to keep; ``None``, ``"all"`` or a list containing
        ``"all"`` keeps all ``JPMAQS_METRICS``.
    start_date, end_date : Optional[Union[str, pd.Timestamp]]
        Inclusive bounds on ``real_date``.
    dataframe_format : str
        "qdf" returns cid/xcat columns; other values return a ``ticker`` column.
    dataframe_type : str
        "pandas", "polars" or "polars-lazy".
    categorical_dataframe : bool
        Cast cid/xcat/ticker to categorical (pandas and polars only).
    datasets : Optional[List[str]]
        If given, only files of these datasets are read.
    include_delta_files : bool
        Not implemented; True raises ``NotImplementedError``.
    include_metadata_files : bool
        Whether ``_METADATA`` files are considered.

    Returns
    -------
    Union[pd.DataFrame, pl.DataFrame, pl.LazyFrame]
        According to ``dataframe_type``.
    """
    files_dir = Path(files_dir)
    if (not metrics) or (metrics == "all") or ("all" in metrics):
        metrics = JPMAQS_METRICS

    _check_lazy_load_inputs(
        files_dir,
        file_format,
        tickers,
        cids,
        xcats,
        metrics,
        start_date,
        end_date,
        dataframe_format,
        dataframe_type,
        categorical_dataframe,
    )

    available_files_df: pd.DataFrame = _downloaded_files_df(
        files_dir=files_dir,
        file_format=file_format,
        include_metadata_files=include_metadata_files,
    )
    available_files_df = _filter_to_latest_files(
        files_df=available_files_df,
        include_delta_files=include_delta_files,
    )
    if datasets:
        available_files_df = available_files_df.loc[
            available_files_df["dataset"].isin(datasets)
        ]

    # Copy first: `+=` on the caller's list would extend it in place.
    search_tickers = list(tickers or [])
    if cids:
        search_tickers += [f"{c}_{x}" for c in cids for x in xcats]

    lf: pl.LazyFrame = _lazy_load_filtered_parquets(
        paths=sorted(available_files_df["path"]),
        tickers=search_tickers,
        start_date=start_date,
        end_date=end_date,
        return_qdf=(dataframe_format == "qdf"),
    )

    # `_filter_lazy_frame_by_tickers` returns early for ticker-based files
    # without applying the date bounds, so enforce them on the combined
    # frame. For rows that were already date-filtered this is a no-op.
    if start_date:
        start_str = pd_to_datetime_compat(start_date).strftime("%Y-%m-%d")
        lf = lf.filter(pl.col("real_date") >= pl.lit(start_str).str.to_date())
    if end_date:
        end_str = pd_to_datetime_compat(end_date).strftime("%Y-%m-%d")
        lf = lf.filter(pl.col("real_date") <= pl.lit(end_str).str.to_date())

    def _column_names(frame: pl.LazyFrame) -> List[str]:
        # Same schema-access guard as the rest of this module.
        if PYTHON_3_8_OR_LATER:
            return list(frame.collect_schema().names())
        return list(frame.schema.keys())

    if metrics and set(metrics) != set(JPMAQS_METRICS):
        cols_to_keep = ["real_date", "cid", "xcat", "ticker"] + list(metrics)
        present = set(_column_names(lf))
        lf = lf.select([pl.col(c) for c in cols_to_keep if c in present])

    if dataframe_type == "polars-lazy":
        return lf

    cat_cols = ["cid", "xcat", "ticker"]
    if dataframe_type == "polars":
        if categorical_dataframe:
            present = set(_column_names(lf))
            cols = [c for c in cat_cols if c in present]
            if cols:
                lf = lf.with_columns([pl.col(c).cast(pl.Categorical) for c in cols])
        return lf.collect()

    if dataframe_type == "pandas":
        df = lf.collect().to_pandas()
        if categorical_dataframe:
            cols = [c for c in cat_cols if c in df.columns]
            if cols:
                df[cols] = df[cols].astype("category")
        return df

    raise ValueError("Unknown dataframe type")
class JPMaQSParquetSchemaKind(Enum):
    """
    Column layout of a JPMaQS Parquet file.

    Members
    -------
    TICKER
        Series are identified by a single ``ticker`` column.
    QDF
        Series are identified by separate ``cid`` and ``xcat`` columns.
    """

    TICKER = "ticker"  # file has a "ticker" column
    QDF = "qdf"  # file has "cid" and "xcat" columns
def _identify_schema_type(lf: pl.LazyFrame) -> JPMaQSParquetSchemaKind:
    """
    Detect whether a scanned parquet file uses the ticker or the cid/xcat layout.

    :param <pl.LazyFrame> lf: lazy frame, e.g. from ``pl.scan_parquet``. Only
        its schema is inspected.
    :return <JPMaQSParquetSchemaKind>: ``TICKER`` if a ``ticker`` column exists,
        otherwise ``QDF`` if both ``cid`` and ``xcat`` exist.
    :raises <ValueError>: if neither layout can be identified.
    """
    if PYTHON_3_8_OR_LATER:
        cols = set(lf.collect_schema().keys())
    else:
        cols = set(lf.schema.keys())
    if "ticker" in cols:
        return JPMaQSParquetSchemaKind.TICKER
    if {"cid", "xcat"}.issubset(cols):
        return JPMaQSParquetSchemaKind.QDF
    raise ValueError(
        "Unknown schema: need either 'ticker' or both 'cid' and 'xcat'. "
        f"Found columns: {sorted(cols)}"
    )


def _expr_split_ticker(ticker_expr: pl.Expr) -> Tuple[pl.Expr, pl.Expr]:
    """
    Split a ``"CID_XCAT"`` ticker expression at its first underscore.

    Uses ``str.splitn("_", 2)``, which yields a struct. ``field_0`` is the part
    before the first ``"_"`` (the cid). ``field_1`` is everything after it (the
    xcat, which may itself contain underscores).

    :param <pl.Expr> ticker_expr: string expression holding tickers.
    :return <Tuple[pl.Expr, pl.Expr]>: ``(cid_expr, xcat_expr)``.
    """
    parts = ticker_expr.str.splitn("_", 2)
    cid = parts.struct.field("field_0")
    xcat = parts.struct.field("field_1")
    return cid, xcat


def _ensure_columns(lf: pl.LazyFrame, cols: Sequence[str]) -> pl.LazyFrame:
    """
    Add any of ``cols`` missing from ``lf`` as all-null columns.

    This guarantees that a subsequent ``.select(cols)`` cannot fail on a
    missing column. Only the schema is resolved; this is not a materialization.
    """
    if PYTHON_3_8_OR_LATER:
        have = set(lf.collect_schema().keys())
    else:
        have = set(lf.schema.keys())
    missing = [c for c in cols if c not in have]
    # NOTE(review): ``pl.lit(None)`` has the Null dtype. If another file supplies
    # the same column with a concrete dtype, the ``how="vertical"`` concat in
    # ``_lazy_load_filtered_parquets`` may reject the mismatch. Confirm.
    return lf.with_columns(**{c: pl.lit(None) for c in missing}) if missing else lf


def _filter_lazy_frame_by_tickers(
    lf: pl.LazyFrame,
    kind: JPMaQSParquetSchemaKind,
    tickers: Sequence[str],
    start_date: Optional[Union[str, pd.Timestamp]],
    end_date: Optional[Union[str, pd.Timestamp]],
) -> pl.LazyFrame:
    """
    Keep only rows for ``tickers`` whose ``real_date`` lies within the given bounds.

    The date filters are applied for BOTH schema kinds. Previously, ticker-based
    files returned early and silently ignored ``start_date``/``end_date``.

    :param <pl.LazyFrame> lf: lazy frame of a single file.
    :param <JPMaQSParquetSchemaKind> kind: layout of ``lf``.
    :param <Sequence[str]> tickers: tickers to keep; empty/falsy entries are ignored.
    :param start_date: inclusive lower bound on ``real_date`` (optional).
    :param end_date: inclusive upper bound on ``real_date`` (optional).
    :return <pl.LazyFrame>: the filtered lazy frame.
    """
    tickers_list = [t for t in tickers if t]

    if kind is JPMaQSParquetSchemaKind.TICKER:
        lf = lf.filter(pl.col("ticker").is_in(tickers_list))
    else:
        # QDF files carry no ticker column; build a temporary one to match on.
        lf = (
            lf.with_columns(
                _ticker=pl.concat_str([pl.col("cid"), pl.lit("_"), pl.col("xcat")])
            )
            .filter(pl.col("_ticker").is_in(tickers_list))
            .drop("_ticker")
        )

    if start_date:
        start_str = pd_to_datetime_compat(start_date).strftime("%Y-%m-%d")
        lf = lf.filter(pl.col("real_date") >= pl.lit(start_str).str.to_date())
    if end_date:
        end_str = pd_to_datetime_compat(end_date).strftime("%Y-%m-%d")
        lf = lf.filter(pl.col("real_date") <= pl.lit(end_str).str.to_date())
    return lf


def _to_output_schema(
    lf: pl.LazyFrame, src_kind: JPMaQSParquetSchemaKind, want_qdf: bool
) -> pl.LazyFrame:
    """
    Normalize columns to the qdf (``cid``/``xcat``) or ticker-based shape.

    Missing output columns are added as nulls, and the result has a fixed
    column order, so frames from different files can be concatenated.
    """
    cols = "real_date.ticker.value.eop_lag.mop_lag.grading.last_updated"
    ticker_cols = cols.split(".")
    qdf_cols = cols.replace("ticker", "cid.xcat").split(".")

    if want_qdf:
        if src_kind is JPMaQSParquetSchemaKind.TICKER:
            cid_expr, xcat_expr = _expr_split_ticker(pl.col("ticker"))
            lf = lf.with_columns(cid=cid_expr, xcat=xcat_expr)
        lf = _ensure_columns(lf, qdf_cols)
        return lf.select(qdf_cols)

    if src_kind is JPMaQSParquetSchemaKind.QDF:
        lf = lf.with_columns(
            ticker=pl.concat_str([pl.col("cid"), pl.lit("_"), pl.col("xcat")])
        )
    lf = _ensure_columns(lf, ticker_cols)
    return lf.select(ticker_cols)


def _scan_and_prepare_single_parquet(
    path: str,
    tickers: Sequence[str],
    start_date: Optional[Union[str, pd.Timestamp]],
    end_date: Optional[Union[str, pd.Timestamp]],
    return_qdf: bool,
) -> pl.LazyFrame:
    """Lazily scan one parquet file, filter it, and normalize its output schema."""
    lf = pl.scan_parquet(path)
    kind = _identify_schema_type(lf)
    lf = _filter_lazy_frame_by_tickers(
        lf=lf,
        kind=kind,
        tickers=tickers,
        start_date=start_date,
        end_date=end_date,
    )
    lf = _to_output_schema(lf, kind, return_qdf)
    return lf


def _lazy_load_filtered_parquets(
    paths: List[str],
    tickers: List[str],
    start_date: Optional[Union[str, pd.Timestamp]],
    end_date: Optional[Union[str, pd.Timestamp]],
    return_qdf: bool = True,
) -> pl.LazyFrame:
    """
    Scan, filter, and vertically concatenate several parquet files lazily.

    :raises <ValueError>: if ``paths`` is empty.
    """
    if not paths:
        raise ValueError("No paths provided")
    # De-duplicate while preserving the caller's order.
    tickers_list: List[str] = list(dict.fromkeys(tickers))
    lazy_parts: List[pl.LazyFrame] = [
        _scan_and_prepare_single_parquet(
            path=p,
            tickers=tickers_list,
            start_date=start_date,
            end_date=end_date,
            return_qdf=return_qdf,
        )
        for p in paths
    ]
    out = pl.concat(lazy_parts, how="vertical")
    return out


if __name__ == "__main__":
    # ``Timestamp.utcnow()`` is deprecated in recent pandas; this is the
    # tz-aware equivalent.
    print("Current time UTC:", pd.Timestamp.now(tz="UTC").isoformat())
    start = time.time()

    since_datetime = pd.Timestamp.now() - pd.offsets.BDay(5)
    print(
        f"Downloading full-snapshots, delta-files, and metadata files published since {since_datetime}"
    )
    since_datetime = since_datetime.strftime("%Y%m%d")
    with DataQueryFileAPIClient(out_dir="./data/jpmaqs-data/") as dq:
        dq.download_catalog_file()
        dq.download_full_snapshot(since_datetime=since_datetime)

    end = time.time()
    print(f"Download completed in {end - start:.2f} seconds")

    cids = ["AUD", "BRL", "CAD", "CHF", "CNY", "CZK", "EUR", "GBP", "USD"]
    xcats = ["RIR_NSA", "FXXR_NSA", "FXXR_VT10", "DU05YXR_NSA", "DU05YXR_VT10"]
    tickers = [f"{c}_{x}" for c in cids for x in xcats]

    with DataQueryFileAPIClient(out_dir="./data/jpmaqs-data/") as dq:
        df = dq.download(tickers=tickers)
    print(df.head())

    with DataQueryFileAPIClient(out_dir="./data/jpmaqs-data/") as dq:
        pl_df: pl.DataFrame = dq.download(
            cids=cids,
            xcats=xcats,
            dataframe_format="tickers",
            dataframe_type="polars",
        )
    print(pl_df.head())