Skip to content
Merged

Dev #91

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 0 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -222,14 +222,6 @@ uv build

A white paper is available in [paper/paper.md](paper/paper.md).

Build PDF with pandoc:

```bash
docker run --rm --platform linux/amd64 \
-v "$(pwd)/paper:/data" \
-w /data openjournals/inara:latest paper.md -p -o pdf
```

## Contributing

1. Fork the repository
Expand Down
190 changes: 126 additions & 64 deletions pridepy/files/files.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import urllib
import urllib.request
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from ftplib import FTP
from typing import Dict, List, Optional, Tuple
import socket
Expand Down Expand Up @@ -58,7 +59,7 @@ class Files:
API_PRIVATE_URL = "https://www.ebi.ac.uk/pride/private/ws/archive/v2"
PRIDE_ARCHIVE_FTP = "ftp.pride.ebi.ac.uk"
PRIDE_ARCHIVE_FTP_URL_PREFIX = "ftp://ftp.pride.ebi.ac.uk/"
GLOBUS_BASE_URL = "https://g-a8b222.dd271.03c0.data.globus.org/"
PRIDE_ARCHIVE_HTTPS_URL_PREFIX = "https://ftp.pride.ebi.ac.uk/"
S3_URL = "https://hh.fire.sdo.ebi.ac.uk"
S3_BUCKET = "pride-public"
PROTOCOL_ORDER = ["aspera", "s3", "ftp", "globus"]
Expand All @@ -68,57 +69,47 @@ def __init__(self):
pass

@staticmethod
def _parse_checksum_line(line: str) -> Optional[Tuple[str, str]]:
"""
Parse one checksum line and return (file_basename, md5_checksum) when present.
Supports common formats:
- <md5> <path>
- <path>\t<md5>
- <md5>\t<path>
"""
clean = line.strip()
if not clean or clean.startswith("#"):
return None

tokens = clean.replace("\t", " ").split()
if len(tokens) < 2:
return None

checksum = None
path_token = None
for idx, token in enumerate(tokens):
normalized = token.lower()
if len(normalized) == 32 and all(c in "0123456789abcdef" for c in normalized):
checksum = normalized
remaining = [t for i, t in enumerate(tokens) if i != idx]
if remaining:
path_token = remaining[-1]
break

if not checksum or not path_token:
def _find_tsv_columns(header: str) -> Optional[Tuple[int, int]]:
"""Return (name_idx, checksum_idx) from a TSV header, or None."""
cols = [col.strip().lower() for col in header.split("\t")]
required_cols = {"file-name", "file-md5checksum", "file-size"}
if not required_cols.issubset(set(cols)):
return None
return cols.index("file-name"), cols.index("file-md5checksum")

file_name = os.path.basename(path_token.lstrip("*"))
if not file_name:
return None
return file_name, checksum
@staticmethod
def _is_md5_checksum(value: str) -> bool:
return len(value) == 32 and all(char in "0123456789abcdef" for char in value)

@staticmethod
def read_checksum_file(checksum_file_path: str) -> Dict[str, str]:
"""
Read checksum TSV/TXT and build {file_name: md5} map.
Read PRIDE API checksum TSV and build {file_name: md5} map.
Expected format: File-Name\tFile-MD5Checksum\tFile-Size
"""
checksums: Dict[str, str] = {}
if not checksum_file_path or not os.path.exists(checksum_file_path):
return checksums

with open(checksum_file_path, "r", encoding="utf-8") as checksum_file:
for line in checksum_file:
parsed = Files._parse_checksum_line(line)
if parsed is None:
continue
file_name, checksum = parsed
checksums[file_name] = checksum
with open(checksum_file_path, "r", encoding="utf-8") as f:
header = f.readline().strip()
if not header:
return checksums

col_indices = Files._find_tsv_columns(header)
if col_indices is None:
logging.warning(f"Unrecognized checksum file format: {header}")
return checksums

name_idx, checksum_idx = col_indices
min_cols = max(name_idx, checksum_idx) + 1
for line in f:
parts = line.strip().split("\t")
if len(parts) >= min_cols:
fn = os.path.basename(parts[name_idx].strip())
cs = parts[checksum_idx].strip().lower()
if fn and Files._is_md5_checksum(cs):
checksums[fn] = cs

return checksums

