Skip to content

Commit 5c25128

Browse files
committed
Merge #85: Make explicit non-passive Sqlite WAL checkpoints
Approved-by: Qqwy Priority: Normal Auto-deploy: false
2 parents 1efa6a6 + 1f005b9 commit 5c25128

5 files changed

Lines changed: 38 additions & 6 deletions

File tree

Cargo.lock

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ members = [
99
]
1010

1111
[workspace.package]
12-
version = "0.34.1"
12+
version = "0.34.2"
1313

1414

1515
[workspace.lints.clippy]

libs/opsqueue_python/examples/integer_increment/integer_increment_producer.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
client = ProducerClient("localhost:3999", "file:///tmp/opsqueue/integer_increment")
88

99
input_iter = range(0, 1_000_000)
10-
output_iter = client.run_submission(input_iter, chunk_size=1000)
10+
output_iter = client.run_submission(input_iter, chunk_size=1_000)
1111

1212
# Now do something with the output:
1313
# for x in output_iter:

opsqueue/app/main.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ pub async fn async_main() {
4747
.expect("Timed out while initiating the database");
4848

4949
moro_local::async_scope!(|scope| {
50+
scope.spawn(db_pool.periodically_checkpoint_wal());
51+
5052
scope.spawn(opsqueue::server::serve_producer_and_consumer(
5153
config,
5254
&server_addr,

opsqueue/src/db/mod.rs

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,28 @@ impl DBPools {
241241
pub async fn writer_conn(&self) -> sqlx::Result<Writer<NoTransaction>> {
242242
self.write_pool.writer_conn().await
243243
}
244+
245+
/// Performas an explicit, non-passive WAL checkpoint
246+
/// We use the 'TRUNCATE' strategy, which will do the most work but will briefly block the writer *and* all readers
247+
///
248+
/// c.f. https://www.sqlite.org/pragma.html#pragma_wal_checkpoint
249+
pub async fn perform_explicit_wal_checkpoint(&self) -> sqlx::Result<()> {
250+
let mut conn = self.writer_conn().await?;
251+
let res: (i32, i32, i32) = sqlx::query_as("PRAGMA wal_checkpoint(RESTART);")
252+
.fetch_one(conn.get_inner())
253+
.await?;
254+
tracing::warn!("WAL checkpoint completed {res:?}");
255+
Ok(())
256+
}
257+
258+
pub async fn periodically_checkpoint_wal(&self) {
259+
const EXPLICIT_WAL_CHECK_INTERVAL: Duration = Duration::from_secs(1);
260+
let mut interval = tokio::time::interval(EXPLICIT_WAL_CHECK_INTERVAL);
261+
loop {
262+
let _ = self.perform_explicit_wal_checkpoint().await;
263+
interval.tick().await;
264+
}
265+
}
244266
}
245267

246268
/// A connection pool.
@@ -366,9 +388,14 @@ fn db_options(database_filename: &str) -> SqliteConnectOptions {
366388
.synchronous(SqliteSynchronous::Normal) // Full is not needed because we use WAL mode
367389
.busy_timeout(Duration::from_secs(5)) // No query should ever lock for more than 5 seconds
368390
.foreign_keys(true) // By default SQLite does not do foreign key checks; we want them to ensure data consistency
369-
.pragma("mmap_size", "134217728")
391+
// Caching:
392+
.pragma("mmap_size", format!("{}", 128 * 1024 * 1024))
370393
.pragma("cache_size", "-1000000") // Cache size of 10⁶ KiB AKA 1GiB (negative value means measured in KiB rather than in multiples of the page size)
371-
// NOTE: we do _not_ set PRAGMA temp_store = 2 (MEMORY) because as long as the page cache has room those will use memory anyway (and if it is full we need the disk)
394+
// NOTE: we do _not_ set PRAGMA temp_store = 2 (MEMORY) because as long as the page cache has room those will use memory anyway (and if it is full we need the disk)
395+
// WAL checkpointing
396+
.busy_timeout(Duration::from_secs(5)) // Matches the SQLx default (*not* the sqlite-on-its-own default, which is 'error immediately'), but made explicit as it is subject to change
397+
.pragma("wal_autocheckpoint", "0") // Turn the passive autocheckpointing _off_, as we do our own explicit active checkpointing
398+
.pragma("journal_size_limit", format!("{}", 4 * 1024 * 1024)) // Truncate WAL file down to 4 MiB after checkpointing
372399
}
373400

374401
async fn ensure_db_exists(database_filename: &str) {
@@ -412,6 +439,9 @@ async fn db_connect_read_pool(
412439
}
413440

414441
/// Connect the writer pool.
442+
///
443+
/// It intentionally only contains a single connection
444+
/// as SQLite only allows a single write at a time
415445
async fn db_connect_write_pool(database_filename: &str) -> WriterPool {
416446
SqlitePoolOptions::new()
417447
.min_connections(1)

0 commit comments

Comments
 (0)