Skip to content

Commit fe169cb

Browse files
committed
Make explicit non-passive Sqlite WAL checkpoints
The default way SQLite in WAL mode performs checkpoints, is by waiting for quiet time where there are no readers nor writers. Any read-connection can block this passive WAL checkpointing from making progress. We have observed that in production when having large workloads with consumers working on hundreds of chunks concurrently. Whereas the WAL is not expected to grow much beyond 4MiB (that's when the passive autocheckpointing kicks in), we saw WALs of > 850MiB. At that point, reads slow down significantly and that can cause a failure for the system as a whole. To mitigate this, as per the SQLite docs, this PR: - Disables the passive autocheckpointing (leaving it enabled conflicts with manual checkpointing which can then result in a SQLITE_BUSY) - Runs active checkpointing _every second_. It is very fast when there is little-to-no work to do, but under load it is expected that we'll hit the '1000 page mutations' (AKA the 4MiB default WAL size) within a second. - We use the 'RESTART' strategy, together with a `journal_file_limit` that ensures that the WAL will be trimmed down to the max of 4MiB. That means we don't always fully truncate, nor do we keep it at 'whatever the max happened to be'. Doing this checkpointing does mean that every second there is a tiny timeframe in which both write-tasks and also all read-tasks will have to wait. This is unlikely to cause any problems. We now explicitly configure the busy timeout to be 5 seconds, which was the prior implicit default of SQLx/Rusqlite for good measure.
1 parent 1efa6a6 commit fe169cb

3 files changed

Lines changed: 35 additions & 3 deletions

File tree

libs/opsqueue_python/examples/integer_increment/integer_increment_producer.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
client = ProducerClient("localhost:3999", "file:///tmp/opsqueue/integer_increment")
88

99
input_iter = range(0, 1_000_000)
10-
output_iter = client.run_submission(input_iter, chunk_size=1000)
10+
output_iter = client.run_submission(input_iter, chunk_size=10_000)
1111

1212
# Now do something with the output:
1313
# for x in output_iter:

opsqueue/app/main.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ pub async fn async_main() {
4747
.expect("Timed out while initiating the database");
4848

4949
moro_local::async_scope!(|scope| {
50+
scope.spawn(db_pool.periodically_checkpoint_wal());
51+
5052
scope.spawn(opsqueue::server::serve_producer_and_consumer(
5153
config,
5254
&server_addr,

opsqueue/src/db/mod.rs

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,28 @@ impl DBPools {
241241
pub async fn writer_conn(&self) -> sqlx::Result<Writer<NoTransaction>> {
242242
self.write_pool.writer_conn().await
243243
}
244+
245+
/// Performas an explicit, non-passive WAL checkpoint
246+
/// We use the 'TRUNCATE' strategy, which will do the most work but will briefly block the writer *and* all readers
247+
///
248+
/// c.f. https://www.sqlite.org/pragma.html#pragma_wal_checkpoint
249+
pub async fn perform_explicit_wal_checkpoint(&self) -> sqlx::Result<()> {
250+
let mut conn = self.writer_conn().await?;
251+
let res: (i32, i32, i32) = sqlx::query_as("PRAGMA wal_checkpoint(RESTART);")
252+
.fetch_one(conn.get_inner())
253+
.await?;
254+
tracing::warn!("WAL checkpoint completed {res:?}");
255+
Ok(())
256+
}
257+
258+
pub async fn periodically_checkpoint_wal(&self) {
259+
const EXPLICIT_WAL_CHECK_INTERVAL: Duration = Duration::from_secs(1);
260+
let mut interval = tokio::time::interval(EXPLICIT_WAL_CHECK_INTERVAL);
261+
loop {
262+
let _ = self.perform_explicit_wal_checkpoint().await;
263+
interval.tick().await;
264+
}
265+
}
244266
}
245267

246268
/// A connection pool.
@@ -366,9 +388,14 @@ fn db_options(database_filename: &str) -> SqliteConnectOptions {
366388
.synchronous(SqliteSynchronous::Normal) // Full is not needed because we use WAL mode
367389
.busy_timeout(Duration::from_secs(5)) // No query should ever lock for more than 5 seconds
368390
.foreign_keys(true) // By default SQLite does not do foreign key checks; we want them to ensure data consistency
369-
.pragma("mmap_size", "134217728")
391+
// Caching:
392+
.pragma("mmap_size", format!("{}", 128 * 1024 * 1024))
370393
.pragma("cache_size", "-1000000") // Cache size of 10⁶ KiB AKA 1GiB (negative value means measured in KiB rather than in multiples of the page size)
371-
// NOTE: we do _not_ set PRAGMA temp_store = 2 (MEMORY) because as long as the page cache has room those will use memory anyway (and if it is full we need the disk)
394+
// NOTE: we do _not_ set PRAGMA temp_store = 2 (MEMORY) because as long as the page cache has room those will use memory anyway (and if it is full we need the disk)
395+
// WAL checkpointing
396+
.busy_timeout(Duration::from_secs(5)) // Matches the SQLx default (*not* the sqlite-on-its-own default, which is 'error immediately'), but made explicit as it is subject to change
397+
.pragma("wal_autocheckpoint", "0") // Turn the passive autocheckpointing _off_, as we do our own explicit active checkpointing
398+
.pragma("journal_size_limit", format!("{}", 4 * 1024 * 1024)) // Truncate WAL file down to 4 MiB after checkpointing
372399
}
373400

374401
async fn ensure_db_exists(database_filename: &str) {
@@ -412,6 +439,9 @@ async fn db_connect_read_pool(
412439
}
413440

414441
/// Connect the writer pool.
442+
///
443+
/// It intentionally only contains a single connection
444+
/// as SQLite only allows a single write at a time
415445
async fn db_connect_write_pool(database_filename: &str) -> WriterPool {
416446
SqlitePoolOptions::new()
417447
.min_connections(1)

0 commit comments

Comments
 (0)