"""
JPMaQS Download Interface
"""
import datetime
from dateutil.relativedelta import relativedelta
import io
import logging
import os
import glob
import shutil
import json
import warnings
from timeit import default_timer as timer
from typing import Dict, List, Optional, Tuple, Union, Any, Generator
import itertools
import joblib
from tqdm import tqdm
import pandas as pd
from macrosynergy.download.dataquery import (
JPMAQS_GROUP_ID,
DataQueryInterface,
API_DELAY_PARAM,
)
from macrosynergy.download.exceptions import InvalidDataframeError, DataOutOfSyncError
from macrosynergy.management.utils import (
is_valid_iso_date,
concat_single_metric_qdfs,
ticker_df_to_qdf,
)
from macrosynergy.management.constants import JPMAQS_METRICS
from macrosynergy.management.types import QuantamentalDataFrame
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Extra handler that writes formatted records into an in-memory text buffer
# (io.StringIO) rather than to the console. NOTSET means the handler itself
# does not filter records by level.
debug_stream_handler = logging.StreamHandler(io.StringIO())
debug_stream_handler.setLevel(logging.NOTSET)
debug_stream_handler.setFormatter(
logging.Formatter(
"%(asctime)s - %(levelname)s - %(module)s - %(funcName)s :: %(message)s"
)
)
logger.addHandler(debug_stream_handler)
# Names of the environment variables that JPMaQSDownload.__init__ reads as a
# fallback when no OAuth client id / client secret is passed explicitly.
DEFAULT_CLIENT_ID_ENV_VAR: str = "DQ_CLIENT_ID"
DEFAULT_CLIENT_SECRET_ENV_VAR: str = "DQ_CLIENT_SECRET"
def deconstruct_expression(
    expression: Union[str, List[str]],
) -> Union[List[str], List[List[str]]]:
    """
    Split a JPMaQS expression into its ``[cid, xcat, metric]`` components.

    This is the inverse of ``construct_expressions()``. An expression that cannot
    be parsed as ``DB(JPMAQS,<cid>_<xcat>,<metric>)`` is treated as a non-JPMaQS
    expression: a ``UserWarning`` is emitted and ``[expression, expression,
    'value']`` is returned, so the reported metric stays consistent with the
    standard JPMaQS metrics.

    Parameters
    ----------
    expression : Union[str, List[str]]
        expression to deconstruct. If a list is provided, each element is
        deconstructed and a list of lists is returned.

    Raises
    ------
    TypeError
        if `expression` is not a string or a list of strings.
    ValueError
        if `expression` is an empty list.

    Returns
    -------
    Union[List[str], List[List[str]]]
        ``[cid, xcat, metric]`` for a single expression, or one such list per
        element when a list is given.
    """
    if isinstance(expression, list):
        if any(not isinstance(item, str) for item in expression):
            raise TypeError("All elements of `expression` must be strings.")
        if len(expression) == 0:
            raise ValueError("`expression` must be a non-empty list.")
        return [deconstruct_expression(item) for item in expression]

    if not isinstance(expression, str):
        raise TypeError("`expression` must be a string or a list of strings.")

    try:
        stripped: str = expression.replace("DB(JPMAQS,", "").replace(")", "")
        ticker, metric = stripped.split(",")
        # Split on the first underscore only: xcats themselves contain underscores.
        parts: List[str] = ticker.split("_", 1) + [metric]
        if len(parts) != 3:
            raise ValueError(f"{stripped} is not a valid JPMaQS expression.")
        return parts
    except Exception as exc:
        warnings.warn(
            f"Failed to deconstruct expression `{expression}`: assuming it is a non-JPMaQS expression."
            f" Exception: {exc}",
            UserWarning,
        )
        # Fail safely: cid = xcat = expression, metric = 'value'.
        return [expression, expression, "value"]
def check_attributes_in_sync(ts_list) -> bool:
    """
    Check that all metrics of each JPMaQS ticker end on the same date.

    On occasion a ticker will have just been calculated for a new date, but the
    update has only reached some of the serving pods. The different metrics of one
    ticker can then end on different dates, i.e. they are out of sync.

    Entries without attributes, without a time-series, without an expression, or
    whose expression is not a JPMaQS expression are skipped.

    Parameters
    ----------
    ts_list : list
        List of time-series dictionaries as returned by the API. Each entry is
        expected to carry an ``"attributes"`` list whose first element holds the
        ``"expression"`` and ``"time-series"`` keys.

    Returns
    -------
    bool
        True if the attributes are in sync, False otherwise.
    """
    last_date_by_ticker = {}
    for instrument in ts_list:
        attributes = instrument.get("attributes")
        if not attributes:
            continue
        first_attr = attributes[0]

        time_series = first_attr.get("time-series")
        if not time_series:
            continue

        expression = first_attr.get("expression")
        # Guard against a missing expression *before* the substring test; the
        # original order raised TypeError on `None`.
        if not expression or "JPMAQS" not in expression:
            continue

        # Most recent observation that actually carries a value; fall back to the
        # first observation when every value is None.
        last_valid_item = next(
            (item for item in reversed(time_series) if item[1] is not None),
            None,
        )
        if last_valid_item is None:
            last_valid_item = time_series[0]

        cid, xcat, _ = deconstruct_expression(expression)
        ticker = cid + "_" + xcat
        last_value_date = last_valid_item[0]

        # setdefault stores the date for a new ticker and returns the stored
        # date for a ticker seen before.
        if last_date_by_ticker.setdefault(ticker, last_value_date) != last_value_date:
            return False
    return True
def construct_expressions(
    tickers: Optional[List[str]] = None,
    cids: Optional[List[str]] = None,
    xcats: Optional[List[str]] = None,
    metrics: Optional[List[str]] = None,
) -> List[str]:
    """
    Construct JPMaQS expressions from the provided arguments.

    The tickers used are `tickers` followed by every ``<cid>_<xcat>`` combination
    of `cids` and `xcats` (only when both are given). One expression
    ``DB(JPMAQS,<ticker>,<metric>)`` is produced per ticker and metric. The
    caller's `tickers` list is never modified.

    Parameters
    ----------
    tickers : list[str]
        list of tickers.
    cids : list[str]
        list of cids.
    xcats : list[str]
        list of xcats.
    metrics : list[str]
        list of metrics. Must be iterable whenever at least one ticker results
        from the other arguments.

    Returns
    -------
    list[str]
        list of expressions.
    """
    # Work on a copy: extending `tickers` in place would leak the generated
    # cid/xcat tickers back into the caller's list.
    all_tickers: List[str] = [] if tickers is None else list(tickers)
    if cids is not None and xcats is not None:
        all_tickers.extend(f"{cid}_{xcat}" for cid in cids for xcat in xcats)
    return [
        f"DB(JPMAQS,{tick},{metric})" for tick in all_tickers for metric in metrics
    ]
def get_expression_from_qdf(df: Union[pd.DataFrame, List[pd.DataFrame]]) -> List[str]:
    """
    List the JPMaQS expressions that hold at least one value in a quantamental
    dataframe.

    Every column that is not one of ``QuantamentalDataFrame.IndexCols`` is treated
    as a metric. An expression is reported for a (cid, xcat, metric) combination
    only if that metric has at least one non-NaN value for that ticker.

    Parameters
    ----------
    df : Union[pd.DataFrame, List[pd.DataFrame]]
        a dataframe with `cid` and `xcat` columns, or a list of such dataframes
        (results are concatenated in order).

    Returns
    -------
    List[str]
        expressions of the form ``DB(JPMAQS,<cid>_<xcat>,<metric>)``.
    """
    if isinstance(df, list):
        collected: List[str] = []
        for frame in df:
            collected.extend(get_expression_from_qdf(frame))
        return collected

    metric_columns = list(set(df.columns) - set(QuantamentalDataFrame.IndexCols))
    expressions: List[str] = []
    for metric in metric_columns:
        ticker_pairs = df[["cid", "xcat"]].drop_duplicates().values
        for cid, xcat in ticker_pairs:
            ticker_rows = (df["cid"] == cid) & (df["xcat"] == xcat)
            if df.loc[ticker_rows, metric].notna().any():
                expressions.append(f"DB(JPMAQS,{cid}_{xcat},{metric})")
    return expressions
