Skip to content

Commit 8883d92

Browse files
Using a thread pool instead of setting number of threads. Also changed how good reads and bases are tabulated for in and out to be reported in the stats output.
1 parent 4ecf656 commit 8883d92

1 file changed

Lines changed: 60 additions & 50 deletions

File tree

src/utils/uniq.cpp

Lines changed: 60 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,14 @@
2323
#include <iostream>
2424
#include <stdexcept>
2525

26+
// generated by autotools
2627
#include <config.h>
2728

29+
// from HTSlib
2830
#include <htslib/sam.h>
31+
#include <htslib/thread_pool.h>
2932

33+
// from smithlab
3034
#include "OptionParser.hpp"
3135
#include "smithlab_utils.hpp"
3236
#include "smithlab_os.hpp"
@@ -43,8 +47,16 @@ using std::runtime_error;
4347
using std::to_string;
4448

4549

50+
inline bool
51+
format_is_bam_or_sam(htsFile *hts) {
52+
const htsFormat *fmt = hts_get_format(hts);
53+
return fmt->category == sequence_data &&
54+
(fmt->format == bam || fmt->format == sam);
55+
}
56+
57+
4658
inline string
47-
read_name(const bam1_t *b) {return string(bam_get_qname(b));}
59+
qname(const bam1_t *b) {return string(bam_get_qname(b));}
4860

4961

5062
inline int32_t
@@ -80,24 +92,31 @@ equivalent_end_and_strand(const bam1_t *a, const bam1_t *b) {
8092
}
8193

8294