Expand Down Expand Up @@ -198,7 +189,11 @@ def _get_download_url(file_record: Dict, protocol: str) -> str:
if protocol == "ftp":
return ftp_url
if protocol == "globus":
return ftp_url.replace(Files.PRIDE_ARCHIVE_FTP_URL_PREFIX, Files.GLOBUS_BASE_URL)
return ftp_url.replace(
Files.PRIDE_ARCHIVE_FTP_URL_PREFIX,
Files.PRIDE_ARCHIVE_HTTPS_URL_PREFIX,
1,
)
if protocol == "s3":
return ftp_url
raise ValueError(f"Unsupported protocol: {protocol}")
Expand Down Expand Up @@ -485,6 +480,93 @@ def download_files_from_aspera(
except subprocess.CalledProcessError as e:
logging.error(f"Aspera download failed for {new_file_path}: {str(e)}")

@staticmethod
def _download_range(url, file_path, start, end, pbar):
"""Download a byte range directly into the target file using seek."""
session = Util.create_session_with_retries()
headers = {"Range": f"bytes={start}-{end}"}
with session.get(url, headers=headers, stream=True, timeout=(30, 300)) as r:
r.raise_for_status()
if r.status_code != 206:
raise RuntimeError(f"Server did not honor Range request: {r.status_code}")
content_range = r.headers.get("Content-Range", "")
if not content_range.lower().startswith(f"bytes {start}-{end}/"):
raise RuntimeError(f"Unexpected Content-Range header: {content_range}")
with open(file_path, "r+b") as f:
f.seek(start)
for chunk in r.iter_content(chunk_size=8 * 1024 * 1024):
if chunk:
f.write(chunk)
pbar.update(len(chunk))

@staticmethod
def _parallel_download(url, file_path, num_connections=8):
"""Download a file using parallel Range requests, like browser parallel downloading."""
session = Util.create_session_with_retries()
try:
head = session.head(url, timeout=(30, 30))
head.raise_for_status()
total_size = int(head.headers.get("content-length", 0))
accept_ranges = head.headers.get("accept-ranges", "none").strip().lower()
except (requests.RequestException, ValueError) as exc:
logging.info(f"HEAD request failed, falling back to single connection: {exc}")
total_size = 0
accept_ranges = "none"

if total_size == 0 or accept_ranges != "bytes" or num_connections < 2:
logging.info(
"Server does not support Range requests, falling back to single connection"
)
with session.get(url, stream=True, timeout=(30, 300)) as r:
r.raise_for_status()
with tqdm(total=total_size, unit="B", unit_scale=True, desc=file_path) as pbar:
with open(file_path, "wb", buffering=8 * 1024 * 1024) as f:
for chunk in r.iter_content(chunk_size=8 * 1024 * 1024):
if chunk:
f.write(chunk)
pbar.update(len(chunk))
return

num_connections = min(num_connections, total_size)
chunk_size = (total_size + num_connections - 1) // num_connections
ranges = []
for start in range(0, total_size, chunk_size):
end = min(start + chunk_size - 1, total_size - 1)
ranges.append((start, end))

logging.info(
f"Parallel download: {num_connections} connections, "
f"{total_size / 1024 / 1024:.0f}MB total"
)

with open(file_path, "wb") as f:
f.seek(total_size - 1)
f.write(b"\0")

try:
with tqdm(total=total_size, unit="B", unit_scale=True, desc=file_path) as pbar:
with ThreadPoolExecutor(max_workers=num_connections) as executor:
futures = []
for start, end in ranges:
f = executor.submit(
Files._download_range, url, file_path, start, end, pbar
)
futures.append(f)

for f in as_completed(futures):
f.result()
except Exception as exc:
logging.info(f"Parallel download failed, retrying with single connection: {exc}")
Files._remove_if_exists(file_path)
with session.get(url, stream=True, timeout=(30, 300)) as r:
r.raise_for_status()
with tqdm(total=total_size, unit="B", unit_scale=True, desc=file_path) as pbar:
with open(file_path, "wb", buffering=8 * 1024 * 1024) as f:
for chunk in r.iter_content(chunk_size=8 * 1024 * 1024):
if chunk:
f.write(chunk)
pbar.update(len(chunk))

@staticmethod
def download_files_from_globus(
file_list_json: List[Dict], output_folder, skip_if_downloaded_already
Expand All @@ -501,15 +583,9 @@ def download_files_from_globus(

for file in file_list_json:
try:
if file["publicFileLocations"][0]["name"] == "FTP Protocol":
download_url = file["publicFileLocations"][0]["value"]
else:
download_url = file["publicFileLocations"][1]["value"]
download_url = Files._get_download_url(file, "globus")

logging.debug(f"Downloading from Globus: {download_url}")
download_url = download_url.replace(
Files.PRIDE_ARCHIVE_FTP_URL_PREFIX, Files.GLOBUS_BASE_URL
)

# Create a clean filename to save the downloaded file
new_file_path = Files.get_output_file_name(download_url, file, output_folder)
Expand All @@ -518,21 +594,7 @@ def download_files_from_globus(
logging.info("Skipping download as file already exists")
continue

# Get total file size for progress tracking
with urllib.request.urlopen(download_url) as response:
total_size = int(response.headers.get("Content-Length", 0))

# Initialize progress bar
progress = Progress(total_size, new_file_path)

# Download the file with progress bar
urllib.request.urlretrieve(
download_url,
new_file_path,
reporthook=lambda blocks, block_size, total_size: progress(block_size),
)

progress.close()
Files._parallel_download(download_url, new_file_path)
logging.info(f"Successfully downloaded {new_file_path}")

except Exception as e:
Expand Down
Loading
Loading