|
| 1 | + |
| 2 | +# hmmscan --cpu 32 --tblout "${BUG}_genes_COG.tbl" "$COG_DB" "${BUG}_translated_gene_seqs.fasta" |
#' Write a data frame to Parquet with zstd compression.
#'
#' Thin wrapper around `arrow::write_parquet()` that pins the project's
#' standard compression settings (zstd level 9, dictionary encoding).
#'
#' @param df Data frame (or arrow Table) to serialize.
#' @param path Destination path for the Parquet file.
#' @return Whatever `arrow::write_parquet()` returns.
write_compressed_parquet <- function(df, path) {
  arrow::write_parquet(df, path,
                       compression = "zstd",
                       compression_level = 9,
                       use_dictionary = TRUE)
}
| 12 | + |
#' Annotate protein cluster sequences with HMMER (hmmscan).
#'
#' Reads the `protein_cluster_seq` table from a DuckDB database, splits
#' the sequences into FASTA chunks, runs `hmmscan` on each chunk in
#' parallel inside an Apptainer container, combines the per-chunk hits
#' into a single parquet file, and loads that table back into the
#' database.
#'
#' @param duckdb_path Path to the DuckDB database holding
#'   `protein_cluster_seq` (columns `name`, `sequence`).
#' @param output_path Directory for FASTA chunks and result files;
#'   defaults to the database's directory when missing or a
#'   placeholder (".", "results", "results/").
#' @param threads Threads per hmmscan process; 0 (the default) falls
#'   back to 8.
#' @param database_path Host path to the HMM profile database.
#' @param split_jobs Kept for backward compatibility; chunking is
#'   always performed and controlled by `num_of_splits`.
#' @param num_of_splits Number of FASTA chunks (= parallel jobs).
#' @return Invisibly, the path of the combined parquet file.
.runHMMER <- function(duckdb_path,
                      output_path,
                      threads = 0,
                      database_path,
                      split_jobs = TRUE,
                      num_of_splits = 20) {

  duckdb_path <- .docker_path(duckdb_path)
  if (missing(output_path) || output_path %in% c(".", "results", "results/")) {
    output_path <- dirname(duckdb_path)
  }
  output_path <- .docker_path(output_path)
  if (!dir.exists(output_path)) dir.create(output_path, recursive = TRUE)

  con <- DBI::dbConnect(duckdb::duckdb(), duckdb_path)
  on.exit(try(DBI::dbDisconnect(con, shutdown = FALSE), silent = TRUE), add = TRUE)

  prot_seqs <- DBI::dbReadTable(con, "protein_cluster_seq") |>
    tibble::as_tibble()
  if (nrow(prot_seqs) == 0) {
    stop("Table 'protein_cluster_seq' is empty; nothing to annotate.",
         call. = FALSE)
  }

  # Robust database name (file name without extension), used in output
  # file and table names.
  database <- tools::file_path_sans_ext(basename(database_path))

  chunk_count <- num_of_splits

  # Write the sequences as `chunk_count` FASTA files so the hmmscan
  # jobs can run in parallel.
  split_fasta <- function(seqs, prefix) {
    records <- paste0("> ", seqs$name, "\n", seqs$sequence)
    chunk_size <- ceiling(length(records) / chunk_count)
    chunks <- split(records, ceiling(seq_along(records) / chunk_size))
    purrr::walk2(chunks, seq_along(chunks), function(chunk, i) {
      chunk_path <- file.path(output_path,
                              sprintf("%s_chunk_%02d.fasta", prefix, i))
      readr::write_lines(chunk, chunk_path)
    })
  }
  split_fasta(prot_seqs, "protein")

  # One job per (chunk, database) combination.
  job_list <- expand.grid(
    chunk = sprintf("%02d", seq_len(chunk_count)),
    db = database,
    stringsAsFactors = FALSE
  ) |>
    dplyr::mutate(
      JOB_NAME = paste0("protein_chunk_", chunk, "_", db),
      FASTA = paste0("protein_chunk_", chunk, ".fasta"),
      DB = db
    ) |>
    dplyr::select(JOB_NAME, FASTA, DB)

  # Number of parallel workers (NOT threads per hmmscan).
  n_workers <- 4

  # Honor the `threads` argument instead of clobbering it; 0 (the
  # default) means "use 8 threads per hmmscan".
  if (threads <= 0) threads <- 8

  # Restore the caller's future plan when we are done.
  old_plan <- future::plan(future::multisession, workers = n_workers)
  on.exit(future::plan(old_plan), add = TRUE)

  # Run one hmmscan job and return the path of its parquet result.
  .runHmmerJob <- function(JOB_NAME, FASTA, DB) {

    hmmer_input <- file.path(output_path, FASTA)
    hmmer_output <- file.path(output_path, paste0(JOB_NAME, ".tbl"))

    # HMM database paths: host side and container side.
    db_host_dir <- dirname(database_path)
    db_filename <- basename(database_path)
    db_cont_dir <- "/opt/hmmer/data"
    db_cont_path <- file.path(db_cont_dir, db_filename)

    # Bind mounts: working directory and database directory.
    mount_host <- output_path
    mount_cont <- "/work"

    cmd_args <- c(
      "exec",
      "-B", paste0(mount_host, ":", mount_cont),
      "-B", paste0(db_host_dir, ":", db_cont_dir),
      "/scratch/alpine/aghosh5@xsede.org/software/hmmer_latest.sif",
      "hmmscan",
      "--cpu", as.character(threads),
      "--tblout", .to_container(hmmer_output, mount_host, mount_cont),
      db_cont_path,
      .to_container(hmmer_input, mount_host, mount_cont)
    )

    message("Running hmmer via Docker...")
    output <- tryCatch({
      system2("apptainer", args = cmd_args, stdout = TRUE, stderr = TRUE)
    }, error = function(e) {
      stop("hmmer execution failed: ", e$message)
    })

    if (!file.exists(hmmer_output)) {
      stop("hmmer failed: output file not found. Check stderr:\n",
           paste(output, collapse = "\n"))
    }
    message("hmmer completed successfully.")

    hmmer_tbl <- .parse_hmmer_output(hmmer_output) |>
      dplyr::select("name", "query_name", "description")

    hmmer_tbl_filename <- file.path(
      dirname(hmmer_output),
      paste0(tools::file_path_sans_ext(basename(hmmer_output)), ".parquet")
    )
    write_compressed_parquet(hmmer_tbl, hmmer_tbl_filename)

    hmmer_tbl_filename
  }

  # future_pwalk() discards return values, so collect the per-chunk
  # parquet paths with future_pmap_chr() instead.
  parquet_files <- furrr::future_pmap_chr(
    job_list,
    .runHmmerJob,
    .progress = TRUE
  )

  final_parquet <- file.path(output_path,
                             paste0("protein_", database, ".parquet"))

  # A Dataset cannot be handed to write_parquet() directly, so
  # materialize it first, then write one combined file.
  arrow::open_dataset(parquet_files, format = "parquet") |>
    dplyr::collect() |>
    arrow::write_parquet(final_parquet)

  message("Combined parquet written")

  arrow::read_parquet(final_parquet) |>
    DBI::dbWriteTable(
      conn = con,
      name = tools::file_path_sans_ext(basename(final_parquet)),
      overwrite = TRUE
    )

  invisible(final_parquet)
}
| 164 | + |
| 165 | + |
#' Read a file created as HMMER `--tblout` output.
#' Adapted from the `rhmmer` package.
#'
#' @param file Path to a hmmscan/hmmsearch `--tblout` file.
#' @return A data.frame (tibble) with one row per hit; multi-word
#'   descriptions are re-joined into the single `description` column.
#' @export
#'
.parse_hmmer_output <- function(file){

  col_types <- readr::cols(
    name = readr::col_character(),
    accession = readr::col_character(),
    query_name = readr::col_character(),
    query_accession = readr::col_character(),
    sequence_evalue = readr::col_double(),
    sequence_score = readr::col_double(),
    sequence_bias = readr::col_double(),
    best_evalue = readr::col_double(),
    best_score = readr::col_double(),
    best_bis = readr::col_double(),
    number_exp = readr::col_double(),
    number_reg = readr::col_integer(),
    number_clu = readr::col_integer(),
    number_ov = readr::col_integer(),
    number_env = readr::col_integer(),
    number_dom = readr::col_integer(),
    number_rep = readr::col_integer(),
    number_inc = readr::col_character(),
    description = readr::col_character()
  )

  # Read the file FIRST (previously `lines` was used before it was
  # defined, which made this function error unconditionally).  The
  # line delimiter should always be just "\n", even on Windows.
  lines <- readr::read_lines(file, lazy = FALSE, progress = FALSE)

  # Drop comment lines.
  data_lines <- lines[!grepl("^#", lines)]

  # Split on runs of whitespace; the description text is itself
  # space-separated, so the widest row can have more than the 19
  # named columns.
  split_fields <- strsplit(data_lines, "\\s+", perl = TRUE)

  # Maximum number of space-separated fields on any data line.
  N <- max(lengths(split_fields))

  table <- sub(
    pattern = sprintf("(%s).*", paste0(rep('\\S+', N), collapse=" +")),
    replacement = '\\1',
    x = lines,
    perl = TRUE
  ) |>
    # One-or-more spaces -> tab.  (The previous pattern " *" also
    # matched the empty string, inserting a tab at every character
    # position and corrupting the table.)
    gsub(pattern = " +", replacement = "\t") |>
    paste0(collapse = "\n") |>
    readr::read_tsv(
      col_names = names(col_types$cols),
      comment = '#',
      na = '-',
      col_types = col_types,
      lazy = FALSE,
      progress = FALSE
    ) |>
    # Re-join the extra description columns into one field.
    tidyr::unite(description, description:last_col(), sep = " ")
  table$description <- gsub("\t", " ", table$description)

  table
}
| 229 | + |
#' Aggregate protein annotation hits into per-genome counts.
#'
#' Joins the wide per-genome `protein_count` table (from DuckDB) with
#' a parquet file of HMMER hits, sums counts per (genome, annotation),
#' and persists the resulting genome-by-annotation matrix both as a
#' parquet file next to the database and as a table inside DuckDB.
#'
#' @param annotated_parquet Parquet file of HMMER hits with at least
#'   `name` (annotation id) and `query_name` (protein id) columns.
#' @param duckdb_path Path to the DuckDB database holding the wide
#'   `protein_count` table (one `genome_id` column plus one column per
#'   protein).
#' @return Invisibly, the path of the written count parquet file.
proteinAnnotations2Duckdb <- function(annotated_parquet, duckdb_path) {

  annotated_parquet <- .docker_path(annotated_parquet)

  # Robust database name (file name without extension).
  database <- tools::file_path_sans_ext(basename(annotated_parquet))

  duckdb_path <- .docker_path(duckdb_path)

  con <- DBI::dbConnect(duckdb::duckdb(), duckdb_path)
  on.exit(try(DBI::dbDisconnect(con, shutdown = FALSE), silent = TRUE), add = TRUE)

  protein_count <- DBI::dbReadTable(con, "protein_count") |>
    tibble::as_tibble()

  # Wide (genome x protein) -> long, keeping only non-zero counts.
  protein_long <- protein_count |>
    tidyr::pivot_longer(
      cols = -genome_id,
      names_to = "query_name",
      values_to = "count"
    ) |>
    dplyr::filter(count > 0)

  Annotation <- arrow::read_parquet(annotated_parquet)

  # Protein ids were sanitized when used as column names
  # ("fig|..." became "fig...."); undo that before joining.
  protein_long_annot <- protein_long |>
    dplyr::mutate(query_name = stringr::str_replace(query_name, "^fig\\.", "fig|")) |>
    dplyr::inner_join(
      Annotation |>
        dplyr::select(name, query_name),
      by = "query_name"
    )

  genome_annot_counts <- protein_long_annot |>
    dplyr::group_by(genome_id, name) |>
    dplyr::summarise(count = sum(count), .groups = "drop")

  genome_annot_matrix <- genome_annot_counts |>
    tidyr::pivot_wider(
      names_from = name,
      values_from = count,
      values_fill = 0
    )

  # Compute the output path once (it was previously duplicated).
  count_path <- file.path(dirname(duckdb_path),
                          paste0(database, "_count", ".parquet"))

  arrow::write_parquet(genome_annot_matrix, count_path)

  # Write the same data to DuckDB directly; no need to re-read the
  # parquet file we just wrote.
  DBI::dbWriteTable(
    conn = con,
    name = tools::file_path_sans_ext(basename(count_path)),
    value = genome_annot_matrix,
    overwrite = TRUE
  )

  invisible(count_path)
}
0 commit comments