tcga-downloader/tcga_downloader/query.py
yunpeng.zhang a01a59b371
Some checks failed
CI / Lint (push) Failing after 9m32s
CI / Test (3.11) (push) Successful in 6m41s
CI / Test (3.12) (push) Successful in 4m21s
feat: add interactive cli
2026-02-09 13:13:39 +08:00

147 lines
4.0 KiB
Python

from __future__ import annotations
import time
import requests
from requests.exceptions import HTTPError, RequestException, Timeout
from tcga_downloader.logger import get_logger
GDC_FILES_URL = "https://api.gdc.cancer.gov/files"
logger = get_logger("query")
def build_filters(
project: str,
data_type: str,
sample_type: str | None = None,
platform: str | None = None,
) -> dict:
filters = [
{
"op": "in",
"content": {
"field": "cases.project.project_id",
"value": [project],
},
},
{
"op": "in",
"content": {
"field": "data_type",
"value": [data_type],
},
},
]
if sample_type:
filters.append(
{
"op": "in",
"content": {
"field": "samples.sample_type",
"value": [sample_type],
},
}
)
if platform:
filters.append(
{
"op": "in",
"content": {
"field": "platform",
"value": [platform],
},
}
)
return {
"op": "and",
"content": filters,
}
def query_files(
    project: str,
    data_type: str,
    fields: list[str] | None = None,
    size: int = 1000,
    max_retries: int = 3,
    retry_delay: float = 1.0,
    sample_type: str | None = None,
    platform: str | None = None,
) -> list[dict]:
    """Query the GDC files endpoint and return matching file records.

    Retries transient failures (timeouts, HTTP errors, malformed responses)
    with exponential backoff: ``retry_delay * 2**attempt`` seconds.

    Args:
        project: TCGA project id (e.g. ``"TCGA-BRCA"``).
        data_type: GDC ``data_type`` value to match.
        fields: Fields to request per file; defaults to id/name/type/format/size/md5.
        size: Maximum number of hits to request in a single page.
        max_retries: Total attempts before giving up.
        retry_delay: Base delay (seconds) for exponential backoff.
        sample_type: Optional ``samples.sample_type`` restriction.
        platform: Optional ``platform`` restriction.

    Returns:
        List of file-record dicts from the API response ``data.hits``.

    Raises:
        RuntimeError: If all ``max_retries`` attempts fail.
    """
    if fields is None:
        fields = ["file_id", "file_name", "data_type", "data_format", "file_size", "md5sum"]
    payload = {
        "filters": build_filters(project, data_type, sample_type, platform),
        "fields": ",".join(fields),
        "format": "JSON",
        "size": size,
    }
    logger.info(
        "Querying GDC: project=%s, data_type=%s, sample_type=%s, platform=%s",
        project,
        data_type,
        sample_type,
        platform,
    )
    last_error: Exception | None = None
    for attempt in range(max_retries):
        try:
            resp = requests.post(GDC_FILES_URL, json=payload, timeout=30)
            resp.raise_for_status()
            data: dict = resp.json()
            hits = data.get("data", {}).get("hits", [])
            # Explicit check instead of `assert`: asserts are stripped under
            # `python -O`, and a non-list here would surface as a confusing
            # TypeError far from the cause. Treat it as a retryable bad response.
            if not isinstance(hits, list):
                raise ValueError(f"Unexpected GDC response: 'hits' is {type(hits).__name__}, not list")
            logger.info("Found %d files", len(hits))
            return hits
        except Timeout as e:
            last_error = e
            logger.warning("Timeout on attempt %d/%d", attempt + 1, max_retries)
        except HTTPError as e:
            last_error = e
            logger.warning("HTTP error on attempt %d/%d: %s", attempt + 1, max_retries, e)
        except RequestException as e:
            last_error = e
            logger.warning("Request error on attempt %d/%d: %s", attempt + 1, max_retries, e)
        except ValueError as e:
            # Covers both json.JSONDecodeError from resp.json() and the
            # malformed-'hits' check above; both are worth retrying.
            last_error = e
            logger.warning("Bad response on attempt %d/%d: %s", attempt + 1, max_retries, e)
        if attempt < max_retries - 1:
            sleep_time = retry_delay * (2**attempt)
            logger.debug("Retrying in %.1f seconds...", sleep_time)
            time.sleep(sleep_time)
    raise RuntimeError(f"Failed to query GDC after {max_retries} attempts: {last_error}")
def query_multiple_projects(
    projects: list[str],
    data_type: str,
    fields: list[str] | None = None,
    size: int = 1000,
    max_retries: int = 3,
    retry_delay: float = 1.0,
    sample_type: str | None = None,
    platform: str | None = None,
) -> list[dict]:
    """Query each TCGA project in turn and return the concatenated hits.

    All query parameters are forwarded unchanged to :func:`query_files`;
    results are combined in the order the projects were given.
    """
    combined: list[dict] = []
    total = len(projects)
    for position, project in enumerate(projects, start=1):
        logger.info("Processing project %d/%d: %s", position, total, project)
        # Forward every tuning knob to the single-project query.
        combined.extend(
            query_files(
                project,
                data_type,
                fields=fields,
                size=size,
                max_retries=max_retries,
                retry_delay=retry_delay,
                sample_type=sample_type,
                platform=platform,
            )
        )
    logger.info("Total files found across %d projects: %d", total, len(combined))
    return combined