Skip to content

Commit e5d1153

Browse files
authored
fix: delta of delta take n (#40)
1 parent 7a8799a commit e5d1153

3 files changed

Lines changed: 66 additions & 31 deletions

File tree

columnar/src/column/delta_of_delta.rs

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -49,7 +49,7 @@ impl<T: DeltaOfDeltable> ColumnTrait for DeltaOfDeltaColumn<T> {
4949
where
5050
Self: Sized,
5151
{
52-
let mut delta_of_delta_decoder = DeltaOfDeltaDecoder::new(bytes);
52+
let mut delta_of_delta_decoder = DeltaOfDeltaDecoder::new(bytes)?;
5353
let data = delta_of_delta_decoder.decode()?;
5454
Ok(Self {
5555
data,

columnar/src/iterable.rs

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -132,7 +132,7 @@ pub struct DeltaOfDeltaIter<'de, T> {
132132
impl<'de, T: DeltaOfDeltable> DeltaOfDeltaIter<'de, T> {
133133
pub fn new(bytes: &'de [u8]) -> Self {
134134
Self {
135-
decoder: DeltaOfDeltaDecoder::new(bytes),
135+
decoder: DeltaOfDeltaDecoder::new(bytes).unwrap(),
136136
}
137137
}
138138

columnar/src/strategy/rle.rs

Lines changed: 64 additions & 29 deletions
Original file line number | Diff line number | Diff line change
@@ -496,9 +496,7 @@ impl DeltaOfDeltaEncoder {
496496
#[inline(never)]
497497
pub fn finish(self) -> Result<Vec<u8>, ColumnarError> {
498498
let mut bytes = Vec::with_capacity(self.bits.len() * 8 + 1 + 8);
499-
if let Some(head_num) = self.head_num {
500-
bytes.extend_from_slice(&postcard::to_allocvec(&head_num)?);
501-
}
499+
bytes.extend_from_slice(&postcard::to_allocvec(&self.head_num)?);
502500
let used = self.last_used_bit.div_ceil(8);
503501
bytes.push(if self.last_used_bit % 8 == 0 && self.use_bit {
504502
8
@@ -526,33 +524,39 @@ pub struct DeltaOfDeltaDecoder<'de, T> {
526524
}
527525

528526
impl<'de, T: DeltaOfDeltable> DeltaOfDeltaDecoder<'de, T> {
529-
pub fn new(bytes: &'de [u8]) -> Self {
530-
// println!("\ndecode bytes {:?}", &bytes);
531-
if bytes.len() < 2 {
532-
return Self {
533-
bits: bytes,
534-
head_num: None,
535-
prev_value: 0,
536-
prev_delta: 0,
537-
index: 0,
538-
current_bits_index: 0,
539-
last_used_bit: 0,
540-
_t: PhantomData,
541-
};
527+
pub fn new(bytes: &'de [u8]) -> Result<Self, ColumnarError> {
528+
let (head_num, bytes) = postcard::take_from_bytes(bytes)?;
529+
if bytes.is_empty() {
530+
return Err(ColumnarError::RleDecodeError(
531+
"invalid DeltaOfDelta input".to_string(),
532+
));
542533
}
543-
let (head_num, bytes) = postcard::take_from_bytes(bytes).unwrap();
544534
let last_used_bit = bytes[0];
545535
let bits = &bytes[1..];
546-
Self {
536+
Ok(Self {
547537
bits,
548-
head_num: Some(head_num),
538+
head_num,
549539
prev_value: 0,
550540
prev_delta: 0,
551541
index: 0,
552542
current_bits_index: 0,
553543
last_used_bit,
554544
_t: PhantomData,
555-
}
545+
})
546+
547+
// let (head_num, bytes) = postcard::take_from_bytes(bytes).unwrap();
548+
// let last_used_bit = bytes[0];
549+
// let bits = &bytes[1..];
550+
// Self {
551+
// bits: &bytes,
552+
// head_num: None,
553+
// prev_value: 0,
554+
// prev_delta: 0,
555+
// index: 0,
556+
// current_bits_index: 0,
557+
// last_used_bit: 0,
558+
// _t: PhantomData,
559+
// }
556560
}
557561

558562
pub fn decode(&mut self) -> Result<Vec<T>, ColumnarError> {
@@ -651,13 +655,14 @@ impl<'de, T: DeltaOfDeltable> DeltaOfDeltaDecoder<'de, T> {
651655
Some(ans)
652656
}
653657

654-
pub fn finalize(self) -> Result<&'de [u8], ColumnarError> {
655-
let idx = if self.current_bits_index != 0 {
656-
(self.index + 1).min(self.bits.len())
657-
} else {
658-
self.index
659-
};
660-
Ok(&self.bits[idx..])
658+
pub fn finalize(mut self) -> Result<&'de [u8], ColumnarError> {
659+
if self.bits.is_empty() {
660+
return Ok(self.bits);
661+
}
662+
if self.current_bits_index > 0 {
663+
self.index += 1;
664+
}
665+
Ok(&self.bits[self.index..])
661666
}
662667

663668
pub fn take_n_finalize(mut self, n: usize) -> Result<(Vec<T>, &'de [u8]), ColumnarError> {
@@ -667,7 +672,7 @@ impl<'de, T: DeltaOfDeltable> DeltaOfDeltaDecoder<'de, T> {
667672
ans.push(v);
668673
} else {
669674
return Err(ColumnarError::RleDecodeError(format!(
670-
"The elements of decoder is less than n({})",
675+
"The elements of decoder is less than n ({})",
671676
n
672677
)));
673678
}
@@ -713,8 +718,15 @@ impl<'de, T: DeltaOfDeltable> Iterator for DeltaOfDeltaDecoder<'de, T> {
713718
}
714719
}
715720

721+
#[cfg(test)]
716722
mod test {
717723

724+
use rand::Rng;
725+
726+
use crate::column::delta_rle;
727+
728+
use super::{DeltaOfDeltaEncoder, DeltaRleEncoder};
729+
718730
#[test]
719731
fn test_rle() {
720732
use super::*;
@@ -777,8 +789,31 @@ mod test {
777789
encoder.append(6).unwrap();
778790
let buf = encoder.finish().unwrap();
779791
// println!("{:?}", buf);
780-
let mut delta_of_delta_rle_decoder = DeltaOfDeltaDecoder::new(&buf);
792+
let mut delta_of_delta_rle_decoder = DeltaOfDeltaDecoder::new(&buf).unwrap();
781793
let values: Vec<i64> = delta_of_delta_rle_decoder.decode().unwrap();
782794
assert_eq!(values, vec![1, 2, 3, 4, 5, 6]);
783795
}
796+
797+
#[test]
798+
fn test_size() {
799+
let mut rng = rand::thread_rng();
800+
for p in 1..10 {
801+
let mut i = 0;
802+
let mut n = 0;
803+
let mut delta_rle = DeltaRleEncoder::new();
804+
let mut delta_of_delta = DeltaOfDeltaEncoder::new();
805+
while n < 5000 {
806+
if rng.gen_bool(0.93) {
807+
delta_rle.append(i);
808+
delta_of_delta.append(i);
809+
n += 1;
810+
}
811+
i += 1;
812+
}
813+
814+
println!("==0.{}==", p);
815+
println!("delta rle {}", delta_rle.finish().unwrap().len());
816+
println!("delta of delta {}", delta_of_delta.finish().unwrap().len());
817+
}
818+
}
784819
}

0 commit comments

Comments
 (0)