Skip to content

Commit f9a1010

Browse files
authored
Merge pull request #67 from ArcInstitute/dev
Dev
2 parents cd0ebce + 4f4b71b commit f9a1010

13 files changed

Lines changed: 269 additions & 117 deletions

File tree

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "binseq"
3-
version = "0.7.5"
3+
version = "0.7.6"
44
edition = "2021"
55
description = "A high efficiency binary format for sequencing data"
66
license = "MIT"

examples/example.rs

Lines changed: 36 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -3,84 +3,64 @@ use std::io::{stdout, BufWriter, Write};
33
use std::sync::Arc;
44

55
use anyhow::Result;
6-
use binseq::{BinseqReader, BinseqRecord, ParallelProcessor, ParallelReader};
6+
use binseq::{BinseqReader, BinseqRecord, Context, ParallelProcessor, ParallelReader};
77
use parking_lot::Mutex;
88

99
/// A struct for decoding BINSEQ data back to FASTQ format.
1010
#[derive(Clone)]
1111
pub struct Decoder {
12-
/// Local values
13-
buffer: Vec<u8>,
14-
/// Local buffer for decoding index
15-
ibuf: itoa::Buffer,
16-
/// Local buffer for decoding primary
17-
sbuf: Vec<u8>,
18-
/// Local buffer for decoding secondary
19-
xbuf: Vec<u8>,
12+
/// Reusable context
13+
ctx: Context,
14+
15+
/// local output buffer
16+
local_writer: Vec<u8>,
17+
18+
/// global output buffer
19+
global_writer: Arc<Mutex<Box<dyn Write + Send>>>,
20+
2021
/// Local count of records
2122
local_count: usize,
22-
/// Quality buffer
23-
quality: Vec<u8>,
2423

25-
/// values
26-
global_buffer: Arc<Mutex<Box<dyn Write + Send>>>,
27-
num_records: Arc<Mutex<usize>>,
24+
/// global count of records
25+
global_count: Arc<Mutex<usize>>,
2826
}
2927