def get_expression_from_wide_df(
    df: Union[pd.DataFrame, List[pd.DataFrame]],
) -> List[str]:
    """
    List the expressions present in a wide dataframe, i.e. its unique column names.

    Parameters
    ----------
    df : Union[pd.DataFrame, List[pd.DataFrame]]
        a dataframe with one column per expression, or a list of such dataframes
        (results are concatenated in order; duplicates across dataframes are kept).

    Returns
    -------
    List[str]
        the unique column names, in no particular order.
    """
    if isinstance(df, list):
        per_frame = (get_expression_from_wide_df(frame) for frame in df)
        return [expr for group in per_frame for expr in group]
    unique_columns = set(df.columns)
    return list(unique_columns)
def timeseries_to_qdf(timeseries: Dict[str, Any]) -> QuantamentalDataFrame:
    """
    Convert a single time-series dictionary to a QuantamentalDataFrame.

    Parameters
    ----------
    timeseries : Dict[str, Any]
        A dictionary containing a time-series, with the expression and the
        observations stored under ``["attributes"][0]``.

    Raises
    ------
    TypeError
        if `timeseries` is not a dictionary.

    Returns
    -------
    QuantamentalDataFrame
        The converted DataFrame. ``None`` is returned instead when the dictionary
        carries no time-series or when the series contains no data.
    """
    if not isinstance(timeseries, dict):
        raise TypeError("Argument `timeseries` must be a dictionary.")

    observations = _get_ts(timeseries)
    if observations is None:
        return None

    cid, xcat, metric = deconstruct_expression(_get_expr(timeseries))
    frame = pd.DataFrame(observations, columns=["real_date", metric])
    frame["real_date"] = pd.to_datetime(frame["real_date"], format="%Y%m%d")
    if frame.empty or frame.isna().all().all():
        return None

    cleaned = frame.dropna().reset_index(drop=True)
    qdf = QuantamentalDataFrame.from_long_df(
        df=cleaned,
        value_column=metric,
        cid=cid,
        xcat=xcat,
    )
    if metric == "last_updated":
        qdf["last_updated"] = pd.to_datetime(qdf["last_updated"])
    return qdf
def timeseries_to_column(
    timeseries: Dict[str, Any], errors: str = "ignore"
) -> pd.DataFrame:
    """
    Convert a single time-series dictionary to a one-column DataFrame.

    The resulting DataFrame is indexed by `real_date` and its only column is
    named after the expression of the time-series. Rows with missing values are
    dropped.

    Parameters
    ----------
    timeseries : Dict[str, Any]
        A dictionary containing a time-series.
    errors : str
        How to treat an empty time-series. If 'raise', a ValueError is raised.
        If 'ignore', ``None`` is returned. Default is 'ignore'.

    Raises
    ------
    TypeError
        if `timeseries` is not a dictionary.
    ValueError
        if `errors` is not 'raise' or 'ignore', or if the time-series is empty
        and `errors` is 'raise'.

    Returns
    -------
    pd.DataFrame
        The converted DataFrame, or ``None`` for an empty time-series when
        `errors` is 'ignore'.
    """
    if not isinstance(timeseries, dict):
        raise TypeError("Argument `timeseries` must be a dictionary.")
    if errors not in ["raise", "ignore"]:
        raise ValueError("`errors` must be one of 'raise' or 'ignore'.")

    column_name = _get_expr(timeseries)
    observations = _get_ts(timeseries)
    if observations is None:
        if errors == "raise":
            raise ValueError("Time series is empty.")
        return None

    frame: pd.DataFrame = pd.DataFrame(
        observations, columns=["real_date", column_name]
    )
    frame["real_date"] = pd.to_datetime(frame["real_date"], format="%Y%m%d")
    frame = frame.dropna()
    if not frame.empty:
        return frame.set_index("real_date")

    if errors == "raise":
        raise ValueError("Time series is empty.")
    return None
def concat_column_dfs(
    df_list: List[pd.DataFrame], errors: str = "ignore"
) -> pd.DataFrame:
    """
    Join a list of `real_date`-indexed DataFrames side by side.

    Dates on which every column is NaN are dropped from the result. When every
    element of `df_list` is a DataFrame, the list is emptied as its elements are
    consumed.

    Parameters
    ----------
    df_list : List[pd.DataFrame]
        A list of DataFrames.
    errors : str
        The error handling method to use. If 'raise', then invalid items in the list
        will raise an error. If 'ignore', then invalid items will be ignored. Default is
        'ignore'.

    Raises
    ------
    TypeError
        if `df_list` is not a list, or if it contains a non-DataFrame element
        and `errors` is 'raise'.
    ValueError
        if `errors` is not 'raise' or 'ignore'.

    Returns
    -------
    pd.DataFrame
        The concatenated DataFrame.
    """
    if not isinstance(df_list, list):
        raise TypeError("Argument `df_list` must be a list.")
    if errors not in ["raise", "ignore"]:
        raise ValueError("`errors` must be one of 'raise' or 'ignore'.")

    contains_invalid = any(not isinstance(item, pd.DataFrame) for item in df_list)
    if contains_invalid:
        if errors == "raise":
            raise TypeError("All elements in `df_list` must be DataFrames.")
        df_list = [item for item in df_list if isinstance(item, pd.DataFrame)]

    def _drain_frames() -> Generator[pd.DataFrame, None, None]:
        # Hand the frames over one at a time, removing each from the list.
        while df_list:
            yield df_list.pop(0)

    # All frames are indexed by real_date, so a column-wise concat aligns them;
    # afterwards drop the dates on which every column is NaN.
    combined = pd.concat(_drain_frames(), axis=1)
    return combined.dropna(how="all", axis=0)
def _get_expr(ts: dict) -> str:
    """Return the expression stored in the first attribute of a time-series dict."""
    first_attribute = ts["attributes"][0]
    return first_attribute["expression"]


def _get_ts(ts: dict) -> dict:
    """Return the raw time-series stored in the first attribute of a time-series dict."""
    first_attribute = ts["attributes"][0]
    return first_attribute["time-series"]


def _get_ticker(ts: dict) -> str:
    """Return the second comma-separated field of the expression (the ticker
    for expressions of the form ``DB(JPMAQS,<ticker>,<metric>)``)."""
    expression_fields = _get_expr(ts).split(",")
    return expression_fields[1]


def _get_xcat(ticker: str) -> str:
    """Return the part of `ticker` after its first underscore."""
    _, xcat = ticker.split("_", 1)[:2]
    return xcat


def _ticker_filename(ticker: str, save_path: str) -> str:
    """Return ``<save_path>/<xcat>/<ticker>.csv``, the CSV path used for `ticker`."""
    xcat_directory = os.path.join(save_path, _get_xcat(ticker))
    return os.path.join(xcat_directory, f"{ticker}.csv")
def _save_qdf(data: List[dict], save_path: str) -> None:
    """
    Write the downloaded time-series to disk, one CSV file per ticker.

    Each file is stored at ``<save_path>/<xcat>/<ticker>.csv`` and holds a
    `real_date` column plus one column per metric. If the file already exists,
    metric columns present in the new data replace the stored ones, while other
    stored columns are kept.

    Parameters
    ----------
    data : List[dict]
        time-series dictionaries to save.
    save_path : str
        root directory for the CSV files.
    """
    unique_tickers = sorted({_get_ticker(item) for item in data})
    for ticker in unique_tickers:
        csv_path = _ticker_filename(ticker, save_path)
        os.makedirs(os.path.dirname(csv_path), exist_ok=True)

        ticker_qdfs = [
            timeseries_to_qdf(item) for item in data if _get_ticker(item) == ticker
        ]
        df: QuantamentalDataFrame = QuantamentalDataFrame.from_qdf_list(ticker_qdfs)
        df = df.drop(columns=["cid", "xcat"])

        if os.path.exists(csv_path):
            stored = pd.read_csv(
                csv_path, parse_dates=["real_date"], index_col="real_date"
            )
            # Columns present in the new data supersede the stored ones.
            superseded = [col for col in stored.columns if col in df.columns]
            stored = stored.drop(columns=superseded)
            merged = pd.concat([stored, df.set_index("real_date")], axis=1)
            df = merged.reset_index()
            os.remove(csv_path)

        df.to_csv(csv_path, index=False)
    return
