Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
349 changes: 177 additions & 172 deletions README.md

Large diffs are not rendered by default.

57 changes: 40 additions & 17 deletions krakenparser/counts/convert2csv.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,24 @@
#!/usr/bin/env python

import argparse
import logging
import sys
from pathlib import Path
from typing import Optional

import pandas as pd
import typer

from krakenparser.utils import ensure_output_dir

_log = logging.getLogger(__name__)

# Typer application object for this module, registered under the name "csv".
# Completion options are turned off (add_completion=False) and "-h" is accepted
# as an alias for "--help".
app = typer.Typer(
name="csv",
add_completion=False,
context_settings={"help_option_names": ["-h", "--help"]},
)


def convert_to_csv(input_file, output_file):
def convert_to_csv(input_file: str, output_file: str) -> None:
in_path = Path(input_file)
if not in_path.is_file():
raise FileNotFoundError(f"Input file not found: {in_path}")
Expand All @@ -22,26 +29,42 @@ def convert_to_csv(input_file, output_file):
_log.info("Data converted and saved as '%s'.", output_file)


def main() -> None:
logging.basicConfig(level=logging.INFO, format="%(message)s")
parser = argparse.ArgumentParser(
description="Reads a TXT file, reorganizes the data, and converts it into a CSV file."
)
parser.add_argument(
@app.callback(invoke_without_command=True)
def main(
ctx: typer.Context,
input_file: Optional[str] = typer.Option(
None,
"-i",
"--input",
required=True,
help="Path to the input TXT file. This file should contain sample names in columns and microbial taxa in rows.",
)
parser.add_argument(
),
output_file: Optional[str] = typer.Option(
None,
"-o",
"--output",
required=True,
help="Path to the output CSV file. The script will restructure the data and save it here.",
)
args = parser.parse_args()
convert_to_csv(args.input, args.output)
),
) -> None:
"""Reads a TXT file, reorganizes the data, and converts it into a CSV file."""
logging.basicConfig(level=logging.INFO, format="%(message)s")

if input_file is None and output_file is None:
print(ctx.get_help())
raise typer.Exit()

if not input_file or not output_file:
print(
"Error: Missing required options '-i / --input' and '-o / --output'.",
file=sys.stderr,
)
raise typer.Exit(code=1)

try:
convert_to_csv(input_file, output_file)
except FileNotFoundError as e:
print(f"Error: {e}", file=sys.stderr)
raise typer.Exit(code=1)


if __name__ == "__main__":
main()
app()
59 changes: 42 additions & 17 deletions krakenparser/counts/processing_script.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,24 @@
#!/usr/bin/env python

import argparse
import logging
import os
import sys
import tempfile
from pathlib import Path
from typing import Optional

import typer

_log = logging.getLogger(__name__)

# Typer application object for this module, registered under the name "process".
# Completion options are turned off (add_completion=False) and "-h" is accepted
# as an alias for "--help".
app = typer.Typer(
name="process",
add_completion=False,
context_settings={"help_option_names": ["-h", "--help"]},
)


def modify_taxa_names(line):
def modify_taxa_names(line: str) -> str:
prefixes = ["s__", "g__", "f__", "o__", "c__", "p__"]
for prefix in prefixes:
if line.startswith(prefix):
Expand All @@ -19,7 +28,7 @@ def modify_taxa_names(line):
return line


def process_files(source_file, destination_file):
def process_files(source_file: str, destination_file: str) -> None:
src_path = Path(source_file)
if not src_path.is_file():
raise FileNotFoundError(f"Source file not found: {src_path}")
Expand Down Expand Up @@ -53,26 +62,42 @@ def process_files(source_file, destination_file):
_log.info(f"Processed {destination_file} successfully.")


