Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions pridepy/commands/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
"""Cross-cutting download commands.

Each module under this package owns one user-facing command that doesn't
fit any single provider:

- ``by_url``: download a list of explicit URLs (ftp/http/https)
- ``by_list``: download a subset of a project's files by filename
- ``proteomexchange``: download raw files from a ProteomeXchange XML

The ``pridepy.files.files.Files`` facade keeps shim methods that
delegate here, so existing test patches on ``Files.X`` keep working.
"""
58 changes: 58 additions & 0 deletions pridepy/commands/by_list.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
"""Download a subset of project files identified by a filename list."""
import logging
from typing import List, Optional


def download_files_by_list(
accession: str,
file_names: List[str],
output_folder: str,
skip_if_downloaded_already: bool,
protocol: str = "ftp",
aspera_maximum_bandwidth: str = "100M",
checksum_check: bool = False,
parallel_files: int = 1,
) -> None:
"""Download a subset of project files identified by a filename list.

Resolves each requested filename via the project metadata API and
delegates to the provider's ``download_files`` so the existing batch +
protocol fallback engine is reused.

:param accession: PRIDE or MassIVE project accession (public)
:param file_names: filenames to download
:param output_folder: directory to write downloaded files into
:param skip_if_downloaded_already: skip files already present locally
:param protocol: preferred protocol; falls back across others on failure
:param aspera_maximum_bandwidth: aspera ascp bandwidth cap
:param checksum_check: download project checksums and validate
:param parallel_files: number of files to download simultaneously for globus
:raises ValueError: if ``file_names`` is empty or none match the project
"""
if not file_names:
raise ValueError("file_names must contain at least one filename")

from pridepy.providers import registry # lazy
provider = registry.resolve(accession)
all_files = provider.list_files(accession)

requested = set(file_names)
matched = [f for f in all_files if f.get("fileName") in requested]
missing = sorted(requested - {f.get("fileName") for f in matched})
if missing:
logging.warning("Files not found in project %s: %s", accession, missing)
if not matched:
raise ValueError(
f"No matching files in project {accession} for: {sorted(requested)}"
)

provider.download_files(
accession=accession,
records=matched,
output_folder=output_folder,
skip_if_downloaded_already=skip_if_downloaded_already,
protocol=protocol,
parallel_files=parallel_files,
checksum_check=checksum_check,
aspera_maximum_bandwidth=aspera_maximum_bandwidth,
)
254 changes: 254 additions & 0 deletions pridepy/commands/by_url.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,254 @@
"""Download a list of explicit URLs (ftp/http/https).

Each URL is dispatched to the matching transport based on its scheme.
PRIDE checksum validation is supported when the accession can be
inferred from the URL path.
"""
import ftplib
import logging
import os
import re
from concurrent.futures import ThreadPoolExecutor, as_completed
from ftplib import FTP
from typing import Dict, List, Optional, Tuple
from urllib.parse import urlparse

from tqdm import tqdm

from pridepy.util.api_handling import Util


def _extract_pride_accession(url: str) -> Optional[str]:
"""Extract a PRIDE accession (PXD/PRD followed by digits) from a URL path.

PRIDE archive URLs follow the pattern
``…/pride/data/archive/YYYY/MM/<ACCESSION>/filename``.
Returns ``None`` when no accession can be identified.
"""
match = re.search(r"((?:PXD|PRD)\d{4,})", url)
return match.group(1) if match else None


def _validate_urls_checksums(urls: List[str], output_folder: str) -> None:
"""Validate downloaded files against PRIDE checksum API.

Accessions are inferred from URL paths via
:func:`_extract_pride_accession`. URLs that do not contain a
recognisable PRIDE accession are skipped with a warning.

:raises RuntimeError: if one or more files fail validation
"""
from pridepy.files.files import Files

accession_urls: Dict[str, List[str]] = {}
for url in urls:
acc = _extract_pride_accession(url)
if acc:
accession_urls.setdefault(acc, []).append(url)
else:
logging.warning(
"Cannot infer PRIDE accession from URL, skipping checksum: %s", url
)

validation_failures: List[str] = []
for acc, acc_urls in accession_urls.items():
checksum_file_path = Files.save_checksum_file(acc, output_folder)
checksum_map = Files.read_checksum_file(checksum_file_path)
logging.info(
"Loaded checksums for %d files (project %s)",
len(checksum_map), acc,
)
for url in acc_urls:
file_name = os.path.basename(urlparse(url).path)
target = os.path.join(output_folder, file_name)
expected = checksum_map.get(file_name)
logging.info("Validating %s", file_name)
valid, reason = Files.validate_download(target, expected)
if not valid:
logging.error("Validation failed for %s: %s", file_name, reason)
validation_failures.append(f"{file_name} ({reason})")
else:
logging.info("Checksum OK: %s", file_name)

if validation_failures:
raise RuntimeError(
f"Checksum validation failed for {len(validation_failures)} file(s): "
+ ", ".join(validation_failures)
)


def _http_download_url(url: str, target: str) -> None:
"""Stream an http/https URL into ``target`` with a progress bar."""
session = Util.create_session_with_retries()
with session.get(url, stream=True, timeout=60) as response:
response.raise_for_status()
total = int(response.headers.get("Content-Length", 0))
with open(target, "wb") as out, tqdm(
total=total,
unit="B",
unit_scale=True,
desc=os.path.basename(target),
) as pbar:
for chunk in response.iter_content(chunk_size=8192):
if chunk:
out.write(chunk)
pbar.update(len(chunk))