def _save_timeseries_as_column(data: List[dict], save_path: str) -> None:
    """
    Write each time-series to ``<save_path>/<expression>.csv`` in wide format.

    Every file holds a `real_date` column and one column named after the
    expression. Time-series that carry no data are skipped.

    Parameters
    ----------
    data : List[dict]
        time-series dictionaries to save.
    save_path : str
        directory in which the CSV files are written.
    """
    for ts in data:
        if _get_ts(ts) is None:
            continue
        expr = _get_expr(ts)
        df = timeseries_to_column(ts)
        # `timeseries_to_column` returns None (with its default errors="ignore")
        # when nothing is left after dropping missing values, so test for None
        # before touching `.empty`.
        if df is None or df.empty:
            continue
        df.reset_index().to_csv(os.path.join(save_path, f"{expr}.csv"), index=False)
    return
def _save_timeseries(data: List[dict], save_path: str):
    """
    Dump each time-series dictionary to ``<save_path>/<expression>.json``.

    ``None`` entries in `data` are ignored.

    Parameters
    ----------
    data : List[dict]
        time-series dictionaries to save.
    save_path : str
        directory in which the JSON files are written.
    """
    valid_items = [item for item in data if item is not None]
    if not valid_items:
        return
    for item in valid_items:
        json_path = os.path.join(save_path, f"{_get_expr(item)}.json")
        with open(json_path, "w") as out_file:
            json.dump(item, out_file)
    return
def validate_downloaded_df(
    data_df: pd.DataFrame,
    expected_expressions: List[str],
    found_expressions: List[str],
    start_date: Optional[str] = None,
    end_date: Optional[str] = None,
    verbose: bool = True,
) -> bool:
    """
    Validate the downloaded data in the provided dataframe.

    Missing expressions and missing business dates are only reported (logged, and
    printed when `verbose` is set); they do not make the validation fail.

    Parameters
    ----------
    data_df : pd.DataFrame
        dataframe containing the downloaded data.
    expected_expressions : list[str]
        list of expressions that were expected to be downloaded.
    found_expressions : list[str]
        list of expressions that were actually downloaded.
    start_date : str
        start date of the downloaded data (YYYY-MM-DD). Dates before 1950-01-01
        are treated as 1950-01-01 when checking for missing dates.
    end_date : str
        end date of the downloaded data (YYYY-MM-DD).
    verbose : bool
        whether to print the validation results.

    Notes
    -----
    The date-coverage check is skipped when `start_date` or `end_date` is None.

    Raises
    ------
    InvalidDataframeError
        if `data_df` is not a dataframe, if expressions were found that were not
        expected, or if an index column of a QuantamentalDataFrame is empty.

    Returns
    -------
    bool
        False if the dataframe is empty, True otherwise.
    """
    if not isinstance(data_df, pd.DataFrame):
        raise InvalidDataframeError(
            "Empty or invalid dataframe, please check download parameters."
        )
    if data_df.empty:
        return False

    # check if all expressions are present in the df
    exprs_f, expr_expected = set(found_expressions), set(expected_expressions)
    expr_missing = expr_expected - exprs_f
    unexpected_exprs = exprs_f - expr_expected
    if unexpected_exprs:
        raise InvalidDataframeError(
            f"Unexpected expressions were found in the downloaded data: "
            f"{unexpected_exprs}"
        )
    if expr_missing:
        log_str = (
            f"Some expressions are missing from the downloaded data. "
            "Check logger output for complete list.\n"
            f"{len(expr_missing)} out of {len(expr_expected)} expressions are missing. "
            f"To download the catalogue of all available expressions and filter the "
            "unavailable expressions, set `get_catalogue=True` in the "
            "call to `JPMaQSDownload.download()`."
        )
        logger.info(log_str)
        logger.info(f"Missing expressions: {expr_missing}")
        if verbose:
            print(log_str)

    is_qdf = isinstance(data_df, QuantamentalDataFrame)

    # check if all dates are present in the df
    dates_expected = None
    dates_missing: list = []
    if start_date is not None and end_date is not None:
        # NOTE: the earliest start date is hardcoded to 1950-01-01, the same
        # floor that `validate_download_args` warns about. The start date is
        # only ever moved forward to this floor, never backwards.
        earliest_start = "1950-01-01"
        if datetime.datetime.strptime(
            start_date, "%Y-%m-%d"
        ) < datetime.datetime.strptime(earliest_start, "%Y-%m-%d"):
            start_date = earliest_start
        dates_expected = pd.bdate_range(start=start_date, end=end_date)
        found_dates = (
            data_df["real_date"].unique() if is_qdf else data_df.index.unique()
        )
        dates_missing = list(
            set(dates_expected) - set(pd.to_datetime(found_dates))
        )

    # Cross-check: every expression that the dataframe itself contains must be
    # among the expressions reported as found.
    if is_qdf:
        found_metrics = list(
            set(data_df.columns) - set(QuantamentalDataFrame.IndexCols)
        )
        for col in QuantamentalDataFrame.IndexCols:
            if not len(data_df[col].unique()) > 0:
                raise InvalidDataframeError(f"Column {col} is empty.")
        tkrs = (
            (data_df["cid"].astype(str) + "_" + data_df["xcat"].astype(str))
            .unique()
            .tolist()
        )
        check_exprs = construct_expressions(tickers=tkrs, metrics=found_metrics)
    else:
        check_exprs = data_df.columns.tolist()

    missing_exprs = set(check_exprs) - set(found_expressions)
    if len(missing_exprs) > 0:
        log_str = (
            "The expressions in the downloaded data are not a subset of the expected expressions."
            " Missing expressions: {missing_exprs}"
        )
        logger.critical(log_str.format(missing_exprs=missing_exprs))

    if len(dates_missing) > 0:
        log_str = (
            f"Some dates are missing from the downloaded data. \n"
            f"{len(dates_missing)} out of {len(dates_expected)} dates are missing."
        )
        logger.warning(log_str)
        if verbose:
            print(log_str)
    return True
def _get_expressions_from_qdf_csv(file_path: str) -> List[str]:
    """
    Derive the expressions stored in a per-ticker QDF CSV file.

    The ticker is taken from the file name (the part before the first dot) and
    the metrics from the header row (every column except `real_date`). Only the
    header line of the file is read.

    Parameters
    ----------
    file_path : str
        path to the CSV file.

    Returns
    -------
    List[str]
        one ``DB(JPMAQS,<ticker>,<metric>)`` expression per metric column, in no
        particular order.
    """
    file_name = os.path.basename(file_path)
    ticker = file_name.split(".")[0]
    with open(file_path, "r", encoding="utf-8") as csv_file:
        header_line = csv_file.readline()
    headers = header_line.strip().split(",")
    assert len(set(headers)) == len(headers), f"Duplicate headers in {file_path}"
    metric_names = set(headers) - set(["real_date"])
    return [f"DB(JPMAQS,{ticker},{metric})" for metric in metric_names]
def _get_expressions_from_wide_csv(file_path: str) -> List[str]:
    """
    Read the expressions stored in a wide-format CSV file.

    The expressions are the header fields other than `real_date`. Only the header
    row is read. The header is parsed with the `csv` module because expressions
    such as ``DB(JPMAQS,USD_EQXR_NSA,value)`` contain commas and are therefore
    written as quoted fields; splitting the raw line on commas would break them
    apart.

    Parameters
    ----------
    file_path : str
        path to the CSV file.

    Returns
    -------
    List[str]
        the expressions found in the header, in no particular order.
    """
    import csv  # local import: the module does not import `csv` at file level

    with open(file_path, "r", encoding="utf-8", newline="") as f:
        headers = [field.strip() for field in next(csv.reader(f), [])]
    assert len(set(headers)) == len(headers), f"Duplicate headers in {file_path}"
    expression = list(set(headers) - set(["real_date"]))
    return expression
def _get_expressions_from_json(file_path: str) -> List[str]:
    """
    Read the expression stored in a saved time-series JSON file.

    Parameters
    ----------
    file_path : str
        path to a JSON file holding one time-series dictionary.

    Returns
    -------
    List[str]
        single-element list with the expression found under
        ``["attributes"][0]["expression"]``.
    """
    with open(file_path, "r", encoding="utf-8") as json_file:
        payload = json.load(json_file)
    return [payload["attributes"][0]["expression"]]
