from __future__ import annotations import csv import json from collections.abc import Iterable from dataclasses import dataclass from pathlib import Path from typing import TYPE_CHECKING if TYPE_CHECKING: pass REQUIRED_FIELDS = ["file_id", "file_name", "data_type", "data_format", "size", "md5"] @dataclass(frozen=True) class ManifestRecord: file_id: str file_name: str data_type: str data_format: str size: int md5: str def _validate_record(rec: ManifestRecord) -> None: if not rec.file_id or not rec.file_name: raise ValueError("file_id and file_name are required") if rec.size < 0: raise ValueError("size must be non-negative") def write_manifest(records: Iterable[ManifestRecord], path: Path, fmt: str = "tsv") -> None: path = Path(path) if fmt not in {"tsv", "json"}: raise ValueError("fmt must be 'tsv' or 'json'") records = list(records) for rec in records: _validate_record(rec) if fmt == "json": data = [rec.__dict__ for rec in records] path.write_text(json.dumps(data, indent=2)) return gdc_fields = ["id", "filename", "md5", "size", "state"] with path.open("w", newline="") as f: writer = csv.DictWriter(f, fieldnames=gdc_fields, delimiter="\t") writer.writeheader() for rec in records: row = { "id": rec.file_id, "filename": rec.file_name, "md5": rec.md5, "size": rec.size, "state": "live", # Default state for gdc-client manifest } writer.writerow(row) def load_manifest(path: Path) -> list[ManifestRecord]: path = Path(path) if path.suffix.lower() == ".json": data = json.loads(path.read_text()) return [ManifestRecord(**row) for row in data] with path.open("r", newline="") as f: reader = csv.DictReader(f, delimiter="\t") records: list[ManifestRecord] = [] for row in reader: # Detect manifest format: gdc-client format uses 'id'/'filename', old format uses 'file_id'/'file_name' if "id" in row and "filename" in row: file_id = row["id"] file_name = row["filename"] elif "file_id" in row and "file_name" in row: file_id = row["file_id"] file_name = row["file_name"] else: raise ValueError("Invalid manifest: missing required columns") records.append( ManifestRecord( file_id=file_id, file_name=file_name, data_type=row.get("data_type", "Unknown"), data_format=row.get("data_format", "Unknown"), size=int(row["size"]), md5=row["md5"], ) ) return records def get_manifest_stats( records: Iterable[ManifestRecord], ) -> dict[str, int | float | dict[str, int]]: records = list(records) total_size = sum(rec.size for rec in records) data_types: dict[str, int] = {} data_formats: dict[str, int] = {} for rec in records: data_types[rec.data_type] = data_types.get(rec.data_type, 0) + 1 data_formats[rec.data_format] = data_formats.get(rec.data_format, 0) + 1 return { "file_count": len(records), "total_size_bytes": total_size, "total_size_mb": total_size / (1024 * 1024), "total_size_gb": total_size / (1024 * 1024 * 1024), "data_types": data_types, "data_formats": data_formats, } def format_manifest_stats(stats: dict[str, int | float | dict[str, int]]) -> str: lines = [ f"File count: {stats['file_count']}", f"Total size: {stats['total_size_gb']:.2f} GB ({stats['total_size_mb']:.2f} MB)", ] data_types = stats.get("data_types") if data_types and isinstance(data_types, dict): lines.append("\nData types:") for data_type, count in sorted(data_types.items()): lines.append(f" - {data_type}: {count}") data_formats = stats.get("data_formats") if data_formats and isinstance(data_formats, dict): lines.append("\nData formats:") for data_format, count in sorted(data_formats.items()): lines.append(f" - {data_format}: {count}") return "\n".join(lines) def validate_manifest(records: list[ManifestRecord]) -> list[str]: """Validate manifest records and return list of error messages.""" errors = [] for i, rec in enumerate(records, 1): if not rec.file_id: errors.append(f"Record {i}: Missing file_id") if not rec.file_name: errors.append(f"Record {i}: Missing file_name") if not rec.data_type: errors.append(f"Record {i}: Missing data_type") if not rec.data_format: errors.append(f"Record {i}: Missing data_format") if rec.size < 0: errors.append(f"Record {i}: Invalid size (must be non-negative)") if not rec.md5 or len(rec.md5) != 32: errors.append(f"Record {i}: Invalid MD5 checksum (must be 32 characters)") return errors def validate_files_against_manifest( records: list[ManifestRecord], data_dir: Path ) -> tuple[list[str], list[str]]: """Validate that downloaded files match manifest records. Returns: Tuple of (missing_files, checksum_errors) """ missing_files = [] checksum_errors = [] for rec in records: file_path = data_dir / rec.file_name if not file_path.exists(): missing_files.append(rec.file_name) continue if rec.md5: actual_md5 = _calculate_md5(file_path) if actual_md5 != rec.md5: checksum_errors.append(f"{rec.file_name}: Expected {rec.md5}, got {actual_md5}") return missing_files, checksum_errors def _calculate_md5(file_path: Path) -> str: import hashlib hash_md5 = hashlib.md5() with file_path.open("rb") as f: for chunk in iter(lambda: f.read(4096), b""): hash_md5.update(chunk) return hash_md5.hexdigest()