def main() -> None:
logging.basicConfig(level=logging.INFO, format="%(message)s")
parser = argparse.ArgumentParser(
description="Reads a source file, processes its first line, modifies taxa names in a destination file, and updates it."
)
parser.add_argument(
@app.callback(invoke_without_command=True)
def main(
ctx: typer.Context,
input_file: Optional[str] = typer.Option(
None,
"-i",
"--input",
required=True,
help="Path to the source file. This file's first line will be read and modified.",
)
parser.add_argument(
),
output_file: Optional[str] = typer.Option(
None,
"-o",
"--output",
required=True,
help="Path to the destination file. This file's contents will be updated with cleaned taxa names.",
)
args = parser.parse_args()
process_files(args.input, args.output)
),
) -> None:
"""Reads a source file, processes its first line, modifies taxa names in a destination file, and updates it."""
logging.basicConfig(level=logging.INFO, format="%(message)s")

if input_file is None and output_file is None:
print(ctx.get_help())
raise typer.Exit()

if not input_file or not output_file:
print(
"Error: Missing required options '-i / --input' and '-o / --output'.",
file=sys.stderr,
)
raise typer.Exit(code=1)

try:
process_files(input_file, output_file)
except FileNotFoundError as e:
print(f"Error: {e}", file=sys.stderr)
raise typer.Exit(code=1)


