diff --git a/README.md b/README.md index 275ab15..f37bb75 100644 --- a/README.md +++ b/README.md @@ -222,14 +222,6 @@ uv build A white paper is available in [paper/paper.md](paper/paper.md). -Build PDF with pandoc: - -```bash -docker run --rm --platform linux/amd64 \ - -v "$(pwd)/paper:/data" \ - -w /data openjournals/inara:latest paper.md -p -o pdf -``` - ## Contributing 1. Fork the repository diff --git a/pridepy/files/files.py b/pridepy/files/files.py index 18f63bc..a329826 100644 --- a/pridepy/files/files.py +++ b/pridepy/files/files.py @@ -10,6 +10,7 @@ import urllib import urllib.request import time +from concurrent.futures import ThreadPoolExecutor, as_completed from ftplib import FTP from typing import Dict, List, Optional, Tuple import socket @@ -58,7 +59,7 @@ class Files: API_PRIVATE_URL = "https://www.ebi.ac.uk/pride/private/ws/archive/v2" PRIDE_ARCHIVE_FTP = "ftp.pride.ebi.ac.uk" PRIDE_ARCHIVE_FTP_URL_PREFIX = "ftp://ftp.pride.ebi.ac.uk/" - GLOBUS_BASE_URL = "https://g-a8b222.dd271.03c0.data.globus.org/" + PRIDE_ARCHIVE_HTTPS_URL_PREFIX = "https://ftp.pride.ebi.ac.uk/" S3_URL = "https://hh.fire.sdo.ebi.ac.uk" S3_BUCKET = "pride-public" PROTOCOL_ORDER = ["aspera", "s3", "ftp", "globus"] @@ -68,57 +69,47 @@ def __init__(self): pass @staticmethod - def _parse_checksum_line(line: str) -> Optional[Tuple[str, str]]: - """ - Parse one checksum line and return (file_basename, md5_checksum) when present. - Supports common formats: - - - - \t - - \t - """ - clean = line.strip() - if not clean or clean.startswith("#"): - return None - - tokens = clean.replace("\t", " ").split() - if len(tokens) < 2: - return None - - checksum = None - path_token = None - for idx, token in enumerate(tokens): - normalized = token.lower() - if len(normalized) == 32 and all(c in "0123456789abcdef" for c in normalized): - checksum = normalized - remaining = [t for i, t in enumerate(tokens) if i != idx] - if remaining: - path_token = remaining[-1] - break - - if not checksum or not path_token: + def _find_tsv_columns(header: str) -> Optional[Tuple[int, int]]: + """Return (name_idx, checksum_idx) from a TSV header, or None.""" + cols = [col.strip().lower() for col in header.split("\t")] + required_cols = {"file-name", "file-md5checksum", "file-size"} + if not required_cols.issubset(set(cols)): return None + return cols.index("file-name"), cols.index("file-md5checksum") - file_name = os.path.basename(path_token.lstrip("*")) - if not file_name: - return None - return file_name, checksum + @staticmethod + def _is_md5_checksum(value: str) -> bool: + return len(value) == 32 and all(char in "0123456789abcdef" for char in value) @staticmethod def read_checksum_file(checksum_file_path: str) -> Dict[str, str]: """ - Read checksum TSV/TXT and build {file_name: md5} map. + Read PRIDE API checksum TSV and build {file_name: md5} map. + Expected format: File-Name\tFile-MD5Checksum\tFile-Size """ checksums: Dict[str, str] = {} if not checksum_file_path or not os.path.exists(checksum_file_path): return checksums - with open(checksum_file_path, "r", encoding="utf-8") as checksum_file: - for line in checksum_file: - parsed = Files._parse_checksum_line(line) - if parsed is None: - continue - file_name, checksum = parsed - checksums[file_name] = checksum + with open(checksum_file_path, "r", encoding="utf-8") as f: + header = f.readline().strip() + if not header: + return checksums + + col_indices = Files._find_tsv_columns(header) + if col_indices is None: + logging.warning(f"Unrecognized checksum file format: {header}") + return checksums + + name_idx, checksum_idx = col_indices + min_cols = max(name_idx, checksum_idx) + 1 + for line in f: + parts = line.strip().split("\t") + if len(parts) >= min_cols: + fn = os.path.basename(parts[name_idx].strip()) + cs = parts[checksum_idx].strip().lower() + if fn and Files._is_md5_checksum(cs): + checksums[fn] = cs return checksums @@ -198,7 +189,11 @@ def _get_download_url(file_record: Dict, protocol: str) -> str: if protocol == "ftp": return ftp_url if protocol == "globus": - return ftp_url.replace(Files.PRIDE_ARCHIVE_FTP_URL_PREFIX, Files.GLOBUS_BASE_URL) + return ftp_url.replace( + Files.PRIDE_ARCHIVE_FTP_URL_PREFIX, + Files.PRIDE_ARCHIVE_HTTPS_URL_PREFIX, + 1, + ) if protocol == "s3": return ftp_url raise ValueError(f"Unsupported protocol: {protocol}") @@ -485,6 +480,93 @@ def download_files_from_aspera( except subprocess.CalledProcessError as e: logging.error(f"Aspera download failed for {new_file_path}: {str(e)}") + @staticmethod + def _download_range(url, file_path, start, end, pbar): + """Download a byte range directly into the target file using seek.""" + session = Util.create_session_with_retries() + headers = {"Range": f"bytes={start}-{end}"} + with session.get(url, headers=headers, stream=True, timeout=(30, 300)) as r: + r.raise_for_status() + if r.status_code != 206: + raise RuntimeError(f"Server did not honor Range request: {r.status_code}") + content_range = r.headers.get("Content-Range", "") + if not content_range.lower().startswith(f"bytes {start}-{end}/"): + raise RuntimeError(f"Unexpected Content-Range header: {content_range}") + with open(file_path, "r+b") as f: + f.seek(start) + for chunk in r.iter_content(chunk_size=8 * 1024 * 1024): + if chunk: + f.write(chunk) + pbar.update(len(chunk)) + + @staticmethod + def _parallel_download(url, file_path, num_connections=8): + """Download a file using parallel Range requests, like browser parallel downloading.""" + session = Util.create_session_with_retries() + try: + head = session.head(url, timeout=(30, 30)) + head.raise_for_status() + total_size = int(head.headers.get("content-length", 0)) + accept_ranges = head.headers.get("accept-ranges", "none").strip().lower() + except (requests.RequestException, ValueError) as exc: + logging.info(f"HEAD request failed, falling back to single connection: {exc}") + total_size = 0 + accept_ranges = "none" + + if total_size == 0 or accept_ranges != "bytes" or num_connections < 2: + logging.info( + "Server does not support Range requests, falling back to single connection" + ) + with session.get(url, stream=True, timeout=(30, 300)) as r: + r.raise_for_status() + with tqdm(total=total_size, unit="B", unit_scale=True, desc=file_path) as pbar: + with open(file_path, "wb", buffering=8 * 1024 * 1024) as f: + for chunk in r.iter_content(chunk_size=8 * 1024 * 1024): + if chunk: + f.write(chunk) + pbar.update(len(chunk)) + return + + num_connections = min(num_connections, total_size) + chunk_size = (total_size + num_connections - 1) // num_connections + ranges = [] + for start in range(0, total_size, chunk_size): + end = min(start + chunk_size - 1, total_size - 1) + ranges.append((start, end)) + + logging.info( + f"Parallel download: {num_connections} connections, " + f"{total_size / 1024 / 1024:.0f}MB total" + ) + + with open(file_path, "wb") as f: + f.seek(total_size - 1) + f.write(b"\0") + + try: + with tqdm(total=total_size, unit="B", unit_scale=True, desc=file_path) as pbar: + with ThreadPoolExecutor(max_workers=num_connections) as executor: + futures = [] + for start, end in ranges: + f = executor.submit( + Files._download_range, url, file_path, start, end, pbar + ) + futures.append(f) + + for f in as_completed(futures): + f.result() + except Exception as exc: + logging.info(f"Parallel download failed, retrying with single connection: {exc}") + Files._remove_if_exists(file_path) + with session.get(url, stream=True, timeout=(30, 300)) as r: + r.raise_for_status() + with tqdm(total=total_size, unit="B", unit_scale=True, desc=file_path) as pbar: + with open(file_path, "wb", buffering=8 * 1024 * 1024) as f: + for chunk in r.iter_content(chunk_size=8 * 1024 * 1024): + if chunk: + f.write(chunk) + pbar.update(len(chunk)) + @staticmethod def download_files_from_globus( file_list_json: List[Dict], output_folder, skip_if_downloaded_already @@ -501,15 +583,9 @@ def download_files_from_globus( for file in file_list_json: try: - if file["publicFileLocations"][0]["name"] == "FTP Protocol": - download_url = file["publicFileLocations"][0]["value"] - else: - download_url = file["publicFileLocations"][1]["value"] + download_url = Files._get_download_url(file, "globus") logging.debug(f"Downloading from Globus: {download_url}") - download_url = download_url.replace( - Files.PRIDE_ARCHIVE_FTP_URL_PREFIX, Files.GLOBUS_BASE_URL - ) # Create a clean filename to save the downloaded file new_file_path = Files.get_output_file_name(download_url, file, output_folder) @@ -518,21 +594,7 @@ def download_files_from_globus( logging.info("Skipping download as file already exists") continue - # Get total file size for progress tracking - with urllib.request.urlopen(download_url) as response: - total_size = int(response.headers.get("Content-Length", 0)) - - # Initialize progress bar - progress = Progress(total_size, new_file_path) - - # Download the file with progress bar - urllib.request.urlretrieve( - download_url, - new_file_path, - reporthook=lambda blocks, block_size, total_size: progress(block_size), - ) - - progress.close() + Files._parallel_download(download_url, new_file_path) logging.info(f"Successfully downloaded {new_file_path}") except Exception as e: diff --git a/pridepy/tests/test_download_resilience.py b/pridepy/tests/test_download_resilience.py index 46bbcd5..a5efae1 100644 --- a/pridepy/tests/test_download_resilience.py +++ b/pridepy/tests/test_download_resilience.py @@ -2,25 +2,138 @@ import os import tempfile from unittest import TestCase -from unittest.mock import patch +from unittest.mock import Mock, patch from pridepy.files.files import Files class TestDownloadResilience(TestCase): - def test_read_checksum_file_parses_common_formats(self): + def test_read_checksum_file_parses_pride_api_format(self): with tempfile.TemporaryDirectory() as tmp_dir: checksum_path = os.path.join(tmp_dir, "checksums.tsv") with open(checksum_path, "w", encoding="utf-8") as handle: - handle.write("900150983cd24fb0d6963f7d28e17f72 fileA.raw\n") - handle.write("fileB.raw\t900150983cd24fb0d6963f7d28e17f72\n") - handle.write("900150983cd24fb0d6963f7d28e17f72\t/path/to/fileC.raw\n") + handle.write("File-Name\tFile-MD5Checksum\tFile-Size\n") + handle.write("fileA.raw\t900150983cd24fb0d6963f7d28e17f72\t1024\n") + handle.write("fileB.raw\td41d8cd98f00b204e9800998ecf8427e\t2048\n") + handle.write("fileC.raw\tnot-a-md5\t4096\n") checksum_map = Files.read_checksum_file(checksum_path) assert checksum_map["fileA.raw"] == "900150983cd24fb0d6963f7d28e17f72" - assert checksum_map["fileB.raw"] == "900150983cd24fb0d6963f7d28e17f72" - assert checksum_map["fileC.raw"] == "900150983cd24fb0d6963f7d28e17f72" + assert checksum_map["fileB.raw"] == "d41d8cd98f00b204e9800998ecf8427e" + assert "fileC.raw" not in checksum_map + + def test_read_checksum_file_returns_empty_on_bad_header(self): + with tempfile.TemporaryDirectory() as tmp_dir: + checksum_path = os.path.join(tmp_dir, "checksums.tsv") + with open(checksum_path, "w", encoding="utf-8") as handle: + handle.write("random header\n") + handle.write("some data\n") + + checksum_map = Files.read_checksum_file(checksum_path) + assert len(checksum_map) == 0 + + def test_get_download_url_maps_globus_to_pride_archive_https(self): + file_record = { + "publicFileLocations": [ + {"name": "FTP Protocol", "value": "ftp://ftp.pride.ebi.ac.uk/path/file.raw"} + ] + } + + download_url = Files._get_download_url(file_record, "globus") + + assert download_url == "https://ftp.pride.ebi.ac.uk/path/file.raw" + + def test_parallel_download_falls_back_when_range_not_honored(self): + with tempfile.TemporaryDirectory() as tmp_dir: + output_file = os.path.join(tmp_dir, "file.raw") + session = Mock() + head = Mock() + head.headers = {"content-length": "1", "accept-ranges": "bytes"} + head.raise_for_status.return_value = None + session.head.return_value = head + + ranged_response = Mock() + ranged_response.status_code = 200 + ranged_response.headers = {} + ranged_response.raise_for_status.return_value = None + ranged_response.__enter__ = Mock(return_value=ranged_response) + ranged_response.__exit__ = Mock(return_value=None) + + fallback_response = Mock() + fallback_response.raise_for_status.return_value = None + fallback_response.iter_content.return_value = [b"abc"] + fallback_response.__enter__ = Mock(return_value=fallback_response) + fallback_response.__exit__ = Mock(return_value=None) + session.get.side_effect = [ranged_response, fallback_response] + + with patch( + "pridepy.files.files.Util.create_session_with_retries", + return_value=session, + ): + Files._parallel_download( + "https://example.org/file.raw", + output_file, + num_connections=2, + ) + + with open(output_file, "rb") as handle: + assert handle.read() == b"abc" + + def test_parallel_download_falls_back_when_head_fails(self): + with tempfile.TemporaryDirectory() as tmp_dir: + output_file = os.path.join(tmp_dir, "file.raw") + session = Mock() + session.head.side_effect = ValueError("bad content length") + + fallback_response = Mock() + fallback_response.raise_for_status.return_value = None + fallback_response.iter_content.return_value = [b"abc"] + fallback_response.__enter__ = Mock(return_value=fallback_response) + fallback_response.__exit__ = Mock(return_value=None) + session.get.return_value = fallback_response + + with patch( + "pridepy.files.files.Util.create_session_with_retries", + return_value=session, + ): + Files._parallel_download( + "https://example.org/file.raw", + output_file, + num_connections=2, + ) + + with open(output_file, "rb") as handle: + assert handle.read() == b"abc" + + def test_parallel_download_falls_back_without_accept_ranges(self): + with tempfile.TemporaryDirectory() as tmp_dir: + output_file = os.path.join(tmp_dir, "file.raw") + session = Mock() + head = Mock() + head.headers = {"content-length": "3", "accept-ranges": "none"} + head.raise_for_status.return_value = None + session.head.return_value = head + + fallback_response = Mock() + fallback_response.raise_for_status.return_value = None + fallback_response.iter_content.return_value = [b"abc"] + fallback_response.__enter__ = Mock(return_value=fallback_response) + fallback_response.__exit__ = Mock(return_value=None) + session.get.return_value = fallback_response + + with patch( + "pridepy.files.files.Util.create_session_with_retries", + return_value=session, + ): + Files._parallel_download( + "https://example.org/file.raw", + output_file, + num_connections=2, + ) + + with open(output_file, "rb") as handle: + assert handle.read() == b"abc" def test_validate_download_rejects_empty_and_bad_checksum(self): with tempfile.TemporaryDirectory() as tmp_dir: diff --git a/pyproject.toml b/pyproject.toml index 080ecd8..64bd396 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "pridepy" -version = "0.0.13" +version = "0.0.14" description = "Python Client library for PRIDE Rest API" readme = "README.md" requires-python = ">=3.9"