import requests
import json
import datetime
import time
import logging
import os
from pathlib import Path
import io
import warnings
import functools
import operator
import concurrent.futures as cf
from typing import Dict, Optional, TypeVar, Any, List, Union, Callable
import threading
import pandas as pd
import pyarrow as pa # noqa: F401
import pyarrow.dataset as pa_ds
import pyarrow.compute as pc
import pyarrow.parquet as pq
import pyarrow.csv as pa_csv
from macrosynergy.management.types import QuantamentalDataFrame
from macrosynergy.download.exceptions import NoContentError
from macrosynergy.download.jpm_oauth import JPMorganOAuth
# Endpoints and resource identifier used to reach the JPMorgan Fusion API.
FUSION_AUTH_URL: str = "https://authe.jpmorgan.com/as/token.oauth2"
FUSION_ROOT_URL: str = "https://fusion.jpmorgan.com/api/v1"
FUSION_RESOURCE_ID: str = "JPMC:URI:RS-93742-Fusion-PROD"

# Minimum spacing between two consecutive API calls, in seconds.
FUSION_API_DELAY = 1.0
# Time-to-live handed to `cache_decorator` for cached API responses, in seconds.
CACHE_TTL = 60

# Time of the most recent API call; stays `None` until the first call happens.
# `_wait_for_api_call` reads and updates it while holding `lock`.
LAST_API_CALL: Optional[datetime.datetime] = None
lock = threading.Lock()

logger = logging.getLogger(__name__)
class FusionOAuth(JPMorganOAuth):
    """
    OAuth handler for the JPMorgan Fusion API.

    A thin subclass of `JPMorganOAuth`: the constructor only supplies
    Fusion-specific defaults and forwards every argument, unchanged and by
    keyword, to the base-class constructor.

    Parameters
    ----------
    client_id : str
        OAuth client ID.
    client_secret : str
        OAuth client secret.
    resource : str
        Resource identifier. Default is `FUSION_RESOURCE_ID`.
    application_name : str
        Application name passed to the base class. Default is "fusion".
    root_url : str
        Root URL of the API. Default is `FUSION_ROOT_URL`.
    auth_url : str
        URL of the token endpoint. Default is `FUSION_AUTH_URL`.
    proxies : Optional[dict]
        Proxies passed to the base class. Default is None.
    """

    def __init__(
        self,
        client_id: str,
        client_secret: str,
        resource: str = FUSION_RESOURCE_ID,
        application_name: str = "fusion",
        root_url: str = FUSION_ROOT_URL,
        auth_url: str = FUSION_AUTH_URL,
        proxies: Optional[dict] = None,
    ):
        oauth_settings = dict(
            client_id=client_id,
            client_secret=client_secret,
            resource=resource,
            application_name=application_name,
            root_url=root_url,
            auth_url=auth_url,
            proxies=proxies,
        )
        super().__init__(**oauth_settings)
CachedType = TypeVar("CachedType", bound=Callable[..., Any])


def cache_decorator(
    ttl: int = 60, *, maxsize: Optional[int] = None
) -> Callable[[CachedType], CachedType]:
    """
    Decorator to cache the result of a function for up to `ttl` seconds total.

    Once any call happens at least `ttl` seconds after the last clear, the ENTIRE
    cache is flushed before proceeding. Results are stored in a
    `functools.lru_cache`, keyed on the positional and keyword arguments. Calls
    whose arguments are not hashable cannot be cached and are passed straight
    through to the wrapped function.

    Parameters
    ----------
    ttl : int
        Time-to-live for the cache in seconds. After this time, the cache will be
        cleared. Default is 60 seconds.
    maxsize : Optional[int]
        Maximum number of cached results, forwarded to `functools.lru_cache`.
        If None, the cache is unbounded.

    Returns
    -------
    Callable[[CachedType], CachedType]
        A decorator. The decorated function additionally exposes `cache_clear()`
        to flush the cache manually.
    """

    def decorator(func: CachedType) -> CachedType:
        # wrap the function itself in an LRU cache
        cached_func = functools.lru_cache(maxsize=maxsize)(func)
        # a monotonic clock keeps the TTL immune to system clock adjustments
        last_clear = time.monotonic()

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            nonlocal last_clear
            now = time.monotonic()
            # if TTL has expired, clear everything
            if now - last_clear >= ttl:
                cached_func.cache_clear()
                last_clear = now
            # lru_cache hashes every argument; check up front so that a
            # TypeError raised inside `func` itself is never mistaken for an
            # unhashable-argument error
            try:
                hash(args)
                hash(tuple(kwargs.values()))
            except TypeError:
                return func(*args, **kwargs)
            # call the cached version
            return cached_func(*args, **kwargs)

        # expose cache_clear for manual use if desired
        wrapper.cache_clear = cached_func.cache_clear  # type: ignore[attr-defined]
        return wrapper  # type: ignore[return-value]

    return decorator
def _wait_for_api_call(api_delay: float = FUSION_API_DELAY) -> bool:
    """
    Block until at least `api_delay` seconds have elapsed since the previous API
    call, so that consecutive calls respect the rate limit.

    The time of the previous call is kept in the module-level `LAST_API_CALL`,
    which is read and updated while holding the module-level `lock`. The very
    first call (when `LAST_API_CALL` is None) never sleeps.

    Parameters
    ----------
    api_delay : float
        Minimum number of seconds between two calls. Default is
        `FUSION_API_DELAY`.

    Returns
    -------
    bool
        Always True.
    """
    global LAST_API_CALL
    with lock:
        current = datetime.datetime.now()
        if LAST_API_CALL is not None:
            elapsed = (current - LAST_API_CALL).total_seconds()
            remaining = api_delay - elapsed
            if remaining > 0:
                # only log waits that are long enough to be noticeable
                if remaining > 1:
                    logger.info(
                        f"Sleeping for {remaining:.2f} seconds for API rate limit."
                    )
                time.sleep(remaining)
            # stamp the call with the time after any sleep
            LAST_API_CALL = datetime.datetime.now()
        else:
            LAST_API_CALL = current
    return True
def request_wrapper(
    method: str,
    url: str,
    headers: Optional[Dict[str, str]] = None,
    params: Optional[Dict[str, Any]] = None,
    data: Optional[Any] = None,
    json_payload: Optional[Dict[str, Any]] = None,
    proxies: Optional[Dict[str, str]] = None,
    as_json: Optional[bool] = None,
    as_bytes: Optional[bool] = None,
    as_text: Optional[bool] = None,
    api_delay: float = FUSION_API_DELAY,
    timeout: Optional[float] = None,
    verify_ssl: bool = True,
) -> Union[Dict[str, Any], str, bytes]:
    """
    A wrapper function for making API requests to the JPMorgan Fusion API.

    The call is rate-limited through `_wait_for_api_call` and the response body is
    returned as parsed JSON (default), raw bytes or text.

    Parameters
    ----------
    method : str
        HTTP method; one of 'GET', 'POST', 'PUT', 'DELETE' (case-insensitive).
    url : str
        The URL to request.
    headers : dict, optional
        HTTP headers.
    params : dict, optional
        Query parameters.
    data : any, optional
        Data to send in the body.
    json_payload : dict, optional
        JSON data to send in the body.
    proxies : dict, optional
        Proxies to use for the request.
    as_json, as_bytes, as_text : bool, optional
        Select the return format. At most one may be truthy; if none is, the
        response is parsed as JSON.
    api_delay : float
        Minimum delay between API calls in seconds.
    timeout : float, optional
        Timeout for the request.
    verify_ssl : bool
        Whether to verify SSL certificates.

    Returns
    -------
    Union[Dict[str, Any], str, bytes]
        The response body in the requested format.

    Raises
    ------
    TypeError
        If `method` is not a string.
    ValueError
        If `method` is not supported or more than one format flag is set.
    NoContentError
        If the response has status 204 or an empty body.
    Exception
        If the request fails, returns an HTTP error status, or the body cannot be
        decoded as JSON. The original exception is chained as the cause.
    """
    if not isinstance(method, str):
        raise TypeError("Method must be a string.")
    # normalise before validating, so validation agrees with what is sent
    http_method = method.upper()
    if http_method not in ("GET", "POST", "PUT", "DELETE"):
        raise ValueError(
            f"Invalid method: {method}. Must be one of 'GET', 'POST', 'PUT', 'DELETE'."
        )
    if sum(map(bool, (as_bytes, as_text, as_json))) > 1:
        raise ValueError("Only one of `as_json`, `as_bytes`, or `as_text` can be True.")

    def _utc_timestamp() -> str:
        # error reports are labelled "UTC", so take the time in UTC, not local time
        return datetime.datetime.now(datetime.timezone.utc).strftime(
            "%Y-%m-%d %H:%M:%S"
        )

    raw_response: Optional[requests.Response] = None
    try:
        _wait_for_api_call(api_delay=api_delay)
        response = requests.request(
            method=http_method,
            url=url,
            headers=headers,
            params=params,
            data=data,
            json=json_payload,
            proxies=proxies,
            timeout=timeout,
            verify=verify_ssl,
        )
        raw_response = response
        response.raise_for_status()
        if response.status_code == 204 or not response.content:
            raise NoContentError(
                f"No content returned for {http_method} {url}. Response status code: {response.status_code}"
            )
        if as_bytes:
            return response.content
        if as_text:
            return response.text
        return response.json()
    except requests.exceptions.HTTPError as e_http:
        err_request = getattr(e_http, "request", None)
        err_response = getattr(e_http, "response", None)
        # NOTE: compare against None explicitly. `bool(Response)` is `Response.ok`,
        # which is False for exactly the error responses handled here.
        actual_method: str = (
            err_request.method if err_request is not None else http_method
        )
        actual_url: str = err_response.url if err_response is not None else url
        error_details = f"API HTTP error for {actual_method} {actual_url}: {e_http}"
        error_details += f"\nTimestamp (UTC): {_utc_timestamp()}"
        if err_response is not None:
            error_details += f"\nStatus Code: {err_response.status_code}\nResponse: {err_response.text[:500]}"
        raise Exception(error_details) from e_http
    except requests.exceptions.RequestException as e_req:
        err_response = getattr(e_req, "response", None)
        error_details = f"API request failed for {http_method} {url}: {e_req}"
        error_details += f"\nTimestamp (UTC): {_utc_timestamp()}"
        if err_response is not None:
            error_details += f"\nStatus Code: {err_response.status_code}\nResponse: {err_response.text[:500]}"
        raise Exception(error_details) from e_req
    except json.JSONDecodeError as e_json:
        error_details = (
            f"Failed to decode JSON response from {http_method} {url}: {e_json}"
        )
        error_details += f"\nTimestamp (UTC): {_utc_timestamp()}"
        if raw_response is not None:
            error_details += f"\nResponse text: {raw_response.text[:500]}"
        raise Exception(error_details) from e_json