if __name__ == "__main__":
main()
app()
154 changes: 113 additions & 41 deletions krakenparser/counts/split_mpa.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,23 @@
Replaces decombine.sh and decombine_viruses.sh.
"""

import argparse
import logging
import re
import sys
from pathlib import Path
from typing import Optional

import typer

from krakenparser.utils import ensure_output_dir

_log = logging.getLogger(__name__)

# Typer application object for this module, registered under the name "split".
# Completion options are turned off (add_completion=False) and "-h" is accepted
# as an alias for "--help".
app = typer.Typer(
name="split",
add_completion=False,
context_settings={"help_option_names": ["-h", "--help"]},
)

_RANKS = [
("species", "s__", []),
Expand All @@ -23,20 +31,21 @@
("phylum", "p__", ["s__", "g__", "f__", "o__", "c__"]),
]

_HUMAN_TAXA = {
"species": "s__Homo_sapiens",
"genus": "g__Homo",
"family": "f__Hominidae",
"order": "o__Primates",
"class": "c__Mammalia",
"phylum": "p__Chordata",
}
_HUMAN_MARKERS = frozenset(
[
"s__Homo_sapiens",
"g__Homo",
"f__Hominidae",
"o__Primates",
"c__Mammalia",
"p__Chordata",
]
)

# Matches one of the listed three-letter prefixes, any digits that follow it,
# and a trailing hyphen (e.g. "SRR12345-"). Presumably these are sequencing
# accession IDs prepended to sample names — TODO confirm against real input.
_ACCESSION_RE = re.compile(r"(SRS|SRR|SRX|ERS|ERR|ERX|DRS|DRR|DRX)\d*-")


def _strip_path_prefix(line: str) -> str:
"""'d__X|p__Y|s__Z\t10\t20' → 's__Z\t10\t20'"""
tab = line.find("\t")
if tab == -1:
return line
Expand All @@ -46,10 +55,20 @@ def _strip_path_prefix(line: str) -> str:
return _ACCESSION_RE.sub("", segment + rest)


def _human_in_line(line: str) -> bool:
    """Return True if the taxonomy path of *line* contains a human marker.

    The path is the text before the first tab (the whole line when there is
    no tab). It is split on "|" and each segment is compared, as an exact
    match, against the entries of ``_HUMAN_MARKERS``.
    """
    # partition() yields the whole string as the head when no tab is present,
    # which mirrors the "no tab -> use the full line" rule.
    lineage, _, _ = line.partition("\t")
    return not _HUMAN_MARKERS.isdisjoint(lineage.split("|"))


def split_mpa(
input_file: str,
output_dir: str,
viruses_only: bool = False,
bacteria_only: bool = False,
fungi_only: bool = False,
archaea_only: bool = False,
keep_human: bool = False,
) -> None:
in_path = Path(input_file)
Expand All @@ -58,17 +77,28 @@ def split_mpa(
out_path = ensure_output_dir(output_dir, is_file=False)
(out_path / "txt").mkdir(exist_ok=True)

lines = in_path.read_text().splitlines()
data_lines = [ln for ln in lines if not ln.startswith("#") and ln.strip()]
all_lines = [
ln
for ln in in_path.read_text().splitlines()
if not ln.startswith("#") and ln.strip()
]

data_lines = all_lines.copy()
if viruses_only:
data_lines = [ln for ln in data_lines if "d__Viruses" in ln]
if bacteria_only:
data_lines = [ln for ln in data_lines if "d__Bacteria" in ln]
if fungi_only:
data_lines = [ln for ln in data_lines if "k__Fungi" in ln]
if archaea_only:
data_lines = [ln for ln in data_lines if "d__Archaea" in ln]

filter_human = not keep_human and not viruses_only
if keep_human:
human_lines = [ln for ln in all_lines if _human_in_line(ln)]
data_lines = list(dict.fromkeys(data_lines + human_lines))

for rank_name, rank_prefix, exclude_prefixes in _RANKS:
result = []
human_pattern = _HUMAN_TAXA[rank_name]

for line in data_lines:
if rank_prefix not in line:
Expand All @@ -77,7 +107,7 @@ def split_mpa(
continue
if any(ep in line for ep in exclude_prefixes):
continue
if filter_human and human_pattern in line:
if not keep_human and _human_in_line(line):
continue
result.append(_strip_path_prefix(line))

Expand All @@ -87,33 +117,75 @@ def split_mpa(
_log.info("MPA file split successfully. Output stored in %s", output_dir)


def main() -> None:
logging.basicConfig(level=logging.INFO, format="%(message)s")
parser = argparse.ArgumentParser(
description="Split a combined MPA table into per-rank TXT files."
)
parser.add_argument("-i", "--input", required=True, help="Input combined MPA file")
parser.add_argument("-o", "--output", required=True, help="Output directory")
parser.add_argument(
"--viruses-only",
action="store_true",
default=False,
help="Extract only Viruses domain taxa",
)
parser.add_argument(
@app.callback(invoke_without_command=True)
def main(
ctx: typer.Context,
input_file: Optional[str] = typer.Option(
None,
"-i",
"--input",
help="Input combined MPA file.",
),
output_dir: Optional[str] = typer.Option(
None,
"-o",
"--output",
help="Output directory.",
),
viruses_only: bool = typer.Option(
False,
"--viruses",
help="Extract only VIRUSES domain taxa.",
),
bacteria_only: bool = typer.Option(
False,
"--bacteria",
help="Extract only BACTERIA domain taxa.",
),
fungi_only: bool = typer.Option(
False,
"--fungi",
help="Extract only FUNGI kingdom taxa.",
),
archaea_only: bool = typer.Option(
False,
"--archaea",
help="Extract only ARCHAEA domain taxa.",
),
keep_human: bool = typer.Option(
False,
"--keep-human",
action="store_true",
default=False,
help="Do not filter human-related taxa (default: filtered)",
)
args = parser.parse_args()
split_mpa(
args.input,
args.output,
viruses_only=args.viruses_only,
keep_human=args.keep_human,
)
help="Retain human-related taxa (default: filtered out).",
),
) -> None:
"""Split a combined MPA table into per-rank TXT files."""
logging.basicConfig(level=logging.INFO, format="%(message)s")

if input_file is None and output_dir is None:
print(ctx.get_help())
raise typer.Exit()

if not input_file or not output_dir:
print(
"Error: Missing required options '-i / --input' and '-o / --output'.",
file=sys.stderr,
)
raise typer.Exit(code=1)

try:
split_mpa(
input_file,
output_dir,
viruses_only=viruses_only,
bacteria_only=bacteria_only,
fungi_only=fungi_only,
archaea_only=archaea_only,
keep_human=keep_human,
)
except FileNotFoundError as e:
print(f"Error: {e}", file=sys.stderr)
raise typer.Exit(code=1)


if __name__ == "__main__":
main()
app()
Loading
Loading