Skip to content

Commit 36af16e

Browse files
amrfinder: adding parallelism to the conversion of coordinates. Currently parallel by chromosome
1 parent 2abe87c commit 36af16e

1 file changed

Lines changed: 47 additions & 18 deletions

File tree

src/amrfinder/amrfinder.cpp

Lines changed: 47 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
/* amrfinder: program for resolving epialleles in a sliding window
22
* along a chromosome.
33
*
4-
* Copyright (C) 2014-2022 University of Southern California and
4+
* Copyright (C) 2014-2023 University of Southern California and
55
* Andrew D. Smith and Benjamin E. Decato
66
*
77
* Authors: Fang Fang and Benjamin E. Decato and Andrew D. Smith
@@ -158,6 +158,22 @@ convert_coordinates(const vector<T> &cpgs, GenomicRegion &region) {
158158
return true;
159159
}
160160

161+
static vector<pair<uint32_t, uint32_t>>
162+
get_chrom_partition(const vector<GenomicRegion> &r) {
163+
if (r.empty()) return {};
164+
vector<pair<uint32_t, uint32_t>> parts;
165+
string prev_chrom = r.front().get_chrom();
166+
uint32_t prev_idx = 0u;
167+
for (auto i = 0u; i < size(r); ++i)
168+
if (r[i].get_chrom() != prev_chrom) {
169+
parts.emplace_back(prev_idx, i);
170+
prev_idx = i;
171+
prev_chrom = r[i].get_chrom();
172+
}
173+
parts.emplace_back(prev_idx, size(r));
174+
return parts;
175+
}
176+
161177
static bool
162178
convert_coordinates(const string &genome_file, vector<GenomicRegion> &amrs) {
163179

@@ -172,18 +188,22 @@ convert_coordinates(const string &genome_file, vector<GenomicRegion> &amrs) {
172188
for (auto i = 0u; i < n_chroms; ++i)
173189
chrom_lookup.emplace(move(c_name[i]), move(c_seq[i]));
174190

175-
vector<uint32_t> cpgs;
176-
string chrom_name;
177-
for (auto &a : amrs) {
178-
if (a.get_chrom() != chrom_name) {
179-
chrom_name = a.get_chrom();
180-
auto c_itr = chrom_lookup.find(chrom_name);
181-
if (c_itr == end(chrom_lookup)) return false;
182-
cpgs = collect_cpgs(c_itr->second);
191+
vector<pair<uint32_t, uint32_t>> chrom_parts = get_chrom_partition(amrs);
192+
std::atomic_uint32_t conv_failure = 0;
193+
194+
#pragma omp parallel for
195+
for (const auto &p : chrom_parts) {
196+
const string chrom_name = amrs[p.first].get_chrom();
197+
auto c_itr = chrom_lookup.find(chrom_name);
198+
if (c_itr == end(chrom_lookup))
199+
conv_failure++;
200+
else {
201+
vector<uint32_t> cpgs = collect_cpgs(c_itr->second);
202+
for (uint32_t i = p.first; i < p.second; ++i)
203+
conv_failure += !convert_coordinates(cpgs, amrs[i]);
183204
}
184-
if (!convert_coordinates(cpgs, a)) return false;
185205
}
186-
return true;
206+
return conv_failure == 0;
187207
}
188208

189209
// make sure the current read is shortened to only include positions
@@ -202,14 +222,21 @@ clip_read(const size_t start_pos, const size_t end_pos, epi_r r) {
202222
static vector<epi_r>
203223
get_current_epireads(const vector<epi_r> &epireads, const size_t max_len,
204224
const size_t cpg_window, const size_t start_pos,
205-
size_t &read_id) {
225+
uint64_t &read_id) {
226+
227+
// assert(is_sorted(cbegin(epireads), cend(epireads),
228+
// [](const epi_r &a, const epi_r &b) {
229+
// return a.pos < b.pos;
230+
// }));
231+
206232
const auto n_epi = size(epireads);
207233

208-
while (read_id < n_epi && epireads[read_id].pos + max_len <= start_pos)
234+
while (read_id < epireads.size() && epireads[read_id].pos + max_len <= start_pos)
209235
++read_id;
210236

211-
vector<epi_r> current_epireads;
212237
const auto end_pos = start_pos + cpg_window;
238+
239+
vector<epi_r> current_epireads;
213240
for (auto i = read_id; i < n_epi && epireads[i].pos < end_pos; ++i)
214241
if (epireads[i].end() > start_pos)
215242
current_epireads.push_back(clip_read(start_pos, end_pos, epireads[i]));
@@ -420,16 +447,18 @@ main_amrfinder(int argc, const char **argv) {
420447

421448
while (read_epiread(in, er)) {
422449
if (!epireads.empty() && er.chr != prev_chrom) {
423-
windows_tested += process_chrom(verbose, n_threads, min_obs_per_cpg,
424-
window_size, epistat, prev_chrom, epireads, amrs);
450+
windows_tested +=
451+
process_chrom(verbose, n_threads, min_obs_per_cpg, window_size,
452+
epistat, prev_chrom, epireads, amrs);
425453
epireads.clear();
426454
}
427455
swap(prev_chrom, er.chr);
428456
epireads.emplace_back(er.pos, er.seq);
429457
}
430458
if (!epireads.empty())
431-
windows_tested += process_chrom(verbose, n_threads, min_obs_per_cpg, window_size,
432-
epistat, prev_chrom, epireads, amrs);
459+
windows_tested +=
460+
process_chrom(verbose, n_threads, min_obs_per_cpg, window_size, epistat,
461+
prev_chrom, epireads, amrs);
433462

434463
// because the threads might not add these in order
435464
sort(begin(amrs), end(amrs));

0 commit comments

Comments
 (0)