from typing import Dict, Optional
from pathlib import Path
import concurrent.futures as cf
import logging
import traceback as tb
import time
import uuid
import shutil
import requests
from .constants import (
DQ_FILE_API_DELAY_MARGIN,
DQ_FILE_API_DELAY_PARAM,
DQ_FILE_API_HEADERS_TIMEOUT,
DQ_FILE_API_SEGMENT_SIZE_MB,
DQ_FILE_API_STREAM_CHUNK_SIZE,
DQ_FILE_API_TIMEOUT,
)
from .common import RateLimitedRequester
logger = logging.getLogger(__name__)
[docs]class SegmentedFileDownloader:
"""
A utility class to manage the multi-part, concurrent download of a single large file.
"""
def __init__(
self,
filename: str,
url: str,
headers: Dict[str, str],
params: Dict[str, str],
proxies: Optional[Dict[str, str]] = None,
chunk_size: int = DQ_FILE_API_STREAM_CHUNK_SIZE,
segment_size_mb: float = DQ_FILE_API_SEGMENT_SIZE_MB,
timeout: float = DQ_FILE_API_TIMEOUT,
api_delay: float = DQ_FILE_API_DELAY_PARAM,
api_delay_margin: float = DQ_FILE_API_DELAY_MARGIN,
headers_timeout: float = DQ_FILE_API_HEADERS_TIMEOUT,
max_concurrent_downloads: int = None,
max_file_retries: int = 3,
verify_ssl: bool = True,
start_download: bool = False,
*,
parent_requester: RateLimitedRequester,
debug: bool = False,
):
"""Initializes the downloader with URL, headers, and download parameters."""
if parent_requester is None:
raise ValueError("`parent_requester` must be provided for rate limiting.")
self.parent_requester: RateLimitedRequester = parent_requester
self.api_delay_seconds = api_delay * api_delay_margin
self.filename = Path(filename)
self.url = url
self.headers = headers
self.params = params
if not set(["file-group-id", "file-datetime"]).issubset(params):
raise ValueError(
"Missing required parameters: 'file-group-id' and 'file-datetime'"
)
self.file_id = params["file-group-id"] + "_" + params["file-datetime"]
self.proxies = proxies
self.out_dir = Path(self.filename.parent)
self.out_dir.mkdir(parents=True, exist_ok=True)
self.chunk_size = chunk_size
self.segment_size_mb = segment_size_mb
self.timeout = timeout
self.headers_timeout = headers_timeout
self.max_concurrent_downloads = max_concurrent_downloads
self.max_file_retries = max_file_retries
self.verify_ssl = verify_ssl
self.debug = debug
self.temp_dir = self.out_dir / f"_tmp_{self.filename.name}_{uuid.uuid4().hex}"
if start_download:
try:
self.download()
except Exception:
self.cleanup()
raise
def __enter__(self):
"""Allows the downloader to be used as a context manager."""
return self
def __exit__(self, exc_type, exc_value, traceback):
"""Ensures cleanup of temporary files upon exiting the context."""
if exc_type is not None:
logger.error(tb.format_exc())
self.cleanup()
return False
def _wait_for_api_call(self) -> bool:
return self.parent_requester._wait_for_api_call()
[docs] def log(self, msg: str, part_num: int = None, level: int = logging.INFO):
"""Logs a message with downloader-specific context."""
part_info = f"[part={part_num}]" if part_num is not None else ""
logger.log(
level, f"[SegmentedFileDownloader][file={self.file_id}]{part_info} {msg}"
)
[docs] def download(self, retries: int = None) -> Path:
"""Orchestrates the entire file download process, including retries."""
last_exception = None
if retries is None:
retries = self.max_file_retries
try:
self.log("Starting segmented file download")
start_time = time.time()
if self.temp_dir.exists():
shutil.rmtree(self.temp_dir)
self.temp_dir.mkdir(exist_ok=True, parents=True)
total_size = self._get_file_size()
self.log(f"File size: {total_size / (1024 * 1024):.2f} MB")
chunk_size = int(self.segment_size_mb * 1024 * 1024)
chunks = range(0, total_size, chunk_size)
self.log(f"Creating {len(chunks)} download tasks")
self._download_chunks_concurrently(chunks, total_size)
final_path = Path(self.filename).resolve()
self._assemble_parts(final_path, len(chunks))
duration = time.time() - start_time
self.log(f"Download complete in {duration:.2f} seconds.")
self.log(f"Saved to: {final_path}")
return final_path
except KeyboardInterrupt:
raise
except Exception as e:
last_exception = e
self.log(f"Download failed. Error: {e}", level=logging.ERROR)
if self.debug:
raise e
if retries > 0:
self.log(
f"Retrying download (attempt {self.max_file_retries - retries + 1}/{self.max_file_retries})..."
)
time.sleep(self.api_delay_seconds)
self.cleanup()
return self.download(retries=retries - 1)
self.cleanup()
raise last_exception
def _get_file_size(self) -> int:
"""Fetches the total size of the file using a HEAD request."""
self.log("Fetching file size...")
self._wait_for_api_call()
start_time = time.time()
response = requests.head(
self.url,
params=self.params,
headers=self.headers,
proxies=self.proxies,
timeout=self.headers_timeout,
verify=self.verify_ssl,
)
response.raise_for_status()
duration = time.time() - start_time
self.log(f"Received headers in {duration:.2f} seconds.")
cl_header = response.headers.get("Content-Length")
try:
content_length = int(cl_header)
except (ValueError, TypeError):
raise ValueError(
f"[SegmentedFileDownloader][file={self.file_id}] Invalid or missing Content-Length header: {cl_header}."
)
self.log(f"Content-Length: {content_length}")
return content_length
def _download_chunks_concurrently(self, chunks: range, total_size: int):
"""Manages the parallel download of all file chunks."""
with cf.ThreadPoolExecutor(
max_workers=self.max_concurrent_downloads
) as executor:
futures = []
for i, start in enumerate(chunks):
# wait before next API call
future = executor.submit(
self._download_chunk,
i,
start,
min(start + chunks.step - 1, total_size - 1),
)
futures.append(future)
try:
for future in cf.as_completed(futures):
if future.exception():
executor.shutdown(wait=False, cancel_futures=True)
raise future.exception()
except KeyboardInterrupt:
executor.shutdown(wait=False, cancel_futures=True)
raise
def _download_chunk(self, part_num: int, start_byte: int, end_byte: int) -> None:
"""Starts the download process for a single file chunk."""
self._download_chunk_retry(part_num, start_byte, end_byte, retries=1)
def _download_chunk_retry(
self, part_num: int, start_byte: int, end_byte: int, retries: int
) -> None:
"""Downloads a specific byte range of the file with a retry mechanism."""
self.log(f"Downloading bytes [{start_byte}-{end_byte}]", part_num=part_num)
segment_headers = self.headers.copy()
segment_headers["Range"] = f"bytes={start_byte}-{end_byte}"
part_path = self.temp_dir / f"part_{part_num}"
try:
self._wait_for_api_call()
with requests.get(
headers=segment_headers,
url=self.url,
params=self.params,
proxies=self.proxies,
stream=True,
timeout=self.timeout,
verify=self.verify_ssl,
) as response:
response.raise_for_status()
with open(part_path, "wb") as f:
for chunk in response.iter_content(chunk_size=self.chunk_size):
f.write(chunk)
self.log("Finished download.", part_num=part_num)
except KeyboardInterrupt:
raise
except Exception as e:
if isinstance(e, requests.exceptions.HTTPError):
if hasattr(e, "response") and hasattr(e.response, "status_code"):
if 400 <= e.response.status_code < 500:
retries = 0
raise e
self.log(
f"FAILED download. Error: {e}", part_num=part_num, level=logging.ERROR
)
if retries > 0:
self.log("Retrying download...", part_num=part_num)
self._download_chunk_retry(part_num, start_byte, end_byte, retries - 1)
else:
raise
def _assemble_parts(self, final_path: Path, num_parts: int):
"""Combines the downloaded chunks into a single final file."""
self.log(f"Assembling {num_parts} parts")
with open(final_path, "wb") as final_file:
for i in range(num_parts):
part_path = self.temp_dir / f"part_{i}"
with open(part_path, "rb") as part_file:
shutil.copyfileobj(part_file, final_file)
final_size = final_path.stat().st_size
self.log(f"Assembled file size: {final_size / (1024 * 1024):.2f} MB")
self.cleanup()
[docs] def cleanup(self):
"""Removes the temporary directory and all downloaded parts."""
if self.temp_dir.exists():
shutil.rmtree(self.temp_dir)
self.log("Cleaned up temporary files.")