def get_expressions_from_file(
    file_path: str, as_dataframe: bool = True, dataframe_format: str = "qdf"
) -> List[str]:
    """
    Load the expressions found in a downloaded timeseries file (either JSON or CSV).

    Parameters
    ----------
    file_path : str
        path to the file.
    as_dataframe : bool
        True if the file is a CSV saved from a dataframe, False if it is a JSON
        time-series file.
    dataframe_format : str
        the format of the dataframe. Must be one of 'qdf' or 'wide'. Only used
        when `as_dataframe` is True.

    Raises
    ------
    FileNotFoundError
        if `file_path` does not exist.
    ValueError
        if `as_dataframe` is True and `dataframe_format` is not 'qdf' or 'wide'.

    Returns
    -------
    List[str]
        list of expressions found in the file.
    """
    if not os.path.exists(file_path):
        raise FileNotFoundError(f"File {file_path} does not exist.")

    if not as_dataframe:
        return _get_expressions_from_json(file_path)

    if dataframe_format not in ["qdf", "wide"]:
        raise ValueError("`dataframe_format` must be one of 'qdf' or 'wide'.")

    if dataframe_format == "wide":
        return _get_expressions_from_wide_csv(file_path)
    return _get_expressions_from_qdf_csv(file_path)
def validate_downloaded_data(
    path: str,
    expected_expressions: List[str],
    as_dataframe: bool = True,
    dataframe_format: str = "qdf",
    show_progress: bool = True,
) -> List[str]:
    """
    Validate the downloaded data saved under the provided path.

    All ``.csv`` files (or ``.json`` files when `as_dataframe` is False) below
    `path` are scanned in parallel for the expressions they contain, and the
    result is compared with `expected_expressions`.

    Parameters
    ----------
    path : str
        path to the downloaded data.
    expected_expressions : list[str]
        list of expressions that were expected to be downloaded.
    as_dataframe : bool
        whether the data was saved as dataframes (CSV) rather than JSON.
    dataframe_format : str
        the format of the dataframe. Must be one of 'qdf' or 'wide'.
    show_progress : bool
        whether to show a progress bar.

    Raises
    ------
    ValueError
        if `path` is not an existing directory.

    Returns
    -------
    list[str]
        sorted list of expressions that are missing from the downloaded data.
    """
    if not os.path.isdir(path):
        raise ValueError(f"Path {path} does not exist.")

    extension = "csv" if as_dataframe else "json"
    file_paths = glob.glob(f"{path}/**/*.{extension}", recursive=True)

    def _read_expressions(
        file_path: str, as_dataframe=as_dataframe, dataframe_format=dataframe_format
    ) -> List[str]:
        return get_expressions_from_file(
            file_path, as_dataframe=as_dataframe, dataframe_format=dataframe_format
        )

    progress = tqdm(
        file_paths,
        desc="Validating downloaded data",
        disable=not show_progress,
    )
    per_file_exprs = joblib.Parallel(n_jobs=-1)(
        joblib.delayed(_read_expressions)(file_path) for file_path in progress
    )

    # Merge the per-file lists of expressions.
    found_exprs = set()
    for file_exprs in per_file_exprs:
        found_exprs.update(file_exprs)

    missing_exprs = sorted(set(expected_expressions) - found_exprs)
    if missing_exprs:
        logger.critical(
            f"Some expressions are missing from the downloaded data. "
            f"Missing expressions: {missing_exprs}"
        )
    return missing_exprs
[docs]class JPMaQSDownload(DataQueryInterface):
"""
JPMaQSDownload Object. This object is used to download JPMaQS data via the DataQuery
API. It can be extended to include the use of proxies, and even request generic
DataQuery expressions.
Parameters
----------
oauth : bool
True if using oauth, False if using username/password with crt/key.
client_id : Optional[str]
oauth client_id, required if oauth=True.
client_secret : Optional[str]
oauth client_secret, required if oauth=True.
crt : Optional[str]
path to crt file.
key : Optional[str]
path to key file.
username : Optional[str]
username for certificate based authentication.
password : Optional[str]
password paired with `username` for certificate based authentication.
debug : bool
True if debug mode, False if not.
suppress_warning : bool
True if suppressing warnings, False if not.
check_connection : bool
True if the interface should check the connection to the server before sending
requests, False if not. True by default.
proxy : Optional[dict]
proxy to use for requests, None if not using proxy (default).
print_debug_data : bool
True if debug data should be printed, False if not (default).
dq_download_kwargs : dict
additional arguments to pass to the DataQuery API object, such as `calender` and
`frequency` for the DataQuery API. For more fine-grained usage, initialize the
DataQueryInterface object explicitly.
kwargs : dict
any other keyword arguments.
Raises
------
TypeError
if provided arguments are not of the correct type.
ValueError
if provided arguments are invalid or semantically incorrect.
"""
def __init__(
    self,
    oauth: bool = True,
    client_id: Optional[str] = None,
    client_secret: Optional[str] = None,
    crt: Optional[str] = None,
    key: Optional[str] = None,
    username: Optional[str] = None,
    password: Optional[str] = None,
    check_connection: bool = True,
    proxy: Optional[Dict] = None,
    suppress_warning: bool = True,
    debug: bool = False,
    print_debug_data: bool = False,
    dq_download_kwargs: dict = {},
    *args,
    **kwargs,
):
    """
    Validate the arguments, resolve the credentials and initialise the
    underlying DataQueryInterface.

    When neither a complete certificate set (`crt`, `key`, `username`,
    `password`) nor a complete OAuth pair (`client_id`, `client_secret`) is
    passed, the OAuth pair is read from the environment variables named by
    `DEFAULT_CLIENT_ID_ENV_VAR` and `DEFAULT_CLIENT_SECRET_ENV_VAR`.

    Raises
    ------
    TypeError
        if an argument is not of the expected type.
    ValueError
        if no complete set of credentials is available.
    """
    vars_types_zip: List[Tuple[Any, str, type]] = [
        (oauth, "oauth", bool),
        (check_connection, "check_connection", bool),
        (suppress_warning, "suppress_warning", bool),
        (debug, "debug", bool),
        (print_debug_data, "print_debug_data", bool),
    ]
    for varx, namex, typex in vars_types_zip:
        if not isinstance(varx, typex):
            raise TypeError(f"`{namex}` must be of type {typex}.")

    if not isinstance(proxy, dict) and proxy is not None:
        raise TypeError("`proxy` must be a dictionary or None.")

    if not isinstance(dq_download_kwargs, dict):
        raise TypeError("`dq_download_kwargs` must be a dictionary.")

    self.suppress_warning = suppress_warning
    self.debug = debug
    self.print_debug_data = print_debug_data
    self._check_connection = check_connection
    # Store a shallow copy: the default `{}` is a single object shared by every
    # call, so keeping a reference to it would let one instance's changes leak
    # into all others (and into the caller's own dict).
    self.dq_download_kwargs = dict(dq_download_kwargs)

    for varx, namex in [
        (client_id, "client_id"),
        (client_secret, "client_secret"),
        (crt, "crt"),
        (key, "key"),
        (username, "username"),
        (password, "password"),
    ]:
        if varx is not None and not isinstance(varx, str):
            raise TypeError(f"`{namex}` must be a string.")

    has_cert_credentials = all([crt, key, username, password])
    if not has_cert_credentials and not all([client_id, client_secret]):
        # No complete credentials were passed: check the environment variables.
        _clid = os.getenv(DEFAULT_CLIENT_ID_ENV_VAR)
        _clsc = os.getenv(DEFAULT_CLIENT_SECRET_ENV_VAR)
        if all([_clid, _clsc]):
            client_id = _clid
            client_secret = _clsc

    if not (all([client_id, client_secret]) or has_cert_credentials):
        raise ValueError(
            "Must provide either `client_id` and `client_secret` for oauth, or "
            "`crt`, `key`, `username`, and `password` for certificate based authentication."
        )

    self.valid_metrics: List[str] = JPMAQS_METRICS
    self.msg_errors: List[str] = []
    self.msg_warnings: List[str] = []
    self.unavailable_expressions: List[str] = []
    self.downloaded_data: Dict = {}
    self.jpmaqs_access: bool = True

    super().__init__(
        oauth=oauth,
        client_id=client_id,
        client_secret=client_secret,
        crt=crt,
        key=key,
        username=username,
        password=password,
        proxy=proxy,
        check_connection=check_connection,
        suppress_warning=suppress_warning,
        debug=debug,
        *args,
        **kwargs,
    )

    if self._check_connection:
        self.check_connection()