3028
impl Decoder {
3129
#[must_use]
3230
pub fn new(writer: Box<dyn Write + Send>) -> Self {
33-
let global_buffer = Arc::new(Mutex::new(writer));
31+
let global_writer = Arc::new(Mutex::new(writer));
3432
Decoder {
35-
buffer: Vec::new(),
36-
ibuf: itoa::Buffer::new(),
37-
sbuf: Vec::new(),
38-
xbuf: Vec::new(),
33+
local_writer: Vec::new(),
34+
ctx: Context::default(),
3935
local_count: 0,
40-
quality: Vec::new(),
41-
global_buffer,
42-
num_records: Arc::new(Mutex::new(0)),
36+
global_writer,
37+
global_count: Arc::new(Mutex::new(0)),
4338
}
4439
}
4540

4641
#[must_use]
4742
pub fn num_records(&self) -> usize {
48-
*self.num_records.lock()
43+
*self.global_count.lock()
4944
}
5045
}
5146
impl ParallelProcessor for Decoder {
5247
fn process_record<R: BinseqRecord>(&mut self, record: R) -> binseq::Result<()> {
53-
// clear decoding buffers
54-
self.sbuf.clear();
55-
self.xbuf.clear();
56-
57-
// decode index
58-
let index = self.ibuf.format(record.index()).as_bytes();
59-
60-
// write primary fastq to local buffer
61-
record.decode_s(&mut self.sbuf)?;
62-
if self.quality.len() < self.sbuf.len() {
63-
self.quality.resize(self.sbuf.len(), b'?');
64-
}
65-
let squal = if record.has_quality() {
66-
record.squal()
67-
} else {
68-
&self.quality[..self.sbuf.len()]
69-
};
70-
write_fastq_parts(&mut self.buffer, index, &self.sbuf, squal)?;
48+
self.ctx.fill(&record)?;
49+
write_fastq_parts(
50+
&mut self.local_writer,
51+
self.ctx.sheader(),
52+
self.ctx.sbuf(),
53+
self.ctx.squal(),
54+
)?;
7155

7256
// write extended fastq to local buffer
7357
if record.is_paired() {
74-
record.decode_x(&mut self.xbuf)?;
75-
if self.quality.len() < self.xbuf.len() {
76-
self.quality.resize(self.xbuf.len(), b'?');
77-
}
78-
let xqual = if record.has_quality() {
79-
record.xqual()
80-
} else {
81-
&self.quality[..self.xbuf.len()]
82-
};
83-
write_fastq_parts(&mut self.buffer, index, &self.xbuf, xqual)?;
58+
write_fastq_parts(
59+
&mut self.local_writer,
60+
self.ctx.xheader(),
61+
&self.ctx.xbuf(),
62+
self.ctx.xqual(),
63+
)?;
8464
}
8565

8666
self.local_count += 1;
@@ -90,18 +70,18 @@ impl ParallelProcessor for Decoder {
9070
fn on_batch_complete(&mut self) -> binseq::Result<()> {
9171
// Lock the mutex to write to the global buffer
9272
{
93-
let mut lock = self.global_buffer.lock();
94-
lock.write_all(&self.buffer)?;
73+
let mut lock = self.global_writer.lock();
74+
lock.write_all(&self.local_writer)?;
9575
lock.flush()?;
9676
}
9777
// Lock the mutex to update the number of records
9878
{
99-
let mut num_records = self.num_records.lock();
100-
*num_records += self.local_count;
79+
let mut global_count = self.global_count.lock();
80+
*global_count += self.local_count;
10181
}
10282

10383
// Clear the local buffer and reset the local record count
104-
self.buffer.clear();
84+
self.local_writer.clear();
10585
self.local_count = 0;
10686
Ok(())
10787
}

examples/grep.rs

Lines changed: 5 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,14 @@
11
use std::sync::Arc;
22

33
use anyhow::Result;
4-
use binseq::{BinseqReader, ParallelProcessor, ParallelReader};
4+
use binseq::{BinseqReader, Context, ParallelProcessor, ParallelReader};
55
use memchr::memmem::Finder;
66
use parking_lot::Mutex;
77

88
#[derive(Clone)]
99
pub struct GrepCounter {
1010
// (thread) local variables
11-
sbuf: Vec<u8>,
12-
xbuf: Vec<u8>,
11+
ctx: Context,
1312
local_count: usize,
1413

1514
// search pattern (using memchr::memmem::Finder for fast searching)
@@ -22,8 +21,7 @@ impl GrepCounter {
2221
#[must_use]
2322
pub fn new(pattern: &[u8]) -> Self {
2423
Self {
25-
sbuf: Vec::new(),
26-
xbuf: Vec::new(),
24+
ctx: Context::default(),
2725
pattern: Finder::new(pattern).into_owned(),
2826
local_count: 0,
2927
count: Arc::new(Mutex::new(0)),
@@ -34,25 +32,15 @@ impl GrepCounter {
3432
self.pattern.find(seq).is_some()
3533
}
3634

37-
fn clear_buffers(&mut self) {
38-
self.sbuf.clear();
39-
self.xbuf.clear();
40-
}
41-
4235
fn pprint(&self) {
4336
println!("Matching records: {}", self.count.lock());
4437
}
4538
}
4639
impl ParallelProcessor for GrepCounter {
4740
fn process_record<R: binseq::BinseqRecord>(&mut self, record: R) -> binseq::Result<()> {
48-
self.clear_buffers();
49-
50-
record.decode_s(&mut self.sbuf)?;
51-
if record.is_paired() {
52-
record.decode_x(&mut self.xbuf)?;
53-
}
41+
self.ctx.fill_sequences(&record)?;
5442

55-
if self.match_sequence(&self.sbuf) || self.match_sequence(&self.xbuf) {
43+
if self.match_sequence(&self.ctx.sbuf()) || self.match_sequence(&self.ctx.xbuf()) {
5644
self.local_count += 1;
5745
}
5846

examples/index.rs

Lines changed: 0 additions & 14 deletions
This file was deleted.

examples/parallel_processing.rs

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -10,16 +10,15 @@ use std::{
1010
use anyhow::{bail, Result};
1111
use binseq::{
1212
bq::{self, BinseqHeaderBuilder},
13-
BinseqReader, BinseqRecord, ParallelProcessor, ParallelReader,
13+
BinseqReader, BinseqRecord, Context, ParallelProcessor, ParallelReader,
1414
};
1515
use nucgen::Sequence;
1616

1717
#[derive(Clone, Default)]
1818
pub struct MyProcessor {
1919
local_counter: usize,
2020
counter: Arc<AtomicUsize>,
21-
sbuf: Vec<u8>,
22-
xbuf: Vec<u8>,
21+
ctx: Context,
2322
}
2423
impl MyProcessor {
2524
#[must_use]
@@ -29,12 +28,7 @@ impl MyProcessor {
2928
}
3029
impl ParallelProcessor for MyProcessor {
3130
fn process_record<R: BinseqRecord>(&mut self, record: R) -> binseq::Result<()> {
32-
self.sbuf.clear();
33-
self.xbuf.clear();
34-
record.decode_s(&mut self.sbuf)?;
35-
if record.is_paired() {
36-
record.decode_x(&mut self.xbuf)?;
37-
}
31+
self.ctx.fill_sequences(&record)?;
3832
self.local_counter += 1;
3933
Ok(())
4034
}

src/bq/reader.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -323,7 +323,7 @@ impl MmapReader {
323323
let config = RecordConfig::from_header(&header);
324324

325325
// Immediately validate the size of the file against the expected byte size of records
326-
if (mmap.len() - SIZE_HEADER) % config.record_size_bytes() != 0 {
326+
if !(mmap.len() - SIZE_HEADER).is_multiple_of(config.record_size_bytes()) {
327327
return Err(ReadError::FileTruncation(mmap.len()).into());
328328
}
329329

src/bq/writer.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -371,7 +371,7 @@ impl<W: Write> BinseqWriter<W> {
371371
if has_flag {
372372
write_flag(&mut self.inner, flag.unwrap_or(0))?;
373373
}
374-
write_buffer(&mut self.inner, &sbuffer)?;
374+
write_buffer(&mut self.inner, sbuffer)?;
375375
Ok(true)
376376
} else {
377377
Ok(false)

0 commit comments

Comments
 (0)