from __future__ import annotations

import time

import requests
from requests.exceptions import HTTPError, RequestException, Timeout

from tcga_downloader.logger import get_logger

GDC_FILES_URL = "https://api.gdc.cancer.gov/files"

logger = get_logger("query")


def build_filters(
    project: str,
    data_type: str,
    sample_type: str | None = None,
    platform: str | None = None,
) -> dict:
    """Build a GDC filter tree for the /files endpoint.

    The optional ``sample_type`` and ``platform`` clauses are only added
    when a value is provided.
    """
    filters = [
        {
            "op": "in",
            "content": {
                "field": "cases.project.project_id",
                "value": [project],
            },
        },
        {
            "op": "in",
            "content": {
                "field": "data_type",
                "value": [data_type],
            },
        },
    ]
    if sample_type:
        filters.append(
            {
                "op": "in",
                "content": {
                    # On the /files endpoint, sample fields are nested under
                    # "cases.samples"; a bare "samples.sample_type" does not
                    # match any files-endpoint field.
                    "field": "cases.samples.sample_type",
                    "value": [sample_type],
                },
            }
        )
    if platform:
        filters.append(
            {
                "op": "in",
                "content": {
                    "field": "platform",
                    "value": [platform],
                },
            }
        )
    return {
        "op": "and",
        "content": filters,
    }


def query_files(
    project: str,
    data_type: str,
    fields: list[str] | None = None,
    size: int = 1000,
    max_retries: int = 3,
    retry_delay: float = 1.0,
    sample_type: str | None = None,
    platform: str | None = None,
) -> list[dict]:
    """Query the GDC /files endpoint, retrying with exponential backoff.

    Returns at most ``size`` hits; pagination beyond the first page is not
    performed.
    """
    if fields is None:
        fields = [
            "file_id",
            "file_name",
            "data_type",
            "data_format",
            "file_size",
            "md5sum",
        ]

    payload = {
        "filters": build_filters(project, data_type, sample_type, platform),
        "fields": ",".join(fields),
        "format": "JSON",
        "size": size,
    }

    logger.info(
        "Querying GDC: project=%s, data_type=%s, sample_type=%s, platform=%s",
        project,
        data_type,
        sample_type,
        platform,
    )

    last_error: RequestException | None = None
    for attempt in range(max_retries):
        try:
            resp = requests.post(GDC_FILES_URL, json=payload, timeout=30)
            resp.raise_for_status()
            data: dict = resp.json()
            hits = data.get("data", {}).get("hits", [])
            # Validate the response shape explicitly rather than with
            # ``assert``, which is stripped under ``python -O``.
            if not isinstance(hits, list):
                raise RuntimeError(
                    f"Unexpected GDC response shape: hits is {type(hits).__name__}"
                )
            logger.info("Found %d files", len(hits))
            return hits
        except Timeout as e:
            last_error = e
            logger.warning("Timeout on attempt %d/%d", attempt + 1, max_retries)
        except HTTPError as e:
            last_error = e
            logger.warning("HTTP error on attempt %d/%d: %s", attempt + 1, max_retries, e)
        except RequestException as e:
            last_error = e
            logger.warning("Request error on attempt %d/%d: %s", attempt + 1, max_retries, e)

        if attempt < max_retries - 1:
            # Exponential backoff: retry_delay, 2x, 4x, ...
            sleep_time = retry_delay * (2**attempt)
            logger.debug("Retrying in %.1f seconds...", sleep_time)
            time.sleep(sleep_time)

    raise RuntimeError(f"Failed to query GDC after {max_retries} attempts: {last_error}")


def query_multiple_projects(
    projects: list[str],
    data_type: str,
    fields: list[str] | None = None,
    size: int = 1000,
    max_retries: int = 3,
    retry_delay: float = 1.0,
    sample_type: str | None = None,
    platform: str | None = None,
) -> list[dict]:
    """Query multiple TCGA projects and return combined results."""
    all_hits: list[dict] = []
    for i, project in enumerate(projects):
        logger.info("Processing project %d/%d: %s", i + 1, len(projects), project)
        hits = query_files(
            project,
            data_type,
            fields=fields,
            size=size,
            max_retries=max_retries,
            retry_delay=retry_delay,
            sample_type=sample_type,
            platform=platform,
        )
        all_hits.extend(hits)

    logger.info("Total files found across %d projects: %d", len(projects), len(all_hits))
    return all_hits
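
# ---------------------------------------------------------------------------
# Usage sketch (illustrative only, not part of the module's API): assumes
# network access to the GDC API. "TCGA-BRCA"/"TCGA-LUAD", the data_type
# "Gene Expression Quantification", and the sample_type "Primary Tumor" are
# real GDC values used here purely as example inputs.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    # Fetch gene-expression files for two projects, restricted to primary
    # tumor samples, then print a few of the default fields requested above.
    hits = query_multiple_projects(
        projects=["TCGA-BRCA", "TCGA-LUAD"],
        data_type="Gene Expression Quantification",
        sample_type="Primary Tumor",
    )
    for hit in hits[:5]:
        print(hit["file_id"], hit["file_name"], hit["file_size"])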