def request_wrapper_stream_bytes_to_disk(
    filename: str,
    url: str,
    method: str = "GET",
    headers: Optional[Dict[str, str]] = None,
    params: Optional[Dict[str, Any]] = None,
    data: Optional[Any] = None,
    json_payload: Optional[Dict[str, Any]] = None,
    proxies: Optional[Dict[str, str]] = None,
    chunk_size: Optional[int] = None,
    api_delay: float = FUSION_API_DELAY,
    timeout: Optional[float] = None,
    verify_ssl: bool = True,
) -> None:
    """
    Stream a request's response bytes directly to disk, chunk by chunk.

    Parameters
    ----------
    filename : str
        The file path to write the streamed bytes to. Missing parent directories
        are created.
    url : str
        The URL to request.
    method : str
        HTTP method. Only GET is allowed for streaming to disk.
    headers : dict, optional
        HTTP headers.
    params : dict, optional
        Query parameters.
    data : any, optional
        Data to send in the body.
    json_payload : dict, optional
        JSON data to send in the body.
    proxies : dict, optional
        Proxies to use for the request.
    chunk_size : int, optional
        Size of each chunk to write, passed to `Response.iter_content`. The
        default None leaves the chunking to `requests`.
    api_delay : float
        Delay between API calls (defaults to 1.0 seconds).
    timeout : float, optional
        Timeout for the request (defaults to None).
    verify_ssl : bool
        Whether to verify SSL certificates (defaults to True).

    Raises
    ------
    TypeError
        If `method` is not a string.
    ValueError
        If `method` is anything other than GET.
    requests.exceptions.HTTPError
        If the response carries an HTTP error status.
    """
    if not isinstance(method, str):
        raise TypeError("Method must be a string.")
    if method.upper() != "GET":
        raise ValueError(
            f"Invalid method: {method}. Must be 'GET' for streaming to disk."
        )
    _wait_for_api_call(api_delay=api_delay)
    with requests.request(
        method=method.upper(),
        url=url,
        headers=headers,
        params=params,
        data=data,
        json=json_payload,
        proxies=proxies,
        stream=True,
        timeout=timeout,
        verify=verify_ssl,
    ) as response:
        response.raise_for_status()
        # a bare file name has an empty dirname, and os.makedirs("") raises
        target_dir = os.path.dirname(filename)
        if target_dir:
            os.makedirs(target_dir, exist_ok=True)
        with open(filename, "wb") as f:
            for chunk in response.iter_content(chunk_size=chunk_size):
                # skip empty chunks
                if chunk:
                    f.write(chunk)