95+
struct rd_stats { // keep track of good bases/reads in and out
96+
size_t bases;
97+
size_t reads;
98+
rd_stats() : bases(0), reads(0) {}
99+
void update(bam1_t *b) { bases += qlen(b); ++reads; }
100+
};
101+
102+
83103
static void
84-
write_stats_output(const size_t reads_in, const size_t reads_out,
85-
const size_t good_bases_in, const size_t good_bases_out,
86-
const size_t reads_with_dups, const string &statfile) {
104+
write_stats_output(const rd_stats &rs_in, const rd_stats &rs_out,
105+
const size_t reads_duped, const string &statfile) {
87106
if (!statfile.empty()) {
88-
const size_t reads_removed = reads_in - reads_out;
107+
const size_t reads_removed = rs_in.reads - rs_out.reads;
89108
const double non_dup_frac =
90-
(reads_out - reads_with_dups)/static_cast<double>(reads_in);
109+
(rs_out.reads - reads_duped)/static_cast<double>(rs_in.reads);
91110
const double dup_rate =
92-
(reads_removed + reads_with_dups)/static_cast<double>(reads_with_dups);
111+
(reads_removed + reads_duped)/static_cast<double>(reads_duped);
93112
ofstream out_stat(statfile);
94113
if (!out_stat) throw runtime_error("bad stats output file");
95-
out_stat << "total_reads: " << reads_in << endl
96-
<< "total_bases: " << good_bases_in << endl
97-
<< "unique_reads: " << reads_out << endl
98-
<< "unique_read_bases: " << good_bases_out << endl
114+
out_stat << "total_reads: " << rs_in.reads << endl
115+
<< "total_bases: " << rs_in.bases << endl
116+
<< "unique_reads: " << rs_out.reads << endl
117+
<< "unique_read_bases: " << rs_out.bases << endl
99118
<< "non_duplicate_fraction: " << non_dup_frac << endl
100-
<< "duplicate_reads: " << reads_with_dups << endl
119+
<< "duplicate_reads: " << reads_duped << endl
101120
<< "reads_removed: " << reads_removed << endl
102121
<< "duplication_rate: " << dup_rate << endl;
103122
}
@@ -124,9 +143,8 @@ static void
124143
process_inner_buffer(const vector<bam1_t*>::const_iterator it,
125144
const vector<bam1_t*>::const_iterator jt,
126145
sam_hdr_t *hdr, samFile *out,
127-
size_t &reads_out,
128-
size_t &good_bases_out,
129-
size_t &reads_with_dups,
146+
rd_stats &rs_out,
147+
size_t &reads_duped,
130148
vector<size_t> &hist) {
131149
const size_t n_reads = std::distance(it, jt);
132150
const size_t selected = rand() % n_reads;
@@ -135,29 +153,25 @@ process_inner_buffer(const vector<bam1_t*>::const_iterator it,
135153
if (hist.size() <= n_reads)
136154
hist.resize(n_reads + 1);
137155
hist[n_reads]++;
138-
good_bases_out += qlen(*(it + selected));
139-
++reads_out;
140-
reads_with_dups += (n_reads > 1);
156+
rs_out.update(*(it + selected));
157+
reads_duped += (n_reads > 1);
141158
}
142159

143160

144161
/* The buffer corresponds to reads sharing the same mapping chromosome
145162
and start position. These are gathered and then processed together. */
146163
static void
147-
process_buffer(size_t &reads_out, size_t &good_bases_out,
148-
size_t &reads_with_dups, vector<size_t> &hist,
164+
process_buffer(rd_stats &rs_out, size_t &reads_duped, vector<size_t> &hist,
149165
vector<bam1_t*> &buffer, sam_hdr_t *hdr, samFile *out) {
150166
sort(begin(buffer), end(buffer), precedes_by_end_and_strand);
151167
auto it(begin(buffer));
152168
auto jt = it + 1;
153169
for (; jt != end(buffer); ++jt)
154170
if (!equivalent_end_and_strand(*it, *jt)) {
155-
process_inner_buffer(it, jt, hdr, out, reads_out, good_bases_out,
156-
reads_with_dups, hist);
171+
process_inner_buffer(it, jt, hdr, out, rs_out, reads_duped, hist);
157172
it = jt;
158173
}
159-
process_inner_buffer(it, jt, hdr, out, reads_out, good_bases_out,
160-
reads_with_dups, hist);
174+
process_inner_buffer(it, jt, hdr, out, rs_out, reads_duped, hist);
161175

162176
// free the bam1_t pointers before clearing the buffer
163177
for (size_t i = 0; i < buffer.size(); ++i)
@@ -193,21 +207,21 @@ uniq(const bool VERBOSE, const size_t n_threads,
193207
if (!hts || errno)
194208
throw runtime_error("bad htslib file: " + infile);
195209

196-
if (n_threads > 1 && hts_set_threads(hts, n_threads/2) < 0)
210+
htsThreadPool the_thread_pool{hts_tpool_init(n_threads), 0};
211+
if (hts_set_thread_pool(hts, &the_thread_pool) < 0)
197212
throw runtime_error("error setting threads");
198213

199-
if (hts_get_format(hts)->category != sequence_data)
214+
if (!format_is_bam_or_sam(hts))
200215
throw runtime_error("bad file format: " + infile);
201216

202217
sam_hdr_t *hdr = sam_hdr_read(hts);
203218
if (!hdr)
204219
throw runtime_error("failed to read header: " + infile);
205220

206221
// open the output file
207-
samFile *out = bam_format ? hts_open(outfile.c_str(), "wb") :
208-
hts_open(outfile.c_str(), "w");
222+
samFile *out = hts_open(outfile.c_str(), bam_format ? "wb" : "w");
209223

210-
if (n_threads > 1 && hts_set_threads(out, (n_threads + 1)/2) < 0)
224+
if (hts_set_thread_pool(out, &the_thread_pool) < 0)
211225
throw runtime_error("error setting threads");
212226

213227
// take care of the output file's header
@@ -224,13 +238,12 @@ uniq(const bool VERBOSE, const size_t n_threads,
224238
if (!aln)
225239
throw runtime_error("failed parsing read from input file");
226240

227-
// values to tabulate stats; these cost almost nothing
241+
// values to tabulate stats; no real cost
242+
rd_stats rs_in, rs_out;
243+
size_t reads_duped = 0;
228244
vector<size_t> hist;
229-
size_t reads_in = 1; // count the one we just got
230-
size_t good_bases_in = qlen(aln); // count its good bases
231-
size_t reads_out = 0;
232-
size_t good_bases_out = 0;
233-
size_t reads_with_dups = 0;
245+
246+
rs_in.update(aln); // update for the input we just got
234247

235248
vector<bam1_t*> buffer(1, aln); // select output from this buffer
236249

@@ -239,36 +252,33 @@ uniq(const bool VERBOSE, const size_t n_threads,
239252
int32_t cur_chrom = get_tid(aln);
240253

241254
while (aln = get_read(hts, hdr)) {
242-
++reads_in;
243-
good_bases_in += qlen(aln);
255+
rs_in.update(aln);
256+
257+
// below works because buffer reset at every new chrom
258+
if (precedes_by_start(aln, buffer[0]))
259+
throw runtime_error("not sorted: " + qname(buffer[0]) + " " + qname(aln));
244260

245-
// below works because buffer is reset every chrom
246-
if (precedes_by_start(aln, buffer.front()))
247-
throw runtime_error("input not properly sorted:\n" +
248-
read_name(buffer[0]) + "\n" + read_name(aln));
249261
const int32_t chrom = get_tid(aln);
250262
if (chrom != cur_chrom) {
251263
if (chroms_seen[chrom]) throw runtime_error("input not sorted");
252264
chroms_seen[chrom] = true;
253265
cur_chrom = chrom;
254266
}
255267

256-
if (!equivalent_chrom_and_start(buffer.front(), aln))
257-
process_buffer(reads_out, good_bases_out, reads_with_dups,
258-
hist, buffer, hdr, out);
268+
if (!equivalent_chrom_and_start(buffer[0], aln))
269+
process_buffer(rs_out, reads_duped, hist, buffer, hdr, out);
259270
buffer.push_back(aln);
260271
}
261-
process_buffer(reads_out, good_bases_out, reads_with_dups,
262-
hist, buffer, hdr, out);
272+
process_buffer(rs_out, reads_duped, hist, buffer, hdr, out);
263273

264-
// remember to close these
274+
// remember to turn off the lights
265275
bam_hdr_destroy(hdr);
266276
hts_close(out);
267277
hts_close(hts);
278+
hts_tpool_destroy(the_thread_pool.pool);
268279

269-
write_stats_output(reads_in, reads_out, good_bases_in, good_bases_out,
270-
reads_with_dups, statfile);
271-
280+
// write any additional output requested
281+
write_stats_output(rs_in, rs_out, reads_duped, statfile);
272282
write_hist_output(hist, histfile);
273283
}
274284

0 commit comments

Comments
 (0)