def __enter__(self):
    """Enter the context manager; the instance itself is the context object."""
    return self

def __exit__(self, exc_type, exc_value, traceback):
    """Exit the context manager. Nothing is cleaned up and exceptions are not
    suppressed (the method returns None)."""
    return None
def _get_unavailable_expressions(
    self,
    expected_exprs: List[str],
    dicts_list: Optional[List[dict]] = None,
    downloaded_df: Optional[Union[pd.DataFrame, QuantamentalDataFrame]] = None,
) -> List[str]:
    """
    Return the expected expressions that are absent from the downloaded data.

    Exactly one of `dicts_list` and `downloaded_df` must be provided. A list of
    dictionaries is delegated to the parent class; a dataframe is inspected
    directly.

    Parameters
    ----------
    expected_exprs : List[str]
        expressions that were requested.
    dicts_list : Optional[List[dict]]
        downloaded time-series dictionaries.
    downloaded_df : Optional[Union[pd.DataFrame, QuantamentalDataFrame]]
        downloaded data as a dataframe (QDF or wide).

    Returns
    -------
    List[str]
        expressions missing from the downloaded data.
    """
    assert (dicts_list is not None) ^ (downloaded_df is not None)

    if dicts_list is not None:
        return super()._get_unavailable_expressions(
            expected_exprs=expected_exprs, dicts_list=dicts_list
        )

    if downloaded_df is None:
        return None

    if len(downloaded_df) == 0:
        # Nothing was downloaded, so everything is unavailable.
        return expected_exprs

    extractor = (
        get_expression_from_qdf
        if isinstance(downloaded_df, QuantamentalDataFrame)
        else get_expression_from_wide_df
    )
    found_expressions = extractor(downloaded_df)
    return list(set(expected_exprs) - set(found_expressions))
def validate_download_args(
    self,
    tickers: List[str],
    cids: List[str],
    xcats: List[str],
    metrics: List[str],
    start_date: str,
    end_date: str,
    get_catalogue: bool,
    expressions: List[str],
    show_progress: bool,
    as_dataframe: bool,
    dataframe_format: str,
    report_time_taken: bool,
) -> bool:
    """
    Validate the arguments passed to the download function.

    A ``UserWarning`` is emitted (rather than an error raised) when a date lies
    before 1950-01-01.

    Raises
    ------
    TypeError
        If any of the arguments are not of the correct type.
    ValueError
        If any of the arguments are semantically incorrect: no tickers,
        expressions or cids/xcats given; an empty list; `metrics` missing, empty
        or containing a metric that is not in `self.valid_metrics`; `cids` given
        without `xcats` (or vice versa); or an invalid date.

    Returns
    -------
    bool
        True if valid.
    """
    for var, name in [
        (get_catalogue, "get_catalogue"),
        (show_progress, "show_progress"),
        (as_dataframe, "as_dataframe"),
        (report_time_taken, "report_time_taken"),
    ]:
        if not isinstance(var, bool):
            raise TypeError(f"`{name}` must be a boolean.")

    if not isinstance(dataframe_format, str):
        raise TypeError("`dataframe_format` must be a string.")
    elif dataframe_format.lower() not in ["qdf", "wide"]:
        raise ValueError("`dataframe_format` must be one of 'qdf' or 'wide'.")

    if all([tickers is None, cids is None, xcats is None, expressions is None]):
        raise ValueError(
            "Must provide at least one of `tickers`, "
            "`expressions`, or `cids` & `xcats` together."
        )

    for var, name in [
        (tickers, "tickers"),
        (cids, "cids"),
        (xcats, "xcats"),
        (expressions, "expressions"),
    ]:
        if var is None:
            continue
        if not isinstance(var, list):
            raise TypeError(f"`{name}` must be a list of strings.")
        if len(var) == 0:
            raise ValueError(f"`{name}` must be a non-empty list of strings.")
        if not all(isinstance(item, str) for item in var):
            raise TypeError(f"`{name}` must be a list of strings.")

    if metrics is None or len(metrics) == 0:
        raise ValueError("`metrics` must be a non-empty list of strings.")
    # "Subset" means every requested metric must be valid, so reject as soon as
    # any single metric is unknown (not only when all of them are).
    if any(metric not in self.valid_metrics for metric in metrics):
        raise ValueError(f"`metrics` must be a subset of {self.valid_metrics}.")

    if bool(cids) ^ bool(xcats):
        raise ValueError(
            "If specifying `xcats`, `cids` must also be specified (and vice versa)."
        )

    for var, name in [
        (start_date, "start_date"),
        (end_date, "end_date"),
    ]:
        if not is_valid_iso_date(var):  # type check covered by `is_valid_iso_date`
            raise ValueError(
                f"`{name}` must be a valid date in the format YYYY-MM-DD."
            )
        # Dates outside the range pandas can represent are coerced to NaT.
        if pd.to_datetime(var, errors="coerce") is pd.NaT:
            raise ValueError(
                f"`{name}` must be a valid date > "
                f"{pd.Timestamp.min.strftime('%Y-%m-%d')}.\n"
                "Check pandas documentation:"
                " https://pandas.pydata.org/docs/user_guide/timeseries.html#timestamp-limitations`"
            )
        if pd.to_datetime(var) < pd.to_datetime("1950-01-01"):
            warnings.warn(
                message=(
                    f"`{name}` is set before 1950-01-01. "
                    "Data before 1950-01-01 may not be available,"
                    " and will cause errors/missing data."
                ),
                category=UserWarning,
            )
    return True
def filter_expressions_from_catalogue(
    self, expressions: List[str], verbose: bool = True
) -> List[str]:
    """
    Filter a list of expressions against the JPMaQS catalogue.

    This avoids requesting data for expressions that are not in the catalogue.
    The comparison is case-insensitive; the returned expressions keep the
    spelling in which they were passed.

    Parameters
    ----------
    expressions : List[str]
        list of expressions to filter.
    verbose : bool
        whether to print how many expressions were removed. Also passed on to
        `get_catalogue`.

    Returns
    -------
    List[str]
        sorted list of the unique expressions that are in the JPMaQS catalogue.
    """
    catalogue_tickers: List[str] = self.get_catalogue(verbose=verbose)
    catalogue_expressions: List[str] = construct_expressions(
        tickers=catalogue_tickers, metrics=self.valid_metrics
    )
    # A set gives constant-time membership tests; scanning a list for every
    # requested expression is quadratic.
    upper_exprs = {ex.upper() for ex in catalogue_expressions}
    r: List[str] = sorted({ex for ex in expressions if ex.upper() in upper_exprs})
    if verbose:
        filtered: int = len(expressions) - len(r)
        if filtered > 0:
            print(
                f"Removed {filtered}/{len(expressions)} expressions "
                "that are not in the JPMaQS catalogue."
            )
    return r
def _chain_download_outputs(
    self, download_outputs: Union[List[Dict], List[pd.DataFrame]]
) -> Union[List[Dict], List[pd.DataFrame]]:
    """
    Merge the outputs of the individual download batches into a single result.

    Empty batch outputs are discarded first. Wide dataframes are concatenated
    column-wise; batches of dictionaries (or booleans) are flattened into one
    list; batches of QuantamentalDataFrames are flattened and combined into one
    QuantamentalDataFrame.

    Raises
    ------
    TypeError
        if `download_outputs` is not a list.
    ValueError
        if the outputs cannot be chained and `self.jpmaqs_access` is False.
    NotImplementedError
        if the outputs are of a type that cannot be chained.
    """
    if not isinstance(download_outputs, list):
        raise TypeError("`download_outputs` must be a list.")

    download_outputs = [batch for batch in download_outputs if len(batch) > 0]
    if not download_outputs:
        return []

    first_batch = download_outputs[0]
    if isinstance(first_batch, pd.DataFrame):
        return concat_column_dfs(df_list=download_outputs)

    if isinstance(first_batch[0], (dict, bool, QuantamentalDataFrame)):
        logger.debug(f"Chaining {len(download_outputs)} outputs.")
        _ch_types = [type(item) for batch in download_outputs for item in batch]
        logger.debug(f"Object types in the downloaded data: {_ch_types}")

        download_outputs = [item for batch in download_outputs for item in batch]
        first_item = download_outputs[0]
        if isinstance(first_item, (dict, bool)):
            return download_outputs
        if isinstance(first_item, QuantamentalDataFrame):
            return QuantamentalDataFrame.from_qdf_list(download_outputs)

    if not self.jpmaqs_access:
        raise ValueError(
            f"The credentials you have provided are for Dataquery access only and "
            f"hence have no JPMaQS entitlements. Because of this you are only able "
            f"to download data from 2000-01-01 to "
            f"{(datetime.datetime.now() - relativedelta(months=6)).strftime('%Y-%m-%d')}."
        )

    raise NotImplementedError(
        f"Cannot chain download outputs that are List of : {list(set(map(type, download_outputs)))}."
    )