class SimpleFusionAPIClient:
    """
    Minimal client for the JPMorgan Fusion REST API.

    Each public method maps to one API endpoint. Requests are authenticated with
    the headers supplied by the OAuth handler and sent through `request_wrapper`
    (or `request_wrapper_stream_bytes_to_disk` for downloads to disk). Most
    catalog-style lookups are cached for `CACHE_TTL` seconds via `cache_decorator`.

    Parameters
    ----------
    oauth_handler : FusionOAuth
        Handler that provides the authentication headers.
    base_url : str
        Base URL of the API; a trailing slash is stripped. Default is
        `FUSION_ROOT_URL`.
    proxies : Optional[Dict[str, str]]
        Proxies to use for the API requests. A warning is issued if they differ
        from the proxies configured on `oauth_handler`.

    Raises
    ------
    TypeError
        If `oauth_handler` is not a `FusionOAuth` instance.
    """

    def __init__(
        self,
        oauth_handler: FusionOAuth,
        base_url: str = FUSION_ROOT_URL,
        proxies: Optional[Dict[str, str]] = None,
    ):
        if not isinstance(oauth_handler, FusionOAuth):
            raise TypeError("oauth_handler must be an instance of FusionOAuth.")
        self.oauth_handler: FusionOAuth = oauth_handler
        self.base_url: str = base_url.rstrip("/")
        if proxies is not None:
            if proxies != self.oauth_handler.proxies:
                proxy_warning = "Proxies defined for OAuth handler are different from the ones defined for the downloader."
                warnings.warn(proxy_warning)
            self.proxies: Optional[Dict[str, str]] = proxies
        else:
            self.proxies: Optional[Dict[str, str]] = None

    def _request(
        self,
        endpoint: str,
        method: str = "GET",
        params: Optional[Dict[str, Any]] = None,
        data: Optional[Any] = None,
        json_payload: Optional[Dict[str, Any]] = None,
        as_json: Optional[bool] = None,
        as_bytes: Optional[bool] = None,
        as_text: Optional[bool] = None,
        timestamp: Optional[Any] = None,
        **kwargs,
    ) -> Union[Dict[str, Any], bytes, str]:
        """
        Send an authenticated request to `endpoint` (relative to `base_url`) and
        return the response in the format selected by `as_json` / `as_bytes` /
        `as_text` (see `request_wrapper`).

        `timestamp` is not sent to the API; it only exists so that callers of the
        cached methods can vary the cache key. Remaining keyword arguments are
        forwarded to `request_wrapper`.
        """
        url: str = f"{self.base_url}/{endpoint.lstrip('/')}"
        headers: Dict[str, str] = self.oauth_handler.get_headers()
        if timestamp:
            # timestamp is solely for cache busting purposes
            pass  # pragma: no cover
        return request_wrapper(
            method=method,
            url=url,
            headers=headers,
            params=params,
            data=data,
            json_payload=json_payload,
            proxies=self.proxies,
            as_json=as_json,
            as_bytes=as_bytes,
            as_text=as_text,
            **kwargs,
        )

    @cache_decorator(CACHE_TTL)
    def get_common_catalog(self, **kwargs) -> Dict[str, Any]:
        """
        Get the common catalog from the JPMorgan Fusion API.

        Equivalent cURL request:

        .. code-block:: bash

            curl -X GET "https://fusion.jpmorgan.com/api/v1/catalogs/common" \\
                -H "Authorization: Bearer <ACCESS_TOKEN>"

        Returns
        -------
        Dict[str, Any]
            API response containing the common catalog.
        """
        # /v1/catalogs/common
        endpoint: str = "catalogs/common"
        return self._request(method="GET", endpoint=endpoint, **kwargs)

    @cache_decorator(CACHE_TTL)
    def get_products(self, **kwargs) -> Dict[str, Any]:
        """
        Get the list of products available in the JPMorgan Fusion API.

        Equivalent cURL request:

        .. code-block:: bash

            curl -X GET "https://fusion.jpmorgan.com/api/v1/catalogs/common/products" \\
                -H "Authorization: Bearer <ACCESS_TOKEN>"

        Returns
        -------
        Dict[str, Any]
            API response containing the list of products.
        """
        # /v1/catalogs/common/products
        endpoint: str = "catalogs/common/products"
        return self._request(method="GET", endpoint=endpoint, **kwargs)

    @cache_decorator(CACHE_TTL)
    def get_product_details(
        self, product_id: str = "JPMAQS", **kwargs
    ) -> Dict[str, Any]:
        """
        Get the details of a specific product by its ID.

        Equivalent cURL request:

        .. code-block:: bash

            curl -X GET "https://fusion.jpmorgan.com/api/v1/catalogs/common/products/{product_id}" \\
                -H "Authorization: Bearer <ACCESS_TOKEN>"

        Parameters
        ----------
        product_id : str
            The ID of the product to retrieve details for. Default is "JPMAQS".
        **kwargs : dict
            Additional keyword arguments to pass to the API request.

        Returns
        -------
        Dict[str, Any]
            API response containing the product details.
        """
        # /v1/catalogs/common/products/{product_id}
        endpoint: str = f"catalogs/common/products/{product_id}"
        return self._request(method="GET", endpoint=endpoint, **kwargs)

    @cache_decorator(CACHE_TTL)
    def get_dataset(self, catalog: str, dataset: str, **kwargs) -> Dict[str, Any]:
        """
        Get the details of a specific dataset from a specified catalog.

        Equivalent cURL request:

        .. code-block:: bash

            curl -X GET "https://fusion.jpmorgan.com/api/v1/catalogs/{catalog}/datasets/{dataset}" \\
                -H "Authorization: Bearer <ACCESS_TOKEN>"

        Parameters
        ----------
        catalog : str
            The catalog from which to retrieve the dataset.
        dataset : str
            The identifier of the dataset to retrieve.
        **kwargs : dict
            Additional keyword arguments to pass to the API request.

        Returns
        -------
        Dict[str, Any]
            API response containing the dataset details.
        """
        # /v1/catalogs/{catalog}/datasets/{dataset}
        endpoint: str = f"catalogs/{catalog}/datasets/{dataset}"
        return self._request(method="GET", endpoint=endpoint, **kwargs)

    @cache_decorator(CACHE_TTL)
    def get_dataset_series(
        self, catalog: str, dataset: str, **kwargs
    ) -> Dict[str, Any]:
        """
        Get the series available for a specific dataset in a specified catalog.

        Equivalent cURL request:

        .. code-block:: bash

            curl -X GET "https://fusion.jpmorgan.com/api/v1/catalogs/{catalog}/datasets/{dataset}/datasetseries" \\
                -H "Authorization: Bearer <ACCESS_TOKEN>"

        Parameters
        ----------
        catalog : str
            The catalog from which to retrieve the dataset series.
        dataset : str
            The identifier of the dataset for which to retrieve series.
        **kwargs : dict
            Additional keyword arguments to pass to the API request.

        Returns
        -------
        Dict[str, Any]
            API response containing the dataset series details.
        """
        # /v1/catalogs/{catalog}/datasets/{dataset}/datasetseries
        endpoint: str = f"catalogs/{catalog}/datasets/{dataset}/datasetseries"
        return self._request(method="GET", endpoint=endpoint, **kwargs)

    @cache_decorator(CACHE_TTL)
    def get_dataset_seriesmember(
        self, catalog: str, dataset: str, seriesmember: str, **kwargs
    ) -> Dict[str, Any]:
        """
        Get the details of a specific series member in a dataset from a specified
        catalog.

        Equivalent cURL request:

        .. code-block:: bash

            curl -X GET "https://fusion.jpmorgan.com/api/v1/catalogs/{catalog}/datasets/{dataset}/datasetseries/{seriesmember}" \\
                -H "Authorization: Bearer <ACCESS_TOKEN>"

        Parameters
        ----------
        catalog : str
            The catalog from which to retrieve the series member.
        dataset : str
            The identifier of the dataset containing the series member.
        seriesmember : str
            The identifier of the series member to retrieve.
        **kwargs : dict
            Additional keyword arguments to pass to the API request.

        Returns
        -------
        Dict[str, Any]
            API response containing the details of the specified series member.
        """
        # /v1/catalogs/{catalog}/datasets/{dataset}/datasetseries/{seriesmember}
        endpoint: str = (
            f"catalogs/{catalog}/datasets/{dataset}/datasetseries/{seriesmember}"
        )
        return self._request(method="GET", endpoint=endpoint, **kwargs)

    @cache_decorator(CACHE_TTL)
    def get_seriesmember_distributions(
        self, catalog: str, dataset: str, seriesmember: str, **kwargs
    ) -> Dict[str, Any]:
        """
        Get the distributions available for a specific series member in a dataset
        from a specified catalog.

        Equivalent cURL request:

        .. code-block:: bash

            curl -X GET "https://fusion.jpmorgan.com/api/v1/catalogs/{catalog}/datasets/{dataset}/datasetseries/{seriesmember}/distributions" \\
                -H "Authorization: Bearer <ACCESS_TOKEN>"

        Parameters
        ----------
        catalog : str
            The catalog from which to retrieve the series member distributions.
        dataset : str
            The identifier of the dataset containing the series member.
        seriesmember : str
            The identifier of the series member for which to retrieve distributions.
        **kwargs : dict
            Additional keyword arguments to pass to the API request.

        Returns
        -------
        Dict[str, Any]
            API response containing the available distributions for the specified
            series member.
        """
        # /v1/catalogs/{catalog}/datasets/{dataset}/datasetseries/{seriesmember}/distributions
        endpoint: str = f"catalogs/{catalog}/datasets/{dataset}/datasetseries/{seriesmember}/distributions"
        return self._request(method="GET", endpoint=endpoint, **kwargs)

    def get_seriesmember_distribution_details(
        self, catalog: str, dataset: str, seriesmember: str, distribution: str, **kwargs
    ) -> Union[Dict[str, Any], bytes, str]:
        """
        Get the details of a specific distribution for a series member in a dataset
        from a specified catalog. Unlike the catalog lookups, this call is not
        cached.

        Equivalent cURL request:

        .. code-block:: bash

            curl -X GET "https://fusion.jpmorgan.com/api/v1/catalogs/{catalog}/datasets/{dataset}/datasetseries/{seriesmember}/distributions/{distribution}" \\
                -H "Authorization: Bearer <ACCESS_TOKEN>"

        Parameters
        ----------
        catalog : str
            The catalog from which to retrieve the series member distribution.
        dataset : str
            The identifier of the dataset containing the series member.
        seriesmember : str
            The identifier of the series member for which to retrieve the
            distribution.
        distribution : str
            The identifier of the distribution to retrieve (e.g., "parquet").
        **kwargs : dict
            Additional keyword arguments to pass to the API request.

        Returns
        -------
        Union[Dict[str, Any], bytes, str]
            API response containing the distribution details (the actual data) for
            the specified series member.
        """
        # /v1/catalogs/{catalog}/datasets/{dataset}/datasetseries/{seriesmember}/distributions/{distribution}
        endpoint: str = f"catalogs/{catalog}/datasets/{dataset}/datasetseries/{seriesmember}/distributions/{distribution}"
        return self._request(method="GET", endpoint=endpoint, **kwargs)

    def get_seriesmember_distribution_details_to_disk(
        self,
        filename: str,
        catalog: str,
        dataset: str,
        seriesmember: str,
        distribution: str = "parquet",
        **kwargs,
    ) -> None:
        """
        Download the distribution for a specific series member in a dataset from a
        specified catalog and save it to disk.

        Parameters
        ----------
        filename : str
            The file path to save the downloaded distribution data.
        catalog : str
            The catalog from which to retrieve the series member distribution.
        dataset : str
            The identifier of the dataset containing the series member.
        seriesmember : str
            The identifier of the series member for which to download the
            distribution.
        distribution : str
            The identifier of the distribution to download (e.g., "parquet").
        **kwargs : dict
            Additional keyword arguments to pass to the API request. If `proxies`
            is not given here, the client's own proxies are used.

        Returns
        -------
        None
            The downloaded data is saved directly to the specified file.
        """
        # /v1/catalogs/{catalog}/datasets/{dataset}/datasetseries/{seriesmember}/distributions/{distribution}
        endpoint: str = f"catalogs/{catalog}/datasets/{dataset}/datasetseries/{seriesmember}/distributions/{distribution}"
        headers: Dict[str, str] = self.oauth_handler.get_headers()
        # route the download through the same proxies as `_request`, while still
        # letting an explicit `proxies=` keyword argument take precedence
        kwargs.setdefault("proxies", self.proxies)
        request_wrapper_stream_bytes_to_disk(
            filename=filename,
            headers=headers,
            url=f"{self.base_url}/{endpoint}",
            method="GET",
            **kwargs,
        )
def get_resources_df(
    response_dict: Dict[str, Any],
    resources_key: str = "resources",
    keep_fields: Optional[List[str]] = None,
    custom_sort_columns: bool = True,
) -> pd.DataFrame:
    """
    Build a DataFrame from the resources listed in an API response dictionary.

    Parameters
    ----------
    response_dict : Dict[str, Any]
        The response dictionary holding the resources.
    resources_key : str
        Key under which the resources are stored. Default is 'resources'.
    keep_fields : Optional[List[str]]
        Columns to retain. If None, every column is retained.
    custom_sort_columns : bool
        If True, the columns are reordered so that '@id', 'identifier' and (when
        present) 'title' come first, followed by the remaining columns in sorted
        order. Default is True.

    Returns
    -------
    pd.DataFrame
        A DataFrame containing the resources data.

    Raises
    ------
    ValueError
        If `resources_key` is missing from `response_dict`, or the resulting
        DataFrame has no '@id' column.
    AssertionError
        If `custom_sort_columns` is True and a required leading column is missing.
    """
    if resources_key not in response_dict:
        raise ValueError(
            f"Field '{resources_key}' not found in the response dictionary."
        )

    frame: pd.DataFrame = pd.DataFrame(response_dict[resources_key])
    if keep_fields is not None:
        frame = frame[keep_fields]

    if "@id" not in frame.columns:
        raise ValueError("Column '@id' not found in the resources DataFrame.")

    if not custom_sort_columns:
        return frame

    # 'title' is optional; the other two leading columns are mandatory
    leading = ["@id", "identifier"]
    if "title" in frame.columns:
        leading.append("title")
    msg = f"{leading} must be in the DataFrame columns for custom_sort_columns=True"
    assert all(name in frame.columns for name in leading), msg

    trailing = sorted(name for name in frame.columns if name not in leading)
    return frame[leading + trailing]
def convert_ticker_based_pandas_df_to_qdf(
    df: pd.DataFrame, categorical: bool = True
) -> pd.DataFrame:
    """
    Convert Parquet DataFrame with ticker entries to a QDF with cid & xcat columns.

    Each ticker is split on its first underscore: the part before it becomes
    'cid', the remainder 'xcat'. The input DataFrame is left unmodified.

    Parameters
    ----------
    df : pd.DataFrame
        The DataFrame to convert, which should contain a 'ticker' column.
    categorical : bool
        If True, converts the DataFrame to a QuantamentalDataFrame with
        categorical data.

    Returns
    -------
    pd.DataFrame
        A QuantamentalDataFrame holding the input columns without 'ticker', plus
        'cid' and 'xcat'.
    """
    ticker_parts = df["ticker"].str.split("_", n=1, expand=True)
    # add the new columns to the frame returned by `drop`, never to the
    # caller's DataFrame
    result = df.drop(columns=["ticker"])
    result[["cid", "xcat"]] = ticker_parts
    return QuantamentalDataFrame(result, categorical=categorical)
