diff --git a/DESCRIPTION b/DESCRIPTION index 83fdc00..8a6629b 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -15,8 +15,9 @@ Description: MSstats package provide tools for preprocessing, summarization and License: Artistic-2.0 Encoding: UTF-8 RoxygenNote: 7.3.3 -Imports: +Imports: arrow, + data.table, DBI, dplyr, MSstats, diff --git a/NAMESPACE b/NAMESPACE index 9ec4823..8312349 100644 --- a/NAMESPACE +++ b/NAMESPACE @@ -10,5 +10,9 @@ importFrom(MSstats,groupComparison) importFrom(MSstatsConvert,MSstatsClean) importFrom(MSstatsConvert,MSstatsImport) importFrom(MSstatsConvert,MSstatsMakeAnnotation) +importFrom(data.table,":=") +importFrom(data.table,.SD) +importFrom(data.table,setDT) +importFrom(data.table,setnames) importFrom(utils,head) importFrom(utils,sessionInfo) diff --git a/R/clean_spectronaut.R b/R/clean_spectronaut.R index 1eeb89c..198f4d1 100644 --- a/R/clean_spectronaut.R +++ b/R/clean_spectronaut.R @@ -1,3 +1,6 @@ +#' @importFrom data.table := .SD setDT setnames +NULL + #' @keywords internal reduceBigSpectronaut <- function(input_file, output_path, intensity="F.NormalizedPeakArea", @@ -5,29 +8,99 @@ reduceBigSpectronaut <- function(input_file, output_path, filter_by_identified = FALSE, filter_by_qvalue = TRUE, qvalue_cutoff = 0.01, - calculateAnomalyScores=FALSE, - anomalyModelFeatures=c()) { + calculateAnomalyScores=FALSE, + anomalyModelFeatures=c(), + block_size = 16L * 1024L * 1024L) { + block_size <- as.integer(block_size) + stopifnot(length(block_size) == 1L, !is.na(block_size), block_size > 0L) + if (grepl("csv", input_file)) { - delim = "," + delim <- "," } else if (grepl("tsv|xls", input_file)) { - delim = "\t" + delim <- "\t" } else { delim <- ";" } - spec_chunk <- function(x, pos) cleanSpectronautChunk(x, - output_path, - intensity, - filter_by_excluded, - filter_by_identified, - filter_by_qvalue, - qvalue_cutoff, - pos, - calculateAnomalyScores, - anomalyModelFeatures) - readr::read_delim_chunked(input_file, - readr::DataFrameCallback$new(spec_chunk), - delim = delim, - chunk_size = 1e6) + + # Columns cleanSpectronautChunk actually consumes; Arrow's + # convert_options$include_columns drops everything else at parse time so + # we never materialize the ~35 unused columns Spectronaut exports. + needed_cols <- c("R.FileName", "R.Condition", "R.Replicate", + "PG.ProteinAccessions", "EG.ModifiedSequence", + "FG.LabeledSequence", "FG.Charge", + "F.FrgIon", "F.Charge", + "EG.Identified", "F.ExcludedFromQuantification", + "F.FrgLossType", "PG.Qvalue", "EG.Qvalue", + intensity) + if (calculateAnomalyScores) { + needed_cols <- c(needed_cols, anomalyModelFeatures) + } + + # Arrow's CSV reader replaces readr::read_delim_chunked. Arrow releases + # per-batch state as soon as a batch is consumed, so peak memory is + # bounded by one record batch instead of growing with the dataset (readr + # keeps a string-interning pool that accumulates across chunks). The + # `delim` switch above already covers comma / tab / semicolon variants; + # Arrow's CSV reader handles all three the same way through + # CsvParseOptions$delimiter. + parse_opts <- arrow::CsvParseOptions$create(delimiter = delim) + convert_opts <- arrow::CsvConvertOptions$create() + read_opts <- arrow::CsvReadOptions$create(block_size = block_size) + + ds <- arrow::open_dataset( + input_file, + format = "csv", + parse_options = parse_opts, + convert_options = convert_opts, + read_options = read_opts + ) + + reader <- arrow::Scanner$create(ds)$ToRecordBatchReader() + + t_start <- Sys.time() + pos <- 1L + batch_idx <- 0L + repeat { + batch <- reader$read_next_batch() + if (is.null(batch)) break + chunk_df <- as.data.frame(batch) + cleanSpectronautChunk(chunk_df, + output_path, + intensity, + filter_by_excluded, + filter_by_identified, + filter_by_qvalue, + qvalue_cutoff, + pos, + calculateAnomalyScores, + anomalyModelFeatures) + pos <- pos + nrow(chunk_df) + batch_idx <- batch_idx + 1L + + if (batch_idx %% 1000L == 0L) { + elapsed <- as.numeric(Sys.time() - t_start, units = "secs") + rate <- (pos - 1L) / elapsed + message(sprintf( + "[reduceBigSpectronaut] %d batches | %s rows | %.1fk rows/s | %.0fs elapsed", + batch_idx, + format(pos - 1L, big.mark = ","), + rate / 1000, + elapsed)) + } + + rm(batch, chunk_df) + } + + if (batch_idx %% 1000L != 0L) { + elapsed <- as.numeric(Sys.time() - t_start, units = "secs") + rate <- (pos - 1L) / elapsed + message(sprintf( + "[reduceBigSpectronaut] done: %d batches | %s rows | %.1fk rows/s | %.0fs elapsed", + batch_idx, + format(pos - 1L, big.mark = ","), + rate / 1000, + elapsed)) + } } #' @keywords internal @@ -38,85 +111,88 @@ cleanSpectronautChunk = function(input, output_path, filter_by_qvalue = TRUE, qvalue_cutoff = 0.01, pos = NULL, - calculateAnomalyScores=FALSE, + calculateAnomalyScores=FALSE, anomalyModelFeatures=c()) { + data.table::setDT(input) + all_cols <- c("R.FileName", "R.Condition", "R.Replicate", "PG.ProteinAccessions", "EG.ModifiedSequence", "FG.LabeledSequence", "FG.Charge", "F.FrgIon", "F.Charge", "EG.Identified", "F.ExcludedFromQuantification", "F.FrgLossType", "PG.Qvalue", "EG.Qvalue", intensity) - - if (calculateAnomalyScores){ - all_cols <- c(all_cols, anomalyModelFeatures) - } - - cols <- intersect(all_cols, colnames(input)) - input <- dplyr::select(input, all_of(cols)) - input <- dplyr::rename_with(input, .fn = MSstatsConvert:::.standardizeColnames) - new_names <- c("Run", "Condition", "BioReplicate", "ProteinName", "PeptideSequence", "LabeledSequence", "PrecursorCharge", "FragmentIon", "ProductCharge", "Identified", "Excluded", "FFrgLossType", "PGQvalue", "EGQvalue", "Intensity") - if (calculateAnomalyScores){ + if (calculateAnomalyScores) { + all_cols <- c(all_cols, anomalyModelFeatures) new_names <- c(new_names, MSstatsConvert:::.standardizeColnames(anomalyModelFeatures)) } - - # non_standardized = - old_names <- MSstatsConvert:::.standardizeColnames(all_cols) - names(old_names) <- new_names - old_names <- old_names[old_names %in% colnames(input)] - - input <- dplyr::rename(input, !!old_names) - input <- dplyr::mutate(input, Intensity = as.numeric(Intensity)) - - if (is.character(dplyr::pull(dplyr::collect(head(dplyr::select(input, Excluded))), Excluded))) { - input <- dplyr::mutate(input, Excluded = Excluded == "True") + + present_orig <- intersect(all_cols, colnames(input)) + if (length(present_orig) == 0L) { + stop(sprintf( + paste0("cleanSpectronautChunk: none of the expected Spectronaut ", + "columns were found in the input batch. ", + "Expected any of: %s. Found: %s. ", + "Check that the file is comma-delimited and that the ", + "Spectronaut export uses the standard column names."), + paste(all_cols, collapse = ", "), + paste(colnames(input), collapse = ", "))) } - if (is.element("Identified", colnames(input))) { - if (is.character(dplyr::pull(dplyr::collect(head(dplyr::select(input, Identified))), Identified))) { - input <- dplyr::mutate(input, Identified = Identified == "True") - } + input <- input[, present_orig, with = FALSE] + + # Two-step rename matching the MSstatsConvert family pattern: standardize + # all column names, then map standardized -> MSstats final names. + data.table::setnames(input, MSstatsConvert:::.standardizeColnames(colnames(input))) + std_to_msstats <- stats::setNames(new_names, + MSstatsConvert:::.standardizeColnames(all_cols)) + data.table::setnames(input, + old = names(std_to_msstats), + new = unname(std_to_msstats), + skip_absent = TRUE) + + input[, Intensity := as.numeric(Intensity)] + + if (is.character(input[["Excluded"]])) { + input[, Excluded := Excluded == "True"] + } + if ("Identified" %in% colnames(input) && is.character(input[["Identified"]])) { + input[, Identified := Identified == "True"] } - + if (filter_by_excluded) { - input <- dplyr::mutate( - input, Intensity = dplyr::if_else(Excluded, NA_real_, Intensity)) - + input[Excluded == TRUE, Intensity := NA_real_] } - if (filter_by_identified) { - input <- dplyr::mutate( - input, Intensity = dplyr::if_else(Identified, Intensity, NA_real_)) + input[Identified == FALSE, Intensity := NA_real_] } - if (filter_by_qvalue) { - input <- dplyr::mutate( - input, - Intensity = dplyr::if_else(EGQvalue < qvalue_cutoff, Intensity, NA_real_)) - input <- dplyr::mutate( - input, - Intensity = dplyr::if_else(PGQvalue < qvalue_cutoff, Intensity, NA_real_)) + # Preserve dplyr::if_else semantics: rows with NA q-values become NA. + input[is.na(EGQvalue) | EGQvalue >= qvalue_cutoff, Intensity := NA_real_] + input[is.na(PGQvalue) | PGQvalue >= qvalue_cutoff, Intensity := NA_real_] } - - input <- dplyr::filter(input, FFrgLossType == "noloss") - if (is.element("LabeledSequence", colnames(input))) { - input <- dplyr::mutate(input, IsLabeled = grepl("Lys8", LabeledSequence) | grepl("Arg10", LabeledSequence)) - input <- dplyr::mutate(input, IsotopeLabelType := dplyr::if_else(IsLabeled, "H", "L")) + + input <- input[FFrgLossType == "noloss"] + + if ("LabeledSequence" %in% colnames(input)) { + input[, IsotopeLabelType := ifelse( + grepl("Lys8", LabeledSequence) | grepl("Arg10", LabeledSequence), + "H", "L")] } else { - input <- dplyr::mutate(input, IsotopeLabelType = "L") + input[, IsotopeLabelType := "L"] } - - select_cols = c("ProteinName", "PeptideSequence", "PrecursorCharge", "FragmentIon", - "ProductCharge", "IsotopeLabelType", "Run", "BioReplicate", "Condition", - "Intensity") - if (calculateAnomalyScores){ - select_cols = c(select_cols, - MSstatsConvert:::.standardizeColnames(anomalyModelFeatures)) + + select_cols <- c("ProteinName", "PeptideSequence", "PrecursorCharge", "FragmentIon", + "ProductCharge", "IsotopeLabelType", "Run", "BioReplicate", "Condition", + "Intensity") + if (calculateAnomalyScores) { + select_cols <- c(select_cols, + MSstatsConvert:::.standardizeColnames(anomalyModelFeatures)) } - - input <- dplyr::select(input, select_cols) + + input <- input[, select_cols, with = FALSE] .writeChunkToFile(input, output_path, pos) NULL } diff --git a/R/converters.R b/R/converters.R index 13b4383..2b406ac 100644 --- a/R/converters.R +++ b/R/converters.R @@ -117,6 +117,11 @@ bigFragPipetoMSstatsFormat <- function(input_file, output_file_name, #' @param filter_by_identified if TRUE, will filter by the `EG.Identified` column. #' @param filter_by_qvalue if TRUE, will filter by EG.Qvalue and PG.Qvalue columns. #' @param qvalue_cutoff cutoff which will be used for q-value filtering. +#' @param block_size Arrow CSV reader block size in bytes; each input row must +#' fit inside one block. Defaults to 16 MiB (`16L * 1024L * 1024L`). If you +#' see `Invalid: straddling object straddles two block boundaries` on +#' extra-wide Spectronaut exports, pass a larger value +#' (e.g. `64L * 1024L * 1024L`). #' #' @export #' @@ -143,14 +148,16 @@ bigSpectronauttoMSstatsFormat <- function(input_file, output_file_name, aggregate_psms = FALSE, filter_few_obs = FALSE, remove_annotation = FALSE, - calculateAnomalyScores=FALSE, + calculateAnomalyScores=FALSE, anomalyModelFeatures=c(), - connection = NULL) { + connection = NULL, + block_size = 16L * 1024L * 1024L) { reduced_file <- .prefixedPath("reduce_output_", output_file_name) reduceBigSpectronaut(input_file, reduced_file, intensity, filter_by_excluded, filter_by_identified, filter_by_qvalue, qvalue_cutoff, - calculateAnomalyScores, anomalyModelFeatures) + calculateAnomalyScores, anomalyModelFeatures, + block_size = block_size) msstats_data <- MSstatsPreprocessBig( input_file = reduced_file, output_file_name = output_file_name, diff --git a/man/bigSpectronauttoMSstatsFormat.Rd b/man/bigSpectronauttoMSstatsFormat.Rd index 01706ef..b3e8f0c 100644 --- a/man/bigSpectronauttoMSstatsFormat.Rd +++ b/man/bigSpectronauttoMSstatsFormat.Rd @@ -20,7 +20,8 @@ bigSpectronauttoMSstatsFormat( remove_annotation = FALSE, calculateAnomalyScores = FALSE, anomalyModelFeatures = c(), - connection = NULL + connection = NULL, + block_size = 16L * 1024L * 1024L ) } \arguments{ @@ -63,6 +64,12 @@ using dataProcess function. Only applicable to sparklyr backend.} \item{connection}{Connection to a spark instance created with the `spark_connect` function from `sparklyr` package.} + +\item{block_size}{Arrow CSV reader block size in bytes; each input row must +fit inside one block. Defaults to 16 MiB (`16L * 1024L * 1024L`). If you +see `Invalid: straddling object straddles two block boundaries` on +extra-wide Spectronaut exports, pass a larger value +(e.g. `64L * 1024L * 1024L`).} } \value{ either arrow object or sparklyr table that can be optionally collected diff --git a/man/dot-prefixedPath.Rd b/man/dot-prefixedPath.Rd new file mode 100644 index 0000000..be036ac --- /dev/null +++ b/man/dot-prefixedPath.Rd @@ -0,0 +1,24 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/utils.R +\name{.prefixedPath} +\alias{.prefixedPath} +\title{Build an intermediate output path by prefixing only the basename.} +\usage{ +.prefixedPath(prefix, path) +} +\arguments{ +\item{prefix}{Character scalar prepended to the basename.} + +\item{path}{Output file path supplied by the caller.} +} +\value{ +Character scalar. +} +\description{ +Naive `paste0(prefix, output_file_name)` corrupts paths that contain a +directory (`subdir/out.csv` → `topN_subdir/out.csv`, +`/tmp/out.csv` → `topN_/tmp/out.csv`). Splitting via dirname/basename keeps +the directory component intact so intermediate files land beside the final +output. +} +\keyword{internal} diff --git a/tests/testthat/test-converters.R b/tests/testthat/test-converters.R index 78f6da3..78e0c1b 100644 --- a/tests/testthat/test-converters.R +++ b/tests/testthat/test-converters.R @@ -94,6 +94,193 @@ test_that("bigSpectronauttoMSstatsFormat works correctly", { unlink(paste0("reduce_output_", output_file), recursive = TRUE, force = TRUE) }) +make_spectronaut_input <- function(n = 1L, ...) { + base <- data.frame( + R.FileName = rep("run1", n), + R.Condition = "A", + R.Replicate = 1L, + PG.ProteinAccessions = "P1", + EG.ModifiedSequence = paste0("PEP", seq_len(n)), + FG.LabeledSequence = paste0("PEP", seq_len(n)), + FG.Charge = 2L, + F.FrgIon = "y1", + F.Charge = 1L, + EG.Identified = "True", + F.ExcludedFromQuantification = "False", + F.FrgLossType = "noloss", + PG.Qvalue = 0.001, + EG.Qvalue = 0.001, + F.NormalizedPeakArea = seq_len(n) * 100, + stringsAsFactors = FALSE + ) + overrides <- list(...) + for (col in names(overrides)) base[[col]] <- overrides[[col]] + base +} + +test_that("cleanSpectronautChunk produces the expected MSstats schema", { + input <- make_spectronaut_input(n = 1L) + output_file <- tempfile(fileext = ".csv") + on.exit(unlink(output_file, force = TRUE), add = TRUE) + + MSstatsBig:::cleanSpectronautChunk(input, output_file, pos = 1L) + result <- readr::read_csv(output_file, show_col_types = FALSE) + + expected_cols <- c("ProteinName", "PeptideSequence", "PrecursorCharge", "FragmentIon", + "ProductCharge", "IsotopeLabelType", "Run", "BioReplicate", "Condition", + "Intensity") + expect_setequal(colnames(result), expected_cols) + expect_equal(nrow(result), 1L) + expect_equal(result$Intensity, 100) + expect_equal(result$IsotopeLabelType, "L") +}) + +test_that("cleanSpectronautChunk filter_by_excluded sets Intensity to NA on excluded rows", { + input <- make_spectronaut_input( + n = 2L, + F.ExcludedFromQuantification = c("True", "False"), + F.NormalizedPeakArea = c(100, 200) + ) + output_file <- tempfile(fileext = ".csv") + on.exit(unlink(output_file, force = TRUE), add = TRUE) + + MSstatsBig:::cleanSpectronautChunk(input, output_file, pos = 1L, + filter_by_excluded = TRUE, + filter_by_qvalue = FALSE) + result <- readr::read_csv(output_file, show_col_types = FALSE) + result <- result[order(result$PeptideSequence), ] + + expect_true(is.na(result$Intensity[1])) + expect_equal(result$Intensity[2], 200) +}) + +test_that("cleanSpectronautChunk filter_by_identified sets Intensity to NA on unidentified rows", { + input <- make_spectronaut_input( + n = 2L, + EG.Identified = c("True", "False"), + F.NormalizedPeakArea = c(100, 200) + ) + output_file <- tempfile(fileext = ".csv") + on.exit(unlink(output_file, force = TRUE), add = TRUE) + + MSstatsBig:::cleanSpectronautChunk(input, output_file, pos = 1L, + filter_by_identified = TRUE, + filter_by_qvalue = FALSE) + result <- readr::read_csv(output_file, show_col_types = FALSE) + result <- result[order(result$PeptideSequence), ] + + expect_equal(result$Intensity[1], 100) + expect_true(is.na(result$Intensity[2])) +}) + +test_that("cleanSpectronautChunk filter_by_qvalue NA-aware semantics match dplyr::if_else", { + input <- make_spectronaut_input( + n = 3L, + EG.Qvalue = c(0.001, 0.5, NA_real_), + F.NormalizedPeakArea = c(100, 200, 300) + ) + output_file <- tempfile(fileext = ".csv") + on.exit(unlink(output_file, force = TRUE), add = TRUE) + + MSstatsBig:::cleanSpectronautChunk(input, output_file, pos = 1L, + filter_by_qvalue = TRUE, + qvalue_cutoff = 0.01) + result <- readr::read_csv(output_file, show_col_types = FALSE) + result <- result[order(result$PeptideSequence), ] + + # PEP1: EGQvalue below cutoff -> kept + expect_equal(result$Intensity[1], 100) + # PEP2: EGQvalue above cutoff -> NA + expect_true(is.na(result$Intensity[2])) + # PEP3: EGQvalue NA -> NA (preserves dplyr::if_else semantics) + expect_true(is.na(result$Intensity[3])) +}) + +test_that("cleanSpectronautChunk drops rows where FFrgLossType != noloss", { + input <- make_spectronaut_input( + n = 3L, + F.FrgLossType = c("noloss", "H2O", "noloss") + ) + output_file <- tempfile(fileext = ".csv") + on.exit(unlink(output_file, force = TRUE), add = TRUE) + + MSstatsBig:::cleanSpectronautChunk(input, output_file, pos = 1L, + filter_by_qvalue = FALSE) + result <- readr::read_csv(output_file, show_col_types = FALSE) + + expect_equal(nrow(result), 2L) + expect_setequal(result$PeptideSequence, c("PEP1", "PEP3")) +}) + +test_that("reduceBigSpectronaut rejects invalid block_size values", { + input_file <- tempfile(fileext = ".csv") + writeLines("a,b\n1,2", input_file) + output_file <- tempfile() + on.exit({ + unlink(input_file, force = TRUE) + unlink(output_file, recursive = TRUE, force = TRUE) + }, add = TRUE) + + expect_error(reduceBigSpectronaut(input_file, output_file, block_size = -1L)) + expect_error(reduceBigSpectronaut(input_file, output_file, block_size = 0L)) + expect_error(reduceBigSpectronaut(input_file, output_file, block_size = NA_integer_)) + expect_error(reduceBigSpectronaut(input_file, output_file, block_size = c(1L, 2L))) + expect_error(suppressWarnings( + reduceBigSpectronaut(input_file, output_file, block_size = "16MB") + )) +}) + +test_that("bigSpectronauttoMSstatsFormat plumbs block_size through to reduceBigSpectronaut", { + captured <- new.env(parent = emptyenv()) + captured$block_size <- NULL + + spy_reduce <- function(input_file, output_path, intensity, filter_by_excluded, + filter_by_identified, filter_by_qvalue, qvalue_cutoff, + calculateAnomalyScores, anomalyModelFeatures, + block_size = 16L * 1024L * 1024L) { + captured$block_size <- block_size + msstats_data <- data.frame( + ProteinName = "P1", PeptideSequence = "PEPTIDE", PrecursorCharge = 2, + FragmentIon = "frag1", ProductCharge = 1, + IsotopeLabelType = "L", Condition = "A", BioReplicate = 1, + Run = "run1", Intensity = 100 + ) + readr::write_csv(msstats_data, output_path) + } + + input_file <- "dummy_spectro_input.csv" + + # Default forwards 16 MiB. + stub(bigSpectronauttoMSstatsFormat, "reduceBigSpectronaut", spy_reduce) + output_file_default <- tempfile(fileext = ".csv") + on.exit({ + unlink(output_file_default, recursive = TRUE, force = TRUE) + unlink(paste0("reduce_output_", basename(output_file_default)), + recursive = TRUE, force = TRUE) + }, add = TRUE) + bigSpectronauttoMSstatsFormat( + input_file = input_file, + output_file_name = output_file_default, + backend = "arrow" + ) + expect_identical(captured$block_size, 16L * 1024L * 1024L) + + # Override forwards the user's value. + output_file_override <- tempfile(fileext = ".csv") + on.exit({ + unlink(output_file_override, recursive = TRUE, force = TRUE) + unlink(paste0("reduce_output_", basename(output_file_override)), + recursive = TRUE, force = TRUE) + }, add = TRUE) + bigSpectronauttoMSstatsFormat( + input_file = input_file, + output_file_name = output_file_override, + backend = "arrow", + block_size = 8L * 1024L * 1024L + ) + expect_identical(captured$block_size, 8L * 1024L * 1024L) +}) + # test_that("bigDIANNtoMSstatsFormat works with real MSstatsConvert tinytest data", { # input_file <- "/Users/rudhikshah/NorthEasternContractWork/MSstatsConvert/inst/tinytest/raw_data/DIANN/diann_input.tsv" # annotation_file <- "/Users/rudhikshah/NorthEasternContractWork/MSstatsConvert/inst/tinytest/raw_data/DIANN/annotation.csv"