Source code for macrosynergy.download.dataquery_file_api.segmented_file_downloader

from typing import Dict, Optional

from pathlib import Path
import concurrent.futures as cf
import logging
import traceback as tb
import time
import uuid
import shutil

import requests

from .constants import (
    DQ_FILE_API_DELAY_MARGIN,
    DQ_FILE_API_DELAY_PARAM,
    DQ_FILE_API_HEADERS_TIMEOUT,
    DQ_FILE_API_SEGMENT_SIZE_MB,
    DQ_FILE_API_STREAM_CHUNK_SIZE,
    DQ_FILE_API_TIMEOUT,
)

from .common import RateLimitedRequester

logger = logging.getLogger(__name__)


[docs]class SegmentedFileDownloader: """ A utility class to manage the multi-part, concurrent download of a single large file. """ def __init__( self, filename: str, url: str, headers: Dict[str, str], params: Dict[str, str], proxies: Optional[Dict[str, str]] = None, chunk_size: int = DQ_FILE_API_STREAM_CHUNK_SIZE, segment_size_mb: float = DQ_FILE_API_SEGMENT_SIZE_MB, timeout: float = DQ_FILE_API_TIMEOUT, api_delay: float = DQ_FILE_API_DELAY_PARAM, api_delay_margin: float = DQ_FILE_API_DELAY_MARGIN, headers_timeout: float = DQ_FILE_API_HEADERS_TIMEOUT, max_concurrent_downloads: int = None, max_file_retries: int = 3, verify_ssl: bool = True, start_download: bool = False, *, parent_requester: RateLimitedRequester, debug: bool = False, ): """Initializes the downloader with URL, headers, and download parameters.""" if parent_requester is None: raise ValueError("`parent_requester` must be provided for rate limiting.") self.parent_requester: RateLimitedRequester = parent_requester self.api_delay_seconds = api_delay * api_delay_margin self.filename = Path(filename) self.url = url self.headers = headers self.params = params if not set(["file-group-id", "file-datetime"]).issubset(params): raise ValueError( "Missing required parameters: 'file-group-id' and 'file-datetime'" ) self.file_id = params["file-group-id"] + "_" + params["file-datetime"] self.proxies = proxies self.out_dir = Path(self.filename.parent) self.out_dir.mkdir(parents=True, exist_ok=True) self.chunk_size = chunk_size self.segment_size_mb = segment_size_mb self.timeout = timeout self.headers_timeout = headers_timeout self.max_concurrent_downloads = max_concurrent_downloads self.max_file_retries = max_file_retries self.verify_ssl = verify_ssl self.debug = debug self.temp_dir = self.out_dir / f"_tmp_{self.filename.name}_{uuid.uuid4().hex}" if start_download: try: self.download() except Exception: self.cleanup() raise def __enter__(self): """Allows the downloader to be used as a context manager.""" return self def __exit__(self, exc_type, exc_value, traceback): """Ensures cleanup of temporary files upon exiting the context.""" if exc_type is not None: logger.error(tb.format_exc()) self.cleanup() return False def _wait_for_api_call(self) -> bool: return self.parent_requester._wait_for_api_call()
[docs] def log(self, msg: str, part_num: int = None, level: int = logging.INFO): """Logs a message with downloader-specific context.""" part_info = f"[part={part_num}]" if part_num is not None else "" logger.log( level, f"[SegmentedFileDownloader][file={self.file_id}]{part_info} {msg}" )
[docs] def download(self, retries: int = None) -> Path: """Orchestrates the entire file download process, including retries.""" last_exception = None if retries is None: retries = self.max_file_retries try: self.log("Starting segmented file download") start_time = time.time() if self.temp_dir.exists(): shutil.rmtree(self.temp_dir) self.temp_dir.mkdir(exist_ok=True, parents=True) total_size = self._get_file_size() self.log(f"File size: {total_size / (1024 * 1024):.2f} MB") chunk_size = int(self.segment_size_mb * 1024 * 1024) chunks = range(0, total_size, chunk_size) self.log(f"Creating {len(chunks)} download tasks") self._download_chunks_concurrently(chunks, total_size) final_path = Path(self.filename).resolve() self._assemble_parts(final_path, len(chunks)) duration = time.time() - start_time self.log(f"Download complete in {duration:.2f} seconds.") self.log(f"Saved to: {final_path}") return final_path except KeyboardInterrupt: raise except Exception as e: last_exception = e self.log(f"Download failed. Error: {e}", level=logging.ERROR) if self.debug: raise e if retries > 0: self.log( f"Retrying download (attempt {self.max_file_retries - retries + 1}/{self.max_file_retries})..." ) time.sleep(self.api_delay_seconds) self.cleanup() return self.download(retries=retries - 1) self.cleanup() raise last_exception
def _get_file_size(self) -> int: """Fetches the total size of the file using a HEAD request.""" self.log("Fetching file size...") self._wait_for_api_call() start_time = time.time() response = requests.head( self.url, params=self.params, headers=self.headers, proxies=self.proxies, timeout=self.headers_timeout, verify=self.verify_ssl, ) response.raise_for_status() duration = time.time() - start_time self.log(f"Received headers in {duration:.2f} seconds.") cl_header = response.headers.get("Content-Length") try: content_length = int(cl_header) except (ValueError, TypeError): raise ValueError( f"[SegmentedFileDownloader][file={self.file_id}] Invalid or missing Content-Length header: {cl_header}." ) self.log(f"Content-Length: {content_length}") return content_length def _download_chunks_concurrently(self, chunks: range, total_size: int): """Manages the parallel download of all file chunks.""" with cf.ThreadPoolExecutor( max_workers=self.max_concurrent_downloads ) as executor: futures = [] for i, start in enumerate(chunks): # wait before next API call future = executor.submit( self._download_chunk, i, start, min(start + chunks.step - 1, total_size - 1), ) futures.append(future) try: for future in cf.as_completed(futures): if future.exception(): executor.shutdown(wait=False, cancel_futures=True) raise future.exception() except KeyboardInterrupt: executor.shutdown(wait=False, cancel_futures=True) raise def _download_chunk(self, part_num: int, start_byte: int, end_byte: int) -> None: """Starts the download process for a single file chunk.""" self._download_chunk_retry(part_num, start_byte, end_byte, retries=1) def _download_chunk_retry( self, part_num: int, start_byte: int, end_byte: int, retries: int ) -> None: """Downloads a specific byte range of the file with a retry mechanism.""" self.log(f"Downloading bytes [{start_byte}-{end_byte}]", part_num=part_num) segment_headers = self.headers.copy() segment_headers["Range"] = f"bytes={start_byte}-{end_byte}" part_path = self.temp_dir / f"part_{part_num}" try: self._wait_for_api_call() with requests.get( headers=segment_headers, url=self.url, params=self.params, proxies=self.proxies, stream=True, timeout=self.timeout, verify=self.verify_ssl, ) as response: response.raise_for_status() with open(part_path, "wb") as f: for chunk in response.iter_content(chunk_size=self.chunk_size): f.write(chunk) self.log("Finished download.", part_num=part_num) except KeyboardInterrupt: raise except Exception as e: if isinstance(e, requests.exceptions.HTTPError): if hasattr(e, "response") and hasattr(e.response, "status_code"): if 400 <= e.response.status_code < 500: retries = 0 raise e self.log( f"FAILED download. Error: {e}", part_num=part_num, level=logging.ERROR ) if retries > 0: self.log("Retrying download...", part_num=part_num) self._download_chunk_retry(part_num, start_byte, end_byte, retries - 1) else: raise def _assemble_parts(self, final_path: Path, num_parts: int): """Combines the downloaded chunks into a single final file.""" self.log(f"Assembling {num_parts} parts") with open(final_path, "wb") as final_file: for i in range(num_parts): part_path = self.temp_dir / f"part_{i}" with open(part_path, "rb") as part_file: shutil.copyfileobj(part_file, final_file) final_size = final_path.stat().st_size self.log(f"Assembled file size: {final_size / (1024 * 1024):.2f} MB") self.cleanup()
[docs] def cleanup(self): """Removes the temporary directory and all downloaded parts.""" if self.temp_dir.exists(): shutil.rmtree(self.temp_dir) self.log("Cleaned up temporary files.")