def convert_ticker_based_parquet_file_to_qdf(
    filename: str,
    compression: str = "zstd",
    as_csv: bool = False,
    qdf: bool = False,
    keep_raw_data: bool = False,
) -> None:
    """
    Convert a ticker-based Parquet file on disk to CSV and/or QDF layout.

    Behaviour by flag combination:

    - neither `as_csv` nor `qdf`: nothing is written.
    - `as_csv` only: the file is dumped unchanged to ``<base>.csv``.
    - `qdf`: the 'ticker' column is split on its first underscore into 'cid' and
      'xcat', and the result is written as Parquet or, with `as_csv`, as CSV.

    Parameters
    ----------
    filename : str
        The path to the Parquet file to convert. The file must exist.
    compression : str
        Compression used when writing Parquet output. Default is 'zstd'.
    as_csv : bool
        If True, write CSV instead of Parquet. Default is False.
    qdf : bool
        If True, write the data in QDF layout. Default is False.
    keep_raw_data : bool
        If True, the original file is kept and QDF output goes to
        ``<base>_qdf<ext>``. If False, Parquet QDF output overwrites the original
        file and CSV output replaces it. Default is False.

    Raises
    ------
    FileNotFoundError
        If `filename` does not exist.
    """
    if not os.path.isfile(filename):
        raise FileNotFoundError(f"No such file: {filename}")

    stem, _ = os.path.splitext(filename)
    folder = os.path.dirname(filename)

    if not qdf:
        if as_csv:
            # straight dump of the raw data to CSV, batch by batch
            raw_scanner = pa_ds.dataset(filename, format="parquet").scanner()
            with pa_csv.CSVWriter(
                stem + ".csv", schema=raw_scanner.dataset_schema
            ) as sink:
                for record_batch in raw_scanner.to_batches():
                    sink.write(record_batch)
            if not keep_raw_data:
                os.remove(filename)
        # without `as_csv` there is nothing to do
        return

    # lazily project the QDF columns, splitting the ticker on its first underscore
    ticker_parts = pc.split_pattern(pc.field("ticker"), "_", max_splits=1)
    projection = {
        name: pc.field(name)
        for name in (
            "real_date",
            "value",
            "grading",
            "eop_lag",
            "mop_lag",
            "last_updated",
        )
    }
    projection["cid"] = pc.list_element(ticker_parts, 0)
    projection["xcat"] = pc.list_element(ticker_parts, 1)
    qdf_scanner = pa_ds.dataset(filename, format="parquet").scanner(
        columns=projection
    )

    suffix = ".csv" if as_csv else ".parquet"
    if keep_raw_data:
        target = os.path.join(folder, os.path.basename(stem) + "_qdf" + suffix)
    elif as_csv:
        # the CSV replaces the Parquet file
        target = stem + ".csv"
    else:
        # the QDF Parquet overwrites the original file
        target = filename

    if not as_csv:
        pq.write_table(qdf_scanner.to_table(), target, compression=compression)
        return

    with pa_csv.CSVWriter(target, schema=qdf_scanner.projected_schema) as sink:
        for record_batch in qdf_scanner.to_batches():
            sink.write(record_batch)
    if not keep_raw_data:
        os.remove(filename)
def convert_ticker_based_pyarrow_table_to_qdf(table: pa.Table) -> pa.Table:
    """
    Convert a ticker-based PyArrow Table to the Quantamental DataFrame (QDF)
    column layout.

    The 'ticker' column is split on its first underscore into 'cid' and 'xcat'.
    The projection is expressed through a dataset Scanner and materialised before
    returning.

    Parameters
    ----------
    table : pa.Table
        The PyArrow Table to convert. Besides 'ticker', the projection selects
        the columns 'real_date', 'value', 'grading', 'eop_lag', 'mop_lag' and
        'last_updated'.

    Returns
    -------
    pa.Table
        A table with the columns 'real_date', 'value', 'grading', 'eop_lag',
        'mop_lag', 'last_updated', 'cid' and 'xcat', in that order.

    Raises
    ------
    KeyError
        If the table has no 'ticker' column.
    """
    if "ticker" not in table.schema.names:
        raise KeyError("Column 'ticker' not found in the table.")

    passthrough = (
        "real_date",
        "value",
        "grading",
        "eop_lag",
        "mop_lag",
        "last_updated",
    )
    ticker_parts = pc.split_pattern(pc.field("ticker"), "_", max_splits=1)
    projection = {name: pc.field(name) for name in passthrough}
    projection["cid"] = pc.list_element(ticker_parts, 0)
    projection["xcat"] = pc.list_element(ticker_parts, 1)

    return pa_ds.dataset(table).scanner(columns=projection).to_table()
def read_parquet_from_bytes_to_pandas_dataframe(response_bytes: bytes) -> pd.DataFrame:
    """
    Parse the bytes of a Parquet file into a pandas DataFrame.

    Used for Parquet payloads downloaded from the JPMaQS Fusion API.

    Parameters
    ----------
    response_bytes : bytes
        The raw content of the Parquet file.

    Returns
    -------
    pd.DataFrame
        The data held in the Parquet file.

    Raises
    ------
    ValueError
        If the bytes cannot be read as Parquet; the underlying error is chained.
    """
    buffer = io.BytesIO(response_bytes)
    try:
        frame = pd.read_parquet(buffer)
    # KeyboardInterrupt is not an `Exception` subclass, so it still propagates
    except Exception as e:
        raise ValueError(f"Failed to read Parquet from bytes: {e}") from e
    return frame
def read_parquet_from_bytes_to_pyarrow_table(
    response_bytes: bytes, **kwargs
) -> pa.Table:
    """
    Parse the bytes of a Parquet file into a PyArrow Table.

    Used for Parquet payloads downloaded from the JPMaQS Fusion API.

    Parameters
    ----------
    response_bytes : bytes
        The raw content of the Parquet file.
    **kwargs : dict
        Additional keyword arguments to pass to `pyarrow.parquet.read_table`.

    Returns
    -------
    pa.Table
        The data held in the Parquet file.

    Raises
    ------
    ValueError
        If the bytes cannot be read as Parquet; the underlying error is chained.
    """
    buffer = io.BytesIO(response_bytes)
    try:
        arrow_table = pq.read_table(buffer, **kwargs)
    # KeyboardInterrupt is not an `Exception` subclass, so it still propagates
    except Exception as e:
        raise ValueError(f"Failed to read Parquet to PyArrow Table: {e}") from e
    return arrow_table
def coerce_real_date(table: pa.Table) -> pa.Table:
    """
    Return `table` with its 'real_date' column converted to `date32`.

    Parameters
    ----------
    table : pa.Table
        A table with a 'real_date' column of type date32, timestamp or string.
        String values are cut to their first 10 characters and parsed as
        ``YYYY-MM-DD``.

    Returns
    -------
    pa.Table
        The input table itself if 'real_date' is already date32, otherwise a
        table with the 'real_date' column replaced by its date32 equivalent.

    Raises
    ------
    KeyError
        If the table has no uniquely named 'real_date' column.
    TypeError
        If 'real_date' has any other type.
    """
    idx = table.schema.get_field_index("real_date")
    if idx < 0:
        # -1 signals "missing or duplicated"; used as an index it would silently
        # address the LAST column instead
        raise KeyError("Column 'real_date' not found in the table.")
    col = table.column(idx)
    t = col.type
    if pa.types.is_date32(t):
        # Already correct type
        return table
    elif pa.types.is_timestamp(t):
        # Fast direct cast
        dates = pc.cast(col, pa.date32())
    elif pa.types.is_string(t):
        # Trim to YYYY-MM-DD to handle datetime strings
        col = pc.utf8_slice_codeunits(col, 0, 10)
        ts = pc.strptime(col, format="%Y-%m-%d", unit="s")
        dates = pc.cast(ts, pa.date32())
    else:
        raise TypeError(f"Unsupported type for real_date: {t}")
    return table.set_column(idx, "real_date", dates)
def filter_parquet_table_as_qdf(
    table: pa.Table,
    tickers: Optional[List[str]] = None,
    start_date: Optional[str] = None,
    end_date: Optional[str] = None,
    qdf: bool = False,
) -> pa.Table:
    """
    Filter a PyArrow Table based on tickers and date range. Optionally converts the
    table from a ticker-based format to a Quantamental DataFrame (QDF).

    The 'real_date' column is always coerced to date32 first. If `start_date` is
    later than `end_date`, the two are swapped.

    Parameters
    ----------
    table : pa.Table
        The PyArrow Table to filter.
    tickers : List[str], optional
        A list of tickers to filter by. If None, no ticker filtering is applied.
    start_date : Optional[str], optional
        The start date for filtering in ISO format (YYYY-MM-DD). If None, no start
        date filtering is applied.
    end_date : Optional[str], optional
        The end date for filtering in ISO format (YYYY-MM-DD). If None, no end date
        filtering is applied.
    qdf : bool, optional
        If True, converts the filtered table to a Quantamental DataFrame (QDF)
        format. Default is False.

    Returns
    -------
    pa.Table
        A filtered PyArrow Table. If `qdf` is True, the table is converted to a QDF.

    Raises
    ------
    TypeError
        If `table` is not a PyArrow Table.
    KeyError
        If `tickers` is given but the table has no 'ticker' column.
    """
    if not isinstance(table, pa.Table):
        raise TypeError("Input must be a PyArrow Table.")
    table = coerce_real_date(table)
    if not any([tickers, start_date, end_date]) and not qdf:
        return table

    if bool(start_date) and bool(end_date):
        if pd.Timestamp(start_date) > pd.Timestamp(end_date):
            start_date, end_date = end_date, start_date

    ticker_col = "ticker"
    exprs = []
    if tickers:
        if ticker_col not in table.schema.names:
            raise KeyError(f"No column named '{ticker_col}' in table")
        tickers_array = pa.array(tickers, type=pa.string())
        exprs.append(pc.is_in(pc.field(ticker_col), value_set=tickers_array))
    if start_date:
        dt = datetime.date.fromisoformat(start_date)
        scalar = pa.scalar(dt, type=pa.date32())
        exprs.append(pc.greater_equal(pc.field("real_date"), scalar))
    if end_date:
        dt = datetime.date.fromisoformat(end_date)
        scalar = pa.scalar(dt, type=pa.date32())
        exprs.append(pc.less_equal(pc.field("real_date"), scalar))

    # with `qdf=True` and no filters there is nothing to combine; reducing an
    # empty list would raise a TypeError
    if exprs:
        expression = functools.reduce(operator.and_, exprs)
        table = table.filter(expression)
    if qdf:
        table = convert_ticker_based_pyarrow_table_to_qdf(table)
    return table