def _ftp_download_url(parsed, target: str) -> None:
"""Download a single file from an ftp:// URL with a progress bar."""
host = parsed.hostname
if not host:
raise ValueError(f"FTP URL missing host: {parsed.geturl()}")
port = parsed.port or 21
user = parsed.username or "anonymous"
pwd = parsed.password or "anonymous@"
remote_path = parsed.path
with FTP() as ftp:
ftp.connect(host, port, timeout=60)
ftp.login(user, pwd)
try:
total = ftp.size(remote_path) or 0
except ftplib.error_perm:
total = 0
with open(target, "wb") as out, tqdm(
total=total,
unit="B",
unit_scale=True,
desc=os.path.basename(target),
) as pbar:

def _callback(data: bytes) -> None:
out.write(data)
pbar.update(len(data))

ftp.retrbinary(f"RETR {remote_path}", _callback)


def _dispatch_url_scheme(parsed, target: str, protocol: str = "ftp", position: int = 0) -> None:
"""Route a parsed URL to its protocol-specific downloader.

``protocol='globus'`` swaps the http/https single-connection streamer
for :func:`pridepy.files.files.Files._parallel_download` (single-connection with progress bar).
ftp:// URLs are unaffected.
"""
from pridepy.files.files import Files

scheme = (parsed.scheme or "").lower()
if scheme in ("http", "https"):
if protocol == "globus":
Files._parallel_download(parsed.geturl(), target, position=position)
else:
Files._http_download_url(parsed.geturl(), target)
elif scheme == "ftp":
Files._ftp_download_url(parsed, target)
else:
raise ValueError(f"Unsupported URL scheme: {scheme}")


def _download_single_url(
url: str,
output_folder: str,
skip_if_exists: bool = False,
protocol: str = "ftp",
position: int = 0,
) -> str:
"""Download one URL, dispatched by scheme; return the local file path."""
from pridepy.files.files import Files

parsed = urlparse(url)
if not (parsed.scheme or "").lower():
raise ValueError(f"URL missing scheme: {url}")

file_name = os.path.basename(parsed.path)
if not file_name:
raise ValueError(f"Cannot derive filename from URL: {url}")

target = os.path.join(output_folder, file_name)
if skip_if_exists and os.path.isfile(target) and os.path.getsize(target) > 0:
logging.info("Skipping %s: already downloaded", file_name)
return target

Files._dispatch_url_scheme(parsed, target, protocol, position=position)

ok, reason = Files.validate_download(target)
if not ok:
Files._remove_if_exists(target)
raise RuntimeError(f"Download invalid: {reason} ({target})")
return target


def download_files_by_url(
urls: List[str],
output_folder: str,
skip_if_downloaded_already: bool = False,
protocol: str = "ftp",
parallel_files: int = 1,
checksum_check: bool = False,
) -> None:
"""Download files from a list of raw URLs, dispatched by URL scheme.

Supported schemes: ``http``, ``https``, ``ftp``. Each URL is downloaded
independently; per-URL errors are logged, then aggregated and re-raised
as a single :class:`RuntimeError` so callers see a complete failure
summary.

:param urls: fully-qualified URLs (each contains its scheme)
:param output_folder: directory to write downloaded files into
:param skip_if_downloaded_already: skip URLs whose target file exists
:param protocol: ``ftp`` (default) for single-connection per URL scheme;
``globus`` for resume-capable http/https downloads (single-connection stream)
(no effect on ftp:// URLs which always use single-connection FTP)
:param checksum_check: validate downloads against PRIDE checksum API;
accessions are inferred from URL paths (only PRIDE URLs supported)
:raises ValueError: if ``urls`` is empty
:raises RuntimeError: if one or more URLs failed
"""
if not urls:
raise ValueError("urls must contain at least one URL")

os.makedirs(output_folder, exist_ok=True)

parallel_files = min(parallel_files, 3, len(urls))
failures: List[Tuple[str, str]] = []
from pridepy.files.files import Files

if parallel_files < 2:
for url in urls:
try:
Files._download_single_url(
url, output_folder, skip_if_downloaded_already, protocol,
)
except Exception as exc: # pylint: disable=broad-except
logging.error("Failed to download %s: %s", url, exc)
failures.append((url, str(exc)))
else:
logging.info(
"Downloading %d URL(s) with %d parallel workers",
len(urls), parallel_files,
)
with ThreadPoolExecutor(max_workers=parallel_files) as executor:
futures = {
executor.submit(
Files._download_single_url,
url, output_folder, skip_if_downloaded_already, protocol,
position=idx,
): url
for idx, url in enumerate(urls)
}
for future in as_completed(futures):
url = futures[future]
try:
future.result()
except Exception as exc: # pylint: disable=broad-except
logging.error("Failed to download %s: %s", url, exc)
failures.append((url, str(exc)))

if failures:
summary = ", ".join(f"{u} ({e})" for u, e in failures)
raise RuntimeError(
f"Failed to download {len(failures)} URL(s): {summary}"
)

if checksum_check:
_validate_urls_checksums(urls, output_folder)
Loading
Loading