Skip to content

Commit 1b5364d

Browse files
committed
feat: add page data serialize with compression
1 parent 2bc0fef commit 1b5364d

4 files changed

Lines changed: 71 additions & 8 deletions

File tree

src/config.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ pub struct WALConfig {
2828
pub struct CompressionConfig {
2929
/// The compression level.
3030
pub level: u32,
31+
/// Enable compression
32+
pub enabled: bool,
3133
}
3234

3335
impl Default for CompressionConfig {
@@ -38,7 +40,10 @@ impl Default for CompressionConfig {
3840

3941
impl CompressionConfig {
4042
pub fn new() -> Self {
41-
Self { level: 6 }
43+
Self {
44+
level: 6,
45+
enabled: false,
46+
}
4247
}
4348

4449
/// The compression level.
@@ -47,6 +52,13 @@ impl CompressionConfig {
4752
self.level = level;
4853
self
4954
}
55+
56+
/// Enable compression
57+
/// Default: true
58+
pub fn enabled(&mut self, enabled: bool) -> &mut Self {
59+
self.enabled = enabled;
60+
self
61+
}
5062
}
5163

5264
impl Default for DustDataConfig {

src/lib.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,9 +46,12 @@
4646
//! let user = collection.get("user:1").unwrap();
4747
//! ```
4848
49+
pub mod btree;
4950
pub mod collection;
5051
pub mod config;
5152
pub mod error;
53+
pub mod page;
54+
mod ser_de;
5255

5356
pub use collection::Collection;
5457
pub use config::*;
@@ -61,9 +64,6 @@ use std::fmt::Debug;
6164
use std::fs;
6265
use std::sync::Arc;
6366

64-
pub mod btree;
65-
pub mod page;
66-
6767
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
6868
pub enum Either<L, R> {
6969
Left(L),

src/page.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use std::{
66

77
use crate::{
88
error::{CorruptedDataError, CorruptedDataKind, Error, Result},
9+
ser_de::{deserialize, serialize},
910
Either,
1011
};
1112
use crc32fast::Hasher;
@@ -78,7 +79,7 @@ impl<T: Serialize + DeserializeOwned + PartialOrd + Ord + Clone> Page<T> {
7879
}
7980

8081
pub fn write(&mut self, data: T) -> Result<(LocationOffset, LocationOffset)> {
81-
let data = bincode::serialize(&data).map_err(Error::SerializeError)?;
82+
let data = serialize(&data)?;
8283

8384
// cell_addr is the position of the cell in the page
8485
let cell_addr: LocationOffset = self.header.upper - data.len() as LocationOffset;
@@ -130,7 +131,7 @@ impl<T: Serialize + DeserializeOwned + PartialOrd + Ord + Clone> Page<T> {
130131
index: LocationOffset,
131132
data: T,
132133
) -> Result<(LocationOffset, LocationOffset)> {
133-
let data = bincode::serialize(&data).map_err(Error::SerializeError)?;
134+
let data = serialize(&data)?;
134135

135136
let offset = self.index_to_offset(index);
136137

@@ -188,7 +189,7 @@ impl<T: Serialize + DeserializeOwned + PartialOrd + Ord + Clone> Page<T> {
188189
}
189190

190191
pub fn replace(&mut self, index: LocationOffset, data: T) -> Result<T> {
191-
let data = bincode::serialize(&data).map_err(Error::SerializeError)?;
192+
let data = serialize(&data)?;
192193

193194
let offset = self.index_to_offset(index);
194195

@@ -259,7 +260,7 @@ impl<T: Serialize + DeserializeOwned + PartialOrd + Ord + Clone> Page<T> {
259260
.map_err(Error::IoError)?;
260261
buffer.read_exact(&mut data).unwrap();
261262

262-
let data = bincode::deserialize(&data).map_err(Error::SerializeError)?;
263+
let data = deserialize(&data)?;
263264

264265
Ok(Some(data))
265266
}

src/ser_de.rs

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
use crate::{
2+
error::{Error, Result},
3+
CompressionConfig,
4+
};
5+
use flate2::{read::GzDecoder, write::GzEncoder, Compression};
6+
use serde::{de::DeserializeOwned, Serialize};
7+
use std::{
8+
io::{Read, Write},
9+
sync::OnceLock,
10+
};
11+
12+
fn compression_config() -> &'static CompressionConfig {
13+
static COMPRESSION_CONFIG: OnceLock<CompressionConfig> = OnceLock::new();
14+
COMPRESSION_CONFIG.get_or_init(CompressionConfig::default)
15+
}
16+
17+
pub fn serialize<T>(data: &T) -> Result<Vec<u8>>
18+
where
19+
T: Serialize,
20+
{
21+
let mut bytes = bincode::serialize(data).map_err(Error::SerializeError)?;
22+
23+
let compression_config = compression_config();
24+
25+
if compression_config.enabled {
26+
let mut encoder = GzEncoder::new(Vec::new(), Compression::new(compression_config.level));
27+
encoder.write_all(&bytes).map_err(Error::IoError)?;
28+
29+
bytes = encoder.finish().map_err(Error::IoError)?;
30+
}
31+
32+
Ok(bytes)
33+
}
34+
35+
pub fn deserialize<T>(bytes: &[u8]) -> Result<T>
36+
where
37+
T: DeserializeOwned,
38+
{
39+
let compression_config = compression_config();
40+
let mut decoder = GzDecoder::new(bytes);
41+
42+
if compression_config.enabled && decoder.header().is_some() {
43+
let mut buffer = Vec::new();
44+
decoder.read_to_end(&mut buffer).unwrap();
45+
46+
bincode::deserialize(&buffer).map_err(Error::SerializeError)
47+
} else {
48+
bincode::deserialize(bytes).map_err(Error::SerializeError)
49+
}
50+
}

0 commit comments

Comments
 (0)