[docs]class JPMaQSFusionClient:
"""
A client for accessing the JPMaQS product on the JPMorgan Fusion API.
This client is specific to the JPMaQS product and provides methods to fetch the data
catalog, list datasets, and download distributions.
It uses :func:`SimpleFusionAPIClient` to handle the API requests.
Parameters
----------
oauth_handler : FusionOAuth
An instance of FusionOAuth to handle OAuth authentication.
base_url : str
The base URL for the Fusion API. Default is FUSION_ROOT_URL.
proxies : Optional[Dict[str, str]]
Optional proxies to use for the HTTP requests. Default is None.
"""
def __init__(
    self,
    oauth_handler: FusionOAuth,
    base_url: str = FUSION_ROOT_URL,
    proxies: Optional[Dict[str, str]] = None,
):
    # Fixed identifiers of the JPMaQS product inside the Fusion catalog.
    self._catalog = "common"
    self._product_id = "JPMAQS"
    self._catalog_dataset = "JPMAQS_METADATA_CATALOG"
    self._notifications_dataset = "JPMAQS_METADATA_NOTIFICATIONS"
    # The two metadata datasets, listed as catalog first, then notifications.
    self.metadata_datesets = [self._catalog_dataset, self._notifications_dataset]

    # Starts out empty.
    self.failure_messages: List[str] = []

    # All HTTP traffic is delegated to the generic Fusion API client.
    self.simple_fusion_client = SimpleFusionAPIClient(
        oauth_handler=oauth_handler,
        base_url=base_url,
        proxies=proxies,
    )
def list_datasets(
    self,
    product_id: str = "JPMAQS",
    fields: List[str] = ["@id", "identifier", "title", "description"],
    include_catalog: bool = False,
    include_notifications: bool = False,
    include_full_datasets: bool = True,
    include_explorer_datasets: bool = False,
    include_delta_datasets: bool = False,
    **kwargs,
) -> pd.DataFrame:
    """
    List datasets available in the JPMaQS product. Returns a DataFrame with the
    specified fields. This excludes the metadata catalog and the Explorer datasets
    by default.

    Parameters
    ----------
    product_id : str
        The product ID to filter datasets by. Default is "JPMAQS".
    fields : List[str]
        List of fields to include in the returned DataFrame.
    include_catalog : bool
        If True, includes the metadata catalog dataset in the results.
    include_notifications : bool
        If True, includes the notifications dataset in the results.
    include_full_datasets : bool
        If False, only datasets whose identifier starts with "JPMAQS_DELTA_" or
        "JPMAQS_EXPLORER_" are kept (subject to the other flags). Default is
        True.
    include_explorer_datasets : bool
        If True, includes the Explorer datasets in the results.
    include_delta_datasets : bool
        If True, includes the Delta datasets in the results.
    **kwargs : dict
        Additional keyword arguments to pass to the API request.

    Returns
    -------
    pd.DataFrame
        A DataFrame containing information about the available datasets, with a
        1-based index.

    Raises
    ------
    ValueError
        If every `include_*` flag is False.
    """
    if not (
        include_catalog
        or include_notifications
        or include_full_datasets
        or include_explorer_datasets
        or include_delta_datasets
    ):
        raise ValueError(
            "At least one of `include_catalog`, `include_notifications`, "
            "`include_full_datasets`, `include_explorer_datasets`, or "
            "`include_delta_datasets` must be True."
        )
    r = self.simple_fusion_client.get_product_details(
        product_id=product_id, **kwargs
    )
    resources_df: pd.DataFrame = get_resources_df(r, keep_fields=None)
    resources_df = resources_df.sort_values(by=["isRestricted", "@id"])

    if not include_catalog:
        resources_df = resources_df[
            resources_df["identifier"] != self._catalog_dataset
        ]
    if not include_notifications:
        resources_df = resources_df[
            resources_df["identifier"] != self._notifications_dataset
        ]
    if not include_explorer_datasets:
        sel_bools = resources_df["identifier"].str.startswith("JPMAQS_EXPLORER_")
        # warn only when there is something to drop: `all()` of an empty
        # selection is vacuously True
        if len(sel_bools) > 0 and all(sel_bools):
            warnings.warn(
                "`include_explorer_datasets` is False, but all available datasets "
                "are Explorer datasets. No datasets will be returned; set it to "
                "True to list them."
            )
        resources_df = resources_df[~sel_bools]
    if not include_delta_datasets:
        sel_bools = resources_df["identifier"].str.startswith("JPMAQS_DELTA_")
        resources_df = resources_df[~sel_bools]
    if not include_full_datasets:
        # keep only the Delta / Explorer datasets that survived the filters above
        delta_datasets = resources_df[
            resources_df["identifier"].str.startswith("JPMAQS_DELTA_")
        ]
        explorer_datasets = resources_df[
            resources_df["identifier"].str.startswith("JPMAQS_EXPLORER_")
        ]
        other_ds_ids = set(delta_datasets["identifier"]) | set(
            explorer_datasets["identifier"]
        )
        resources_df = resources_df[resources_df["identifier"].isin(other_ds_ids)]

    resources_df = resources_df[fields].reset_index(drop=True)
    # present a 1-based index
    resources_df.index = resources_df.index + 1
    return resources_df
@cache_decorator(CACHE_TTL)
def get_notifications_distribution(
    self, series_member: Optional[str] = None, **kwargs
) -> pd.DataFrame:
    """
    Fetch the JPMaQS notifications dataset as a DataFrame with one row per
    (category, cross_section) pair.

    Parameters
    ----------
    series_member : Optional[str]
        The series member identifier to retrieve. If None, the identifier
        returned by `get_latest_seriesmember_identifier` for the notifications
        dataset is used.
    **kwargs : dict
        Additional keyword arguments to pass to the API request.

    Returns
    -------
    pd.DataFrame
        The notification records, with an added 'timestamp' column taken from
        the response metadata, sorted by 'category', 'cross_section' and
        'comment'.
    """
    if series_member is None:
        series_member = self.get_latest_seriesmember_identifier(
            dataset=self._notifications_dataset, **kwargs
        )
    payload = self.simple_fusion_client.get_seriesmember_distribution_details(
        catalog=self._catalog,
        dataset=self._notifications_dataset,
        seriesmember=series_member,
        distribution="json",
        as_json=True,
        **kwargs,
    )

    # the metadata timestamp is parsed with an explicit year-day-month format
    # (presumably what the API emits - TODO confirm)
    published_at = pd.to_datetime(
        payload["metadata"]["datetime"],
        format="%Y-%d-%mT%H%M%S",
        errors="raise",
    )
    notifications = pd.DataFrame(payload["data"])
    notifications["timestamp"] = published_at

    # the report fields hold comma-separated lists: turn them into real lists...
    for list_col in ("cross_section", "category"):
        notifications[list_col] = (
            notifications[list_col].str.replace(" ", "").str.split(",")
        )
    # ...and expand them into one row per element
    notifications = notifications.explode("cross_section")
    notifications = notifications.explode("category")
    notifications = notifications.reset_index(drop=True)

    notifications = notifications.sort_values(
        by=["category", "cross_section", "comment"]
    )
    return notifications.reset_index(drop=True)
def list_tickers(self, **kwargs) -> List[str]:
    """
    Return every ticker listed in the JPMaQS metadata catalog, sorted ascending.

    Parameters
    ----------
    **kwargs : dict
        Additional keyword arguments forwarded to `get_metadata_catalog`.

    Returns
    -------
    List[str]
        The values of the catalog's `Ticker` column in sorted order.

    Raises
    ------
    ValueError
        If the metadata catalog has no `Ticker` column.
    """
    catalog = self.get_metadata_catalog(**kwargs)
    if "Ticker" in catalog.columns:
        return sorted(catalog["Ticker"])
    raise ValueError("Invalid metadata catalog: 'Ticker' column not found.")