def _save_data(
    self,
    data: List[dict],
    as_dataframe: bool,
    dataframe_format: str,
    save_path: str,
) -> bool:
    """
    Save a batch of downloaded time-series under `save_path`.

    The directory is created if needed. With `as_dataframe` set, the data is
    written as CSV in the 'qdf' or 'wide' layout (any other `dataframe_format`
    writes nothing); otherwise each time-series is written as JSON.

    Returns
    -------
    bool
        always True.
    """
    if not os.path.exists(save_path):
        os.makedirs(save_path)

    if not as_dataframe:
        _save_timeseries(data, save_path)
        return True

    if dataframe_format == "qdf":
        _save_qdf(data, save_path)
    elif dataframe_format == "wide":
        _save_timeseries_as_column(data, save_path)
    return True
def _fetch_timeseries(
self,
url: str,
params: dict,
tracking_id: str,
as_dataframe: bool = True,
dataframe_format: str = "qdf",
save_path: Optional[str] = None,
*args,
**kwargs,
) -> Union[pd.DataFrame, List[Dict]]:
"""
Fetch one batch of time series, record unavailable expressions, and either save
the batch to disk or convert it to the requested output format.
Parameters
----------
url : str
passed to `self._fetch`.
params : dict
passed to `self._fetch`.
tracking_id : str
passed to `self._fetch`.
as_dataframe : bool
if True (and `save_path` is None) the time series are converted according to
`dataframe_format`; if False they are returned as fetched.
dataframe_format : str
'qdf' converts each time series with `timeseries_to_qdf`; 'wide' converts each
with `timeseries_to_column` and combines them with `concat_column_dfs`.
save_path : Optional[str]
if not None, the batch is handed to `self._save_data` instead of being
converted, and the return value is a one-element list holding its result.
Raises
------
DataOutOfSyncError
if `check_attributes_in_sync` returns a falsy value for the fetched batch.
Exception
any exception raised while saving is logged, recorded in `self.msg_errors`
and re-raised.
Returns
-------
Union[pd.DataFrame, List[Dict]]
the converted or raw batch, or `[<result of _save_data>]` when saving.
"""
ts_list: List[dict] = self._fetch(
url=url, params=params, tracking_id=tracking_id
)
if not check_attributes_in_sync(ts_list):
expressions = [ts["attributes"][0]["expression"] for ts in ts_list]
error_str = f"Attributes for {expressions} are not in sync."
raise DataOutOfSyncError(error_str)
# Record every expression without data and blank its slot so it can be
# filtered out below. `ts` still refers to the original dict afterwards.
for its, ts in enumerate(ts_list):
if _get_ts(ts) is None:
self.unavailable_expressions.append(_get_expr(ts))
if "message" in ts["attributes"][0]:
self.msg_warnings.append(ts["attributes"][0]["message"])
else:
self.msg_warnings.append(
f"Time series for expression {ts['attributes'][0]['expression']} is empty. "
" No explanation was provided."
)
ts_list[its] = None
# NOTE(review): the indentation of this file was lost, so it cannot be told
# from here whether the access check below runs for every time series or only
# for the empty ones - confirm against the original module.
if "message" in ts["attributes"][0]:
if "[FT] Limited Dataset Access Only" in ts["attributes"][0]["message"]:
self.jpmaqs_access = False
# Drop the slots that were set to None above.
ts_list: List[dict] = list(filter(None, ts_list))
logger.debug(f"Downloaded data for {len(ts_list)} expressions.")
logger.debug(f"Unavailable expressions: {self.unavailable_expressions}")
if save_path is not None:
try:
# NOTE(review): `_save_data` as defined in this file accepts neither *args
# nor **kwargs, so any extra argument forwarded here would raise TypeError
# unless a subclass overrides `_save_data` - confirm.
ts_list = [
self._save_data(
data=ts_list,
as_dataframe=as_dataframe,
dataframe_format=dataframe_format,
save_path=save_path,
*args,
**kwargs,
)
]
except Exception as exc:
logger.error(f"Failed to save data to disk: {exc}")
self.msg_errors.append(f"Failed to save data to disk: {exc}")
raise exc
elif as_dataframe:
if dataframe_format == "qdf":
ts_list = [timeseries_to_qdf(ts) for ts in ts_list if ts is not None]
elif dataframe_format == "wide":
ts_list = concat_column_dfs(
df_list=[timeseries_to_column(ts) for ts in ts_list]
)
# Only used for the debug log line below.
downloaded_types = list(set(map(type, ts_list)))
logger.debug(
f"Object types in the downloaded data: {downloaded_types}"
+ ("(saving to disk)" if save_path is not None else "")
)
return ts_list
def get_catalogue(
    self,
    group_id: str = JPMAQS_GROUP_ID,
    page_size: int = 1000,
    verbose: bool = True,
) -> List[str]:
    """
    Retrieve the JPMaQS catalogue.

    Delegates to the parent class' `get_catalogue`, using the JPMaQS group as
    the default `group_id`.

    Parameters
    ----------
    group_id : str
        forwarded to the parent implementation. Default is `JPMAQS_GROUP_ID`.
    page_size : int
        forwarded to the parent implementation. Default is 1000.
    verbose : bool
        forwarded to the parent implementation. Default is True.

    Returns
    -------
    List[str]
        list of tickers in the JPMaQS catalogue.
    """
    catalogue: List[str] = super().get_catalogue(group_id, page_size, verbose)
    return catalogue