def get_dataset_available_series(self, dataset: str, **kwargs) -> pd.DataFrame:
    """
    List the series available for one dataset of the JPMaQS product.

    Typically, each JPMaQS dataset has one series per business day (the JPMaQS
    release of that dataset for that day).

    Parameters
    ----------
    dataset : str
        The dataset identifier for which to retrieve series.
    **kwargs : dict
        Additional keyword arguments forwarded to the API request.

    Returns
    -------
    pd.DataFrame
        The series resources of the dataset, restricted to a dataset-dependent
        set of fields.
    """
    response = self.simple_fusion_client.get_dataset_series(
        catalog=self._catalog, dataset=dataset, **kwargs
    )

    # The two metadata datasets are requested with fewer fields than regular
    # datasets (no date-range fields).
    reduced_fields = {
        self._catalog_dataset: ["@id", "identifier"],
        self._notifications_dataset: ["@id", "identifier", "createdDate"],
    }
    keep = reduced_fields.get(
        dataset, ["@id", "identifier", "createdDate", "fromDate", "toDate"]
    )
    return get_resources_df(response, keep_fields=keep)
def get_seriesmember_distributions(
    self, dataset: str, seriesmember: str, **kwargs
) -> pd.DataFrame:
    """
    List the distributions available for one series member of a dataset.

    Parameters
    ----------
    dataset : str
        The dataset identifier the series member belongs to.
    seriesmember : str
        The series member identifier whose distributions are requested.
    **kwargs : dict
        Additional keyword arguments forwarded to the API request.

    Returns
    -------
    pd.DataFrame
        The API response converted with `get_resources_df`.
    """
    response = self.simple_fusion_client.get_seriesmember_distributions(
        catalog=self._catalog,
        dataset=dataset,
        seriesmember=seriesmember,
        **kwargs,
    )
    return get_resources_df(response)
def get_all_seriesmembers_for_all_datasets(
    self,
    include_catalog: bool = False,
    include_notifications: bool = False,
    include_full_datasets: bool = True,
    include_explorer_datasets: bool = False,
    include_delta_datasets: bool = False,
    **kwargs,
) -> pd.DataFrame:
    """
    Get all series members for all datasets in the JPMaQS product.

    The series of every selected dataset are requested concurrently and stacked
    into one DataFrame. Rows are ordered by dataset in the order returned by
    `list_datasets`, independent of which request finishes first.

    Parameters
    ----------
    include_catalog : bool
        If True, includes the metadata catalog dataset. Default is False.
    include_notifications : bool
        If True, includes the notifications dataset. Default is False.
    include_full_datasets : bool
        Passed through to `list_datasets`. Default is True.
    include_explorer_datasets : bool
        If True, includes Explorer datasets. Default is False.
    include_delta_datasets : bool
        If True, includes Delta datasets. Default is False.
    **kwargs : dict
        Additional keyword arguments forwarded to `list_datasets` and to every
        `get_dataset_available_series` call.

    Returns
    -------
    pd.DataFrame
        All series members with an extra `dataset` column naming the dataset each
        row came from. An empty DataFrame if nothing was retrieved.
    """
    datasets = self.list_datasets(
        include_catalog=include_catalog,
        include_notifications=include_notifications,
        include_full_datasets=include_full_datasets,
        include_explorer_datasets=include_explorer_datasets,
        include_delta_datasets=include_delta_datasets,
        **kwargs,
    )

    all_seriesmembers: List[pd.DataFrame] = []
    with cf.ThreadPoolExecutor() as executor:
        submitted = [
            (
                dataset,
                executor.submit(self.get_dataset_available_series, dataset, **kwargs),
            )
            for dataset in datasets["identifier"].tolist()
        ]
        # Collect in submission order (not completion order) so the row order of
        # the result does not depend on thread timing.
        for dataset, future in submitted:
            try:
                result: pd.DataFrame = future.result()
                if not result.empty:
                    result["dataset"] = dataset
                    all_seriesmembers.append(result)
            except Exception as e:
                # Best effort: one failing dataset must not abort the others.
                print(f"Error retrieving series members for dataset {dataset}: {e}")

    if not all_seriesmembers:
        return pd.DataFrame()
    return pd.concat(all_seriesmembers, ignore_index=True)
def download_series_member_distribution(
    self,
    dataset: str,
    seriesmember: str,
    distribution: str = "parquet",
    **kwargs,
) -> pd.DataFrame:
    """
    Download one series member distribution of a dataset into memory.

    Parameters
    ----------
    dataset : str
        The dataset identifier the series member belongs to.
    seriesmember : str
        The series member identifier to download.
    distribution : str
        The distribution format requested from the API. Default is "parquet".
        The downloaded bytes are always read with the parquet reader.
    **kwargs : dict
        Additional keyword arguments forwarded to the API request.

    Returns
    -------
    pd.DataFrame
        The distribution data of the specified series member.
    """
    raw_bytes = self.simple_fusion_client.get_seriesmember_distribution_details(
        catalog=self._catalog,
        dataset=dataset,
        seriesmember=seriesmember,
        distribution=distribution,
        as_bytes=True,
        **kwargs,
    )
    return read_parquet_from_bytes_to_pandas_dataframe(raw_bytes)
def download_series_member_distribution_to_disk(
    self,
    save_directory: str,
    dataset: str,
    seriesmember: str,
    distribution: str = "parquet",
    qdf: bool = False,
    as_csv: bool = False,
    keep_raw_data: bool = False,
    **kwargs,
) -> None:
    """
    Download one series member distribution to a file and optionally convert it.

    The file is written to `<save_directory>/<dataset>-<seriesmember>.<distribution>`.
    Datasets listed in `self.metadata_datesets` are left as downloaded; every
    other file is handed to `convert_ticker_based_parquet_file_to_qdf`.

    Parameters
    ----------
    save_directory : str
        Target directory; created if it does not exist.
    dataset : str
        The dataset identifier the series member belongs to.
    seriesmember : str
        The series member identifier to download.
    distribution : str
        The distribution format to download. Default is "parquet".
    qdf : bool
        Passed to the converter. Default is False.
    as_csv : bool
        Passed to the converter. Default is False.
    keep_raw_data : bool
        Passed to the converter. Default is False.
    **kwargs : dict
        Additional keyword arguments forwarded to the API request.

    Raises
    ------
    FileNotFoundError
        If the expected file does not exist after the download call.
    """
    os.makedirs(save_directory, exist_ok=True)
    is_metadata = dataset in self.metadata_datesets
    target_file = os.path.join(
        save_directory, f"{dataset}-{seriesmember}.{distribution}"
    )

    self.simple_fusion_client.get_seriesmember_distribution_details_to_disk(
        filename=target_file,
        catalog=self._catalog,
        dataset=dataset,
        seriesmember=seriesmember,
        distribution=distribution,
        **kwargs,
    )

    label = "catalog" if is_metadata else "series member"
    if not os.path.exists(target_file):
        raise FileNotFoundError(
            f"Failed to download {label} distribution to {target_file}."
        )
    print(f"Successfully downloaded {label} distribution to {target_file}.")

    # Metadata files are kept exactly as downloaded.
    if is_metadata:
        return

    convert_ticker_based_parquet_file_to_qdf(
        filename=target_file,
        as_csv=as_csv,
        qdf=qdf,
        keep_raw_data=keep_raw_data,
    )

    if not qdf:
        return
    csv_note = " and saved as CSV" if as_csv else ""
    print(
        f"Successfully converted {target_file} to Quantamental Data Format (QDF)"
        + csv_note
    )
def get_latest_seriesmember_identifier(
    self,
    dataset: str,
    **kwargs,
) -> str:
    """
    Return the identifier of the most recent series member of a dataset.

    "Most recent" is the greatest value of the `identifier` column when the
    identifiers are compared with each other.

    Parameters
    ----------
    dataset : str
        The dataset identifier to inspect.
    **kwargs : dict
        Additional keyword arguments forwarded to the API request.

    Returns
    -------
    str
        The greatest series member identifier of the dataset.

    Raises
    ------
    ValueError
        If the dataset has no series members.
    """
    available = self.get_dataset_available_series(dataset=dataset, **kwargs)
    if available.empty:
        raise ValueError(f"No series members found for dataset '{dataset}'.")
    identifiers = available["identifier"].tolist()
    return max(identifiers)
def download_latest_distribution(
    self,
    dataset: str,
    distribution: str = "parquet",
    qdf: bool = True,
    categorical: bool = True,
    **kwargs,
) -> pd.DataFrame:
    """
    Download the latest series member distribution of a dataset into memory.

    Parameters
    ----------
    dataset : str
        The dataset identifier to download.
    distribution : str
        The distribution format to download. Default is "parquet".
    qdf : bool
        If True, the downloaded frame is passed through
        `convert_ticker_based_pandas_df_to_qdf`. Default is True.
    categorical : bool
        Forwarded to the QDF conversion; only relevant when `qdf` is True.
    **kwargs : dict
        Additional keyword arguments forwarded to both API requests.

    Returns
    -------
    pd.DataFrame
        The latest distribution of the dataset, converted when `qdf` is True.
    """
    newest_member = self.get_latest_seriesmember_identifier(
        dataset=dataset,
        **kwargs,
    )
    downloaded = self.download_series_member_distribution(
        dataset=dataset,
        seriesmember=newest_member,
        distribution=distribution,
        **kwargs,
    )
    if not qdf:
        return downloaded
    return convert_ticker_based_pandas_df_to_qdf(
        df=downloaded,
        categorical=categorical,
    )
def download_and_filter_series_member_distribution(
    self,
    dataset: str,
    seriesmember: str,
    tickers: List[str] = None,
    start_date: Optional[str] = None,
    end_date: Optional[str] = None,
    qdf: bool = False,
    distribution: str = "parquet",
    **kwargs,
) -> pd.DataFrame:
    """
    Download one series member distribution and filter it before returning.

    The downloaded bytes are read into a pyarrow table, filtered with
    `filter_parquet_table_as_qdf`, and only then converted to pandas.

    Parameters
    ----------
    dataset : str
        The dataset identifier the series member belongs to.
    seriesmember : str
        The series member identifier to download.
    tickers : List[str]
        Tickers to keep. If None, no ticker filtering is requested.
    start_date : Optional[str]
        Start date of the filter (ISO format). If None, no lower bound is
        requested.
    end_date : Optional[str]
        End date of the filter (ISO format). If None, no upper bound is
        requested.
    qdf : bool
        Forwarded to the filter helper. Default is False.
    distribution : str
        The distribution format requested from the API. Default is "parquet".
    **kwargs : dict
        Additional keyword arguments forwarded to the API request.

    Returns
    -------
    pd.DataFrame
        The filtered distribution of the specified series member.
    """
    raw_bytes = self.simple_fusion_client.get_seriesmember_distribution_details(
        catalog=self._catalog,
        dataset=dataset,
        seriesmember=seriesmember,
        distribution=distribution,
        as_bytes=True,
        **kwargs,
    )
    table = read_parquet_from_bytes_to_pyarrow_table(raw_bytes)
    filtered = filter_parquet_table_as_qdf(
        table=table,
        tickers=tickers,
        start_date=start_date,
        end_date=end_date,
        qdf=qdf,
    )
    return filtered.to_pandas()
def download_latest_distribution_to_disk(
    self,
    save_directory: str,
    dataset: str,
    distribution: str = "parquet",
    qdf: bool = False,
    as_csv: bool = False,
    keep_raw_data: bool = False,
    **kwargs,
) -> None:
    """
    Download the latest series member distribution of a dataset to disk.

    Looks up the latest series member identifier of `dataset` and delegates to
    `download_series_member_distribution_to_disk` with all other arguments
    unchanged.

    Parameters
    ----------
    save_directory : str
        Directory the file is written to.
    dataset : str
        The dataset identifier to download.
    distribution : str
        The distribution format to download. Default is "parquet".
    qdf : bool
        Forwarded to the to-disk download. Default is False.
    as_csv : bool
        Forwarded to the to-disk download. Default is False.
    keep_raw_data : bool
        Forwarded to the to-disk download. Default is False.
    **kwargs : dict
        Additional keyword arguments forwarded to both underlying calls.
    """
    newest_member = self.get_latest_seriesmember_identifier(dataset=dataset, **kwargs)
    delegate_args = dict(
        save_directory=save_directory,
        dataset=dataset,
        seriesmember=newest_member,
        distribution=distribution,
        qdf=qdf,
        as_csv=as_csv,
        keep_raw_data=keep_raw_data,
    )
    self.download_series_member_distribution_to_disk(**delegate_args, **kwargs)
def _download_multiple_distributions_to_disk(
    self,
    folder: str = None,
    qdf: bool = False,
    include_catalog: bool = False,
    include_notifications: bool = False,
    include_full_datasets: bool = False,
    include_explorer_datasets: bool = False,
    include_delta_datasets: bool = False,
    as_csv: bool = False,
    keep_raw_data: bool = False,
    datasets_list: Optional[List[str]] = None,
) -> pd.DataFrame:
    """
    Download the latest distribution of several datasets into a fresh folder.

    A sub-folder `jpmaqs-download-<UTC timestamp>` is created below `folder`, the
    metadata catalog is written into it, and the latest distribution of every
    selected dataset is downloaded concurrently. Failures of single datasets are
    printed and recorded in `self.failure_messages`; they do not abort the run.

    Parameters
    ----------
    folder : str
        Parent folder of the download folder. If None, the current working
        directory is used.
    qdf : bool
        Forwarded to `download_latest_distribution_to_disk`.
    include_catalog, include_notifications, include_full_datasets,
    include_explorer_datasets, include_delta_datasets : bool
        Dataset selection flags forwarded to `list_datasets`.
    as_csv : bool
        If True, the catalog is saved as CSV (otherwise Parquet); also forwarded
        to the per-dataset download.
    keep_raw_data : bool
        Forwarded to the per-dataset download.
    datasets_list : Optional[List[str]]
        If given, only datasets whose identifier matches an entry
        (case-insensitively) are downloaded. The catalog and notifications
        datasets are kept whenever the `include_*` flags selected them.

    Returns
    -------
    pd.DataFrame
        The metadata catalog.

    Raises
    ------
    ValueError
        If `datasets_list` is given and none of its entries is available.
    """
    parent = Path.cwd() if folder is None else Path(folder)
    # `Timestamp.now(tz="UTC")` replaces the deprecated `Timestamp.utcnow()`.
    timestamp = pd.Timestamp.now(tz="UTC").strftime("%Y-%m-%d_%H-%M-%S")
    folder = parent.expanduser() / f"jpmaqs-download-{timestamp}"
    os.makedirs(folder, exist_ok=True)

    catalog_df = self.get_metadata_catalog()
    metadata_catalog_path = os.path.join(folder, "jpmaqs-metadata-catalog")
    if as_csv:
        catalog_df.to_csv(f"{metadata_catalog_path}.csv", index=False)
    else:
        catalog_df.to_parquet(f"{metadata_catalog_path}.parquet", index=False)

    datasets: List[str] = self.list_datasets(
        include_catalog=include_catalog,
        include_notifications=include_notifications,
        include_full_datasets=include_full_datasets,
        include_explorer_datasets=include_explorer_datasets,
        include_delta_datasets=include_delta_datasets,
    )["identifier"].tolist()

    if datasets_list is not None:
        requested = {name.lower() for name in datasets_list}
        if not any(ds.lower() in requested for ds in datasets):
            raise ValueError(
                f"No datasets found in the provided `datasets_list`. Available datasets: {', '.join(datasets)}"
            )
        # Apply the selection (previously it was validated but never used).
        # Metadata datasets that passed the include_* flags are not data
        # datasets and therefore stay selected.
        metadata_ids = {self._catalog_dataset, self._notifications_dataset}
        datasets = [
            ds for ds in datasets if ds in metadata_ids or ds.lower() in requested
        ]

    self.failure_messages = []
    failures = []
    with cf.ThreadPoolExecutor() as executor:
        futures: Dict[str, cf.Future] = {}
        for ds in datasets:
            futures[ds] = executor.submit(
                self.download_latest_distribution_to_disk,
                save_directory=folder,
                dataset=ds,
                qdf=qdf,
                as_csv=as_csv,
                keep_raw_data=keep_raw_data,
            )
            # Space out the submissions to respect the API delay.
            time.sleep(FUSION_API_DELAY)
        for ds, future in futures.items():
            try:
                future.result()
            except Exception as e:
                # Best effort: record the failure and continue with the rest.
                e_msg = f"Failed to download dataset {ds}: {e}"
                print(e_msg)
                self.failure_messages.append(e_msg)
                failures.append(ds)

    if failures:
        print(
            f"Failed to download the following datasets: {', '.join(failures)}. "
            "Please check the logs for more details."
        )
    return catalog_df
def download_latest_delta_distribution(
    self,
    folder: str = None,
    qdf: bool = False,
    as_csv: bool = False,
    keep_raw_data: bool = False,
    **kwargs,
) -> pd.DataFrame:
    """
    Download the latest Delta distribution of the JPMaQS datasets to disk.

    Delegates to `_download_multiple_distributions_to_disk` with only the Delta
    datasets selected.

    Parameters
    ----------
    folder : str
        The folder below which the download folder is created. If None, the
        delegate's default location is used.
    qdf : bool
        If True, converts the data to the Quantamental Data Format.
    as_csv : bool
        If True, saves the downloaded datasets as CSV files. Default is False, with
        Parquet as the default format.
    keep_raw_data : bool
        If True, keeps the raw data files after conversion. Default is False.
    **kwargs : dict
        Accepted for interface compatibility; not forwarded by this method.

    Returns
    -------
    pd.DataFrame
        The value returned by the delegate (the metadata catalog).
    """
    delta_only = dict(
        include_catalog=False,
        include_full_datasets=False,
        include_explorer_datasets=False,
        include_delta_datasets=True,
    )
    return self._download_multiple_distributions_to_disk(
        folder=folder,
        qdf=qdf,
        as_csv=as_csv,
        keep_raw_data=keep_raw_data,
        **delta_only,
    )
def download_latest_full_snapshot(
    self,
    folder: str = None,
    qdf: bool = False,
    include_catalog: bool = False,
    include_notifications: bool = False,
    include_explorer_datasets: bool = False,
    include_delta_datasets: bool = False,
    as_csv: bool = False,
    keep_raw_data: bool = False,
    datasets_list: List[str] = None,
    **kwargs,
) -> pd.DataFrame:
    """
    Download the latest full snapshot of the JPMaQS datasets to disk.

    Delegates to `_download_multiple_distributions_to_disk` with
    `include_full_datasets=True` and prints the elapsed wall-clock time.

    Parameters
    ----------
    folder : str
        The folder below which the snapshot folder is created. If None, the
        delegate's default location is used.
    qdf : bool
        If True, converts the data to the Quantamental Data Format.
    include_catalog : bool
        If True, includes the metadata catalog dataset in the snapshot. Default is
        False.
    include_notifications : bool
        If True, includes notifications dataset in the snapshot. Default is False.
    include_explorer_datasets : bool
        If True, includes Explorer datasets in the snapshot. Default is False.
    include_delta_datasets : bool
        If True, includes Delta datasets in the snapshot. Default is False.
    as_csv : bool
        If True, saves the downloaded datasets as CSV files. Default is False, with
        Parquet as the default format.
    keep_raw_data : bool
        If True, keeps the raw data files after conversion. Default is False.
    datasets_list : Optional[List[str]]
        A list of specific datasets to download, forwarded to the delegate.
    **kwargs : dict
        Accepted for interface compatibility; not forwarded by this method.

    Returns
    -------
    pd.DataFrame
        The value returned by the delegate (the metadata catalog).
    """
    started = time.time()
    catalog = self._download_multiple_distributions_to_disk(
        folder=folder,
        qdf=qdf,
        include_catalog=include_catalog,
        include_notifications=include_notifications,
        include_full_datasets=True,
        include_explorer_datasets=include_explorer_datasets,
        include_delta_datasets=include_delta_datasets,
        as_csv=as_csv,
        keep_raw_data=keep_raw_data,
        datasets_list=datasets_list,
    )
    took = time.time() - started
    print(f"Downloaded latest full snapshot of JPMaQS datasets in {took:.2f} seconds.")
    return catalog
def download(
    self,
    folder: str = None,
    tickers: Optional[List[str]] = None,
    cids: Optional[List[str]] = None,
    xcats: Optional[List[str]] = None,
    metrics: List[str] = ["all"],
    start_date: str = "2000-01-01",
    end_date: Optional[str] = None,
    qdf: bool = True,
    as_csv: bool = False,
    **kwargs,
) -> Optional[pd.DataFrame]:
    """
    Download data for specified tickers, `cids`, or `xcats` from the JPMaQS product.

    The requested tickers are matched (case-insensitively) against the metadata
    catalog to find the datasets that contain them. With `folder` set, the latest
    full snapshots of those datasets are saved to disk; otherwise the latest
    distributions are downloaded into memory, filtered and concatenated.

    Parameters
    ----------
    folder : str
        The folder where the downloaded data will be saved. If None, a dataframe
        will be returned without saving to disk.
    tickers : Optional[List[str]]
        A list of tickers (or a single ticker string). Combined with the tickers
        generated from `cids` and `xcats`. The passed list is not modified.
    cids : Optional[List[str]]
        Cross-section identifiers used to generate tickers of the form "cid_xcat".
    xcats : Optional[List[str]]
        Categories used to generate tickers of the form "cid_xcat".
    metrics : List[str]
        Accepted for interface compatibility; this method does not use it.
    start_date : str
        The start date of the in-memory filter, in "YYYY-MM-DD" format.
        Default is "2000-01-01".
    end_date : Optional[str]
        The end date of the in-memory filter, in "YYYY-MM-DD" format.
        If None, defaults to the current UTC date. If it lies before
        `start_date`, the two dates are swapped.
    qdf : bool
        If True, converts the data to the Quantamental Data Format. Default is True.
    as_csv : bool
        If True, files saved to `folder` are written as CSV. Default is False, with
        Parquet as the default format.
    **kwargs : dict
        Additional keyword arguments forwarded to the underlying download calls.
        `keep_raw_data` is extracted and used when saving to `folder`.

    Returns
    -------
    pd.DataFrame
        With `folder` set: the value returned by `download_latest_full_snapshot`.
        Otherwise: a QuantamentalDataFrame built from the filtered data of all
        datasets, with a `dataset` column naming each row's source dataset.

    Raises
    ------
    ValueError
        If only one of `cids`/`xcats` is given, if no tickers are requested, if no
        dataset matches the tickers, or if no data is found.
    """
    save_to_folder = folder is not None
    if folder is None:
        folder = Path.cwd()
    folder = Path(folder).expanduser()

    def vartolist(x: Optional[List[str]]) -> Optional[List[str]]:
        # Wrap single strings; copy sequences so the caller's argument is never
        # mutated further down.
        if x is None:
            return None
        return [x] if isinstance(x, str) else list(x)

    tickers = vartolist(tickers) or []
    cids = vartolist(cids)
    xcats = vartolist(xcats)

    if bool(cids) ^ bool(xcats):
        raise ValueError(
            "Both `cids` and `xcats` must be provided together or neither."
        )
    if cids and xcats:
        tickers += [f"{cid}_{xcat}" for cid in cids for xcat in xcats]
    if not tickers:
        raise ValueError(
            "At least one of `tickers`, `cids`, or `xcats` must be provided."
        )
    tickers = sorted(set(tickers))

    catalog_df = self.get_metadata_catalog()
    catalog_tickers_lower = set(catalog_df["Ticker"].str.lower())
    # Compare lowercase with lowercase on both sides; unknown tickers are dropped
    # from the request and only reported through the warning.
    non_existing = [t for t in tickers if t.lower() not in catalog_tickers_lower]
    tickers = [t for t in tickers if t.lower() in catalog_tickers_lower]
    if non_existing:
        wstr = f"There are {len(non_existing)} tickers that do not exist in the metadata catalog. "
        wstr += "Please check the input tickers against the metadata catalog."
        warnings.warn(wstr)

    tickers_info = catalog_df[
        catalog_df["Ticker"].str.lower().isin([t.lower() for t in tickers])
    ]
    themes = tickers_info["Theme"].drop_duplicates()
    if themes.empty:
        raise ValueError(
            "No datasets found for the specified tickers. Please check the tickers "
            "against the metadata catalog."
        )
    # Dataset identifiers are derived from the catalog themes.
    datasets = sorted(set("JPMAQS_" + themes.str.replace(" ", "_").str.upper()))

    if save_to_folder:
        keep_raw_data = kwargs.pop("keep_raw_data", False)
        return self.download_latest_full_snapshot(
            folder=folder,
            qdf=qdf,
            include_catalog=True,
            include_notifications=True,
            as_csv=as_csv,
            keep_raw_data=keep_raw_data,
            datasets_list=datasets,
            **kwargs,
        )

    if end_date is None:
        # `Timestamp.now(tz="UTC")` replaces the deprecated `Timestamp.utcnow()`.
        end_date = pd.Timestamp.now(tz="UTC").strftime("%Y-%m-%d")
    if pd.Timestamp(start_date) > pd.Timestamp(end_date):
        start_date, end_date = end_date, start_date

    def _download_df(
        dataset: str,
        tickers: List[str],
        start_date: str,
        end_date: str,
        **kwargs,
    ) -> pd.DataFrame:
        # Fetch and filter the latest series member of one dataset.
        series_member = self.get_latest_seriesmember_identifier(
            dataset=dataset, **kwargs
        )
        df = self.download_and_filter_series_member_distribution(
            dataset=dataset,
            seriesmember=series_member,
            tickers=tickers,
            start_date=start_date,
            end_date=end_date,
            qdf=qdf,
            **kwargs,
        )
        df["dataset"] = dataset
        return df

    _commonargs = dict(tickers=tickers, start_date=start_date, end_date=end_date)
    print(f"downloading {len(datasets)} datasets: {', '.join(datasets)}")

    results: List[pd.DataFrame] = []
    with cf.ThreadPoolExecutor() as executor:
        futures: Dict[str, cf.Future] = {}
        for dataset in datasets:
            futures[dataset] = executor.submit(
                _download_df, dataset=dataset, **_commonargs, **kwargs
            )
            # Space out the submissions to respect the API delay.
            time.sleep(FUSION_API_DELAY)
        for dataset, future in futures.items():
            try:
                results.append(future.result())
            except Exception as e:
                # Best effort: a failing dataset is reported and skipped.
                print(f"Failed to download data for dataset {dataset}: {e}")

    if not results or all(df.empty for df in results):
        raise ValueError(
            "No data found for the specified tickers, cids, or xcats within the date range."
        )
    return QuantamentalDataFrame(pd.concat(results, ignore_index=True))
if __name__ == "__main__":
    # Manual walkthrough of the client. It reads client credentials from a local
    # JSON file and writes downloads below ./data.
    credentials_file = "data/fusion_client_credentials.json"
    oauth_handler = FusionOAuth.from_credentials_json(credentials_file)
    jpmaqs_client = JPMaQSFusionClient(oauth_handler=oauth_handler)

    # 1) Notifications distribution.
    tick = time.time()
    df = jpmaqs_client.get_notifications_distribution()
    print(df.head())
    print(f"Time taken for notifications download: {time.time() - tick:.2f} seconds")

    # 2) Dataset listing, series members, and an in-memory download. The timer
    #    below spans all three steps.
    tick = time.time()
    ds = jpmaqs_client.list_datasets()
    print(ds.head())
    jpmaqs_client.get_all_seriesmembers_for_all_datasets(
        include_delta_datasets=True,
        include_full_datasets=True,
        include_catalog=True,
        include_notifications=True,
    )
    demo_cids = ["USD", "GBP", "EUR", "JPY", "CHF", "AUD", "CAD"]
    demo_xcats = ["FXXR_NSA", "EQXR_NSA", "EQCRY_NSA"]
    df = jpmaqs_client.download(
        # folder="./data",
        cids=demo_cids,
        xcats=demo_xcats,
        tickers=["USD_EQXR_NSA", "GBP_EQXR_NSA"],
        start_date="2025-07-17",
    )
    print(df.head())
    print(f"Time taken for download: {time.time() - tick:.2f} seconds")

    # Drop the reference to the downloaded frame before the large downloads.
    df = None
    ds = jpmaqs_client.list_datasets()

    # 3) Latest full snapshot to disk.
    tick = time.time()
    jpmaqs_client.download_latest_full_snapshot(
        folder="./data",
        keep_raw_data=False,
        # qdf=True,
        # as_csv=True,
    )
    print(f"Time taken for full snapshots download: {time.time() - tick:.2f} seconds")

    # 4) Latest delta distribution to disk.
    tick = time.time()
    jpmaqs_client.download_latest_delta_distribution(
        folder="./data",
        # qdf=True,
        # as_csv=True,
        keep_raw_data=False,
    )
    print(f"Time taken for latest delta download: {time.time() - tick:.2f} seconds")