[docs] def download_all_to_disk(
self,
path: str,
expressions: Optional[List[str]] = None,
as_dataframe: bool = True,
dataframe_format: str = "qdf",
show_progress: bool = True,
delay_param: float = API_DELAY_PARAM,
batch_size: Optional[int] = None,
retry: int = 3,
overwrite: bool = True,
*args,
**kwargs,
) -> None:
"""
Downloads all JPMaQS data to disk.
Parameters
----------
path : str
path to the directory where the data will be saved. The files are written to
a `JPMaQSData` sub-directory of this path. An empty string skips all
directory handling and post-download checks.
expressions : Optional[List[str]]
Default is None, meaning all expressions in the JPMaQS catalogue will be
downloaded. If provided, only the expressions in the list will be downloaded.
as_dataframe : bool
Default is True, meaning the data will be saved as a DataFrame (either in
the Quantamental Data Format ('qdf') or wide format ('wide')). If False, the
data will be saved as JSON files, with one expression per file.
dataframe_format : str
Default is 'qdf'. If `as_dataframe` is True, this parameter specifies the
format of the DataFrame. Must be one of 'qdf' or 'wide'. It is switched to
'wide' (with a warning) if any expression does not start with 'DB(JPMAQS'.
show_progress : bool
Default is True, meaning the progress of the download will be displayed. If
False, the progress will not be displayed.
delay_param : float
Default is 0.2 seconds (fastest allowed by DataQuery API). The delay
parameter to use when making requests to the DataQuery API. Ideally, this should
not be changed.
batch_size : int
Default is None, meaning the batch size will be set to the default size
(20). If provided, this parameter specifies the number of expressions to
download in each batch.
retry : int
Default is 3, meaning the download will be retried 3 times for any
expressions that fail to download. If set to 0, no retries will be attempted.
overwrite : bool
Default is True, meaning the data will be overwritten if it already exists.
If False, the data will not be overwritten.
args : tuple
any other positional arguments; forwarded to `download_data`.
kwargs : dict
any other keyword arguments; forwarded to `download_data`.
Raises
------
AssertionError
if any entry returned by `download_data` is not `True`.
ValueError
if `path` is not empty and no CSV file is found under the save directory
after the download.
Returns
-------
None
The data is saved to disk.
Examples
--------
Download all JPMaQS data to disk.
>>> with JPMaQSDownload(
... client_id=os.getenv("DQ_CLIENT_ID"),
... client_secret=os.getenv("DQ_CLIENT_SECRET"),
... ) as jpmaqs:
... jpmaqs.download_all_to_disk(path="./jpmaqs-data")
Alternatively downloading only a custom list of expressions
>>> expressions = ['DB(JPMAQS,USD_EQXR_NSA,value)', 'DB(JPMAQS,GBP_EQXR_NSA,value)']
>>> with JPMaQSDownload(
... client_id=os.getenv("DQ_CLIENT_ID"),
... client_secret=os.getenv("DQ_CLIENT_SECRET"),
... ) as jpmaqs:
... jpmaqs.download_all_to_disk(path="./jpmaqs-data", expressions=expressions)
Save each expression as a JSON
>>> with JPMaQSDownload(
... client_id=os.getenv("DQ_CLIENT_ID"),
... client_secret=os.getenv("DQ_CLIENT_SECRET"),
... ) as jpmaqs:
... jpmaqs.download_all_to_disk(path="./jpmaqs-data", as_dataframe=False)
"""
# Resolve the directory the batches are written to.
save_path: Optional[str] = None
if path == "":
print(
"Path explicitly provided as an empty string. "
"Assuming alternate saving method implemented."
)
save_path = ""
else:
# Expand `~` and environment variables before building the target directory.
path = os.path.expandvars(os.path.expanduser(path))
save_path = os.path.join(path, "JPMaQSData")
os.makedirs(save_path, exist_ok=True)
if overwrite:
msg = f"Overwriting data in {save_path}."
warnings.warn(msg) # the user should be warned
logger.info(msg) # but log doesn't need to be warning, info is fine
# Remove everything previously saved, then recreate the empty directory.
shutil.rmtree(save_path)
os.makedirs(save_path, exist_ok=True)
print(f"Downloading all JPMaQS data to disk. Saving to: `{save_path}`.")
# Fixed request window: 1990-01-01 up to two business days after today.
start_date = "1990-01-01"
end_date = (datetime.datetime.today() + pd.offsets.BusinessDay(2)).strftime(
"%Y-%m-%d"
)
# A falsy `batch_size` (None or 0) keeps the instance's current batch size.
self.batch_size = batch_size or self.batch_size
self.check_connection(verbose=True)
# None or an empty list: build the expressions from the catalogue tickers and
# `self.valid_metrics`.
if not expressions:
catalogue: List[str] = self.get_catalogue()
expressions = sorted(
construct_expressions(tickers=catalogue, metrics=self.valid_metrics)
)
# If any expression does not start with DB(JPMAQS, the dataframe format is set to wide.
if (
as_dataframe
and dataframe_format == "qdf"
and not all([expr.startswith("DB(JPMAQS") for expr in expressions])
):
dataframe_format = "wide"
warnings.warn(
"The list of expressions contains non-JPMAQS expressions. "
"Setting dataframe format to 'wide'."
)
print(
"Downloading data from JPMaQS.\nTimestamp UTC: ",
datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%d %H:%M:%S"),
)
download_time_taken: float = timer()
data: List[Union[dict, QuantamentalDataFrame]] = self.download_data(
save_path=save_path,
expressions=expressions,
start_date=start_date,
end_date=end_date,
show_progress=show_progress,
as_dataframe=as_dataframe,
dataframe_format=dataframe_format,
delay_param=delay_param,
*args,
**kwargs,
)
download_time_taken: float = timer() - download_time_taken
# NOTE(review): `assert` statements are removed when Python runs with -O, so
# this check disappears in optimised mode.
assert all([d is True for d in data])
# With an empty path there is no directory to inspect; stop here.
if path == "":
return
# File stems of every CSV found anywhere below the save directory.
# NOTE(review): only `*.csv` files are counted; when `as_dataframe` is False the
# docstring says JSON files are written - confirm that this check still
# finds them.
d_exprs = [
os.path.basename(csv).split(".")[0]
for csv in glob.glob(f"{save_path}/**/*.csv", recursive=True)
]
if len(d_exprs) == 0:
raise ValueError("No data was downloaded.")
if as_dataframe and dataframe_format == "qdf":
d_exprs = construct_expressions(tickers=d_exprs, metrics=self.valid_metrics)
logger.info(f"Downloaded {len(d_exprs)} expressions.")
unavailable_expressions = validate_downloaded_data(
path=save_path,
expected_expressions=expressions,
as_dataframe=as_dataframe,
dataframe_format=dataframe_format,
show_progress=show_progress,
)
if len(unavailable_expressions) > 0:
if retry > 0:
logger.info(
f"Retrying {len(unavailable_expressions)} unavailable expressions."
)
# Retry only the missing expressions, with one attempt fewer, and keep the
# files that were already written (overwrite=False).
self.download_all_to_disk(
path=path,
expressions=unavailable_expressions,
as_dataframe=as_dataframe,
dataframe_format=dataframe_format,
show_progress=show_progress,
delay_param=delay_param,
batch_size=batch_size,
retry=retry - 1,
overwrite=False,
*args,
**kwargs,
)
else:
print(f"Failed to download {len(unavailable_expressions)} expressions.")
for expr in unavailable_expressions:
print(f"\t{expr}")
[docs] def download(
self,
tickers: Optional[List[str]] = None,
cids: Optional[List[str]] = None,
xcats: Optional[List[str]] = None,
metrics: List[str] = ["value"],
start_date: str = "2000-01-01",
end_date: Optional[str] = None,
expressions: Optional[List[str]] = None,
get_catalogue: bool = False,
show_progress: bool = False,
debug: bool = False,
suppress_warning: bool = False,
as_dataframe: bool = True,
dataframe_format: str = "qdf",
report_time_taken: bool = False,
categorical_dataframe: bool = False,
*args,
**kwargs,
) -> Union[pd.DataFrame, List[Dict]]:
"""
Driver function to download data from JPMaQS via the DataQuery API. Timeseries
data can be requested using `tickers` with `metrics`, or passing formed
DataQuery expressions. `cids` and `xcats` (along with `metrics`) are used to
construct expressions, which are ultimately passed to the DataQuery Interface.
Parameters
----------
tickers : list[str]
list of tickers. A single string is treated as a one-element list.
cids : list[str]
list of cids. A single string is treated as a one-element list.
xcats : list[str]
list of xcats. A single string is treated as a one-element list.
metrics : list[str]
list of metrics. Available metrics are "value" (default), "grading",
"eop_lag", "mop_lag", and "last_updated". If "all" is provided,
all available metrics are used. The available metrics are defined in
`macrosynergy.download.jpmaqs.JPMAQS_METRICS`. A single string is treated as
a one-element list.
start_date : str
start date of the data to download, in the ISO format - YYYY-MM-DD.
end_date : str
end date of the data to download in the ISO format - YYYY-MM-DD. If None, the
earlier of the last two business days up to and including today is used.
expressions : list[str]
list of DataQuery expressions. A single string is treated as a one-element
list.
get_catalogue : bool
If True, the JPMaQS catalogue is downloaded and used to filter the list of
tickers. Default is False.
show_progress : bool
True if progress bar should be shown, False if not (default).
suppress_warning : bool
True if suppressing warnings. Default is False.
debug : bool
Override the debug behaviour of the JPMaQSDownload class. If True, debug
mode is enabled.
print_debug_data : bool
Not a named parameter of this method. NOTE(review): if supplied it is
forwarded to `download_data` together with the other keyword arguments;
confirm whether it is still supported.
as_dataframe : bool
Return a dataframe if True (default), a list of dictionaries if False.
dataframe_format : str
Format of the dataframe to return, one of "qdf" or "wide". QDF is the
Quantamental Dataframe format, and wide is the wide format with each expression
as a column, and a single date column. The value is lower-cased before use.
report_time_taken : bool
If True, the time taken to download and apply data transformations is
reported.
categorical_dataframe : bool
If True, the dataframe returned will use the pandas Categorical data type
for the `cid` and `xcat` columns. Default is False.
args : tuple
any other positional arguments; forwarded to `download_data`.
kwargs : dict
any other keyword arguments; forwarded to `download_data`.
Raises
------
ValueError
if provided arguments are invalid or semantically incorrect (see
macrosynergy.download.jpmaqs.JPMaQSDownload.validate_download_args()).
InvalidDataframeError
if `as_dataframe` is True and `validate_downloaded_df` rejects the result.
Returns
-------
pd.DataFrame|list[Dict]
dataframe of data if `as_dataframe` is True, list of dictionaries if False.
"""
# override the default warning behaviour and debug behaviour
self.suppress_warning = suppress_warning
self.debug = debug
# Wrap a lone string in a list; every other value (including None) is
# returned unchanged.
def vartolist(x):
return [x] if isinstance(x, str) else x
tickers = vartolist(tickers)
cids = vartolist(cids)
xcats = vartolist(xcats)
expressions = vartolist(expressions)
metrics = vartolist(metrics)
# The single metric "all" expands to every valid metric.
if len(metrics) == 1:
if metrics[0] == "all":
metrics = self.valid_metrics
if end_date is None:
_today = datetime.datetime.today()
end_date = pd.bdate_range(end=_today, periods=2)[0].strftime("%Y-%m-%d")
# This date is cast to YYYYMMDD in macrosynergy/download/dataquery.py.
# Validate arguments.
if not self.validate_download_args(
tickers=tickers,
cids=cids,
xcats=xcats,
metrics=metrics,
start_date=start_date,
end_date=end_date,
expressions=expressions,
get_catalogue=get_catalogue,
show_progress=show_progress,
as_dataframe=as_dataframe,
dataframe_format=dataframe_format,
report_time_taken=report_time_taken,
):
raise ValueError("Invalid arguments passed to download().")
if pd.to_datetime(start_date) > pd.to_datetime(end_date):
warnings.warn(
message=(
f"`start_date` ({start_date}) is after `end_date` ({end_date}). "
"These dates will be swapped."
),
category=UserWarning,
)
start_date, end_date = end_date, start_date
dataframe_format = dataframe_format.lower()
# Construct expressions.
if expressions is None:
expressions: List[str] = []
# NOTE(review): `+=` extends a list in place. If this statement also runs when
# the caller passed `expressions` as a list (the nesting cannot be recovered
# from this de-indented text), the caller's list is modified - confirm.
expressions += construct_expressions(
tickers=tickers,
cids=cids,
xcats=xcats,
metrics=metrics,
)
# Remove duplicates; the original order is not preserved.
expressions = list(set(expressions))
if get_catalogue:
expressions = self.filter_expressions_from_catalogue(expressions)
# Download data.
print(
"Downloading data from JPMaQS.\nTimestamp UTC: ",
datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%d %H:%M:%S"),
)
# This placeholder is replaced by the `download_data` result below.
data: List[Dict] = []
download_time_taken: float = timer()
data: List[Union[dict, QuantamentalDataFrame]] = self.download_data(
expressions=expressions,
start_date=start_date,
end_date=end_date,
show_progress=show_progress,
as_dataframe=as_dataframe,
dataframe_format=dataframe_format,
*args,
**kwargs,
)
if not self.jpmaqs_access:
print(
"Credentials provided only have access to Dataquery and have not been "
"granted access to JPMaQS. You can only access data after 2000-01-01 and "
"before 6 months from the current date."
)
download_time_taken: float = timer() - download_time_taken
if report_time_taken:
print(f"Time taken to download data: \t{download_time_taken:.2f} seconds.")
if len(self.msg_errors) > 0:
if not self.suppress_warning:
warnings.warn(
f"{len(self.msg_errors)} errors encountered during the download. \n"
f"The errors did not compromise the download. \n"
f"Please check `JPMaQSDownload.msg_errors` for more information."
)
if as_dataframe:
# Expressions that were requested and not recorded as unavailable.
found_expressions: List[str] = list(
set(expressions) - set(self.unavailable_expressions)
)
if not validate_downloaded_df(
data_df=data,
expected_expressions=expressions,
found_expressions=found_expressions,
start_date=start_date,
end_date=end_date,
verbose=True,
):
raise InvalidDataframeError("Downloaded data is invalid.")
if dataframe_format == "qdf":
# NOTE(review): `assert` is removed when Python runs with -O.
assert isinstance(data, QuantamentalDataFrame)
# Rebuild the QDF with the requested categorical setting, then hand back a
# plain pandas DataFrame.
return pd.DataFrame(
QuantamentalDataFrame(data, categorical=categorical_dataframe)
)
return data
def custom_download(
    tickers, download_func, metrics=["value"], start_date=None, end_date=None
):
    """
    Download data for a list of tickers using a custom download function.

    For every metric, one `DB(JPMAQS,<ticker>,<metric>)` expression is built per
    unique ticker. The expressions are requested from `download_func` in batches
    of at most 100, and the batches are joined column-wise and converted with
    `ticker_df_to_qdf`. The per-metric results are finally combined with
    `concat_single_metric_qdfs`.

    Parameters
    ----------
    tickers : list[str]
        list of tickers to download data for. Duplicates are removed (first
        occurrence wins). A single string is treated as a one-element list.
    download_func : callable
        custom download function. It is called as
        `download_func(expressions, startDate=start_date, endDate=end_date)`
        and must return a DataFrame with one column per expression.
    metrics : list[str]
        list of metrics to download. A single string is treated as a
        one-element list. The default list is never modified.
    start_date : str
        start date of the data to download; passed through to `download_func`.
    end_date : str
        end date of the data to download; passed through to `download_func`.

    Raises
    ------
    ValueError
        if `tickers` is empty.

    Returns
    -------
    pd.DataFrame
        dataframe of downloaded data.
    """
    step_size = 100

    if isinstance(tickers, str):
        tickers = [tickers]
    if isinstance(metrics, str):
        metrics = [metrics]

    # De-duplicate while keeping the caller's order so that the batches are the
    # same from run to run (iteration order of a set of strings is not).
    unique_tickers = list(dict.fromkeys(tickers))
    if not unique_tickers:
        raise ValueError("`tickers` must contain at least one ticker.")

    dfs = []
    for metric in metrics:
        expressions = [
            f"DB(JPMAQS,{ticker},{metric})" for ticker in unique_tickers
        ]

        df_store = []
        # Stop at len(expressions): the former `+ 1` produced an extra, empty
        # batch whenever the number of expressions was a multiple of step_size.
        for idx in range(0, len(expressions), step_size):
            df_chunk = download_func(
                expressions[idx : idx + step_size],
                startDate=start_date,
                endDate=end_date,
            )
            # Columns without a single value carry no information.
            df_store.append(df_chunk.dropna(axis=1, how="all"))

        df = pd.concat(df_store, axis=1)
        # "DB(JPMAQS,<ticker>,<metric>)" -> "<ticker>"
        df.columns = df.columns.str.split(",").str[1]
        df.index.name = "real_date"
        dfs.append(ticker_df_to_qdf(df, metric=metric))

    return concat_single_metric_qdfs(dfs)
if __name__ == "__main__":
# Example run, executed only when this module is run as a script: download
# every metric for a few cross-sections and categories and print the result.
cids = ["AUD", "BRL", "CAD", "CHF", "CNY", "CZK", "EUR", "GBP", "USD"]
xcats = ["RIR_NSA", "FXXR_NSA", "FXXR_VT10", "DU05YXR_NSA", "DU05YXR_VT10"]
start_date: str = "2024-01-07"
end_date: str = "2024-02-09"
# Credentials are read from the DQ_CLIENT_ID / DQ_CLIENT_SECRET environment
# variables.
with JPMaQSDownload(
client_id=os.getenv("DQ_CLIENT_ID"),
client_secret=os.getenv("DQ_CLIENT_SECRET")
) as jpmaqs:
data = jpmaqs.download(
xcats=xcats,
cids=cids,
metrics="all",
start_date=start_date,
end_date=end_date,
show_progress=True,
report_time_taken=True,
)
# `DataFrame.info()` prints its summary itself and returns None, so the
# surrounding print() additionally outputs "None".
print(data.info())
print(data)