diff --git a/Cargo.lock b/Cargo.lock index 115d479..5eebbde 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2085,7 +2085,7 @@ dependencies = [ [[package]] name = "opsqueue" -version = "0.34.3" +version = "0.34.5" dependencies = [ "anyhow", "arc-swap", @@ -2139,7 +2139,7 @@ dependencies = [ [[package]] name = "opsqueue_python" -version = "0.34.3" +version = "0.34.5" dependencies = [ "anyhow", "chrono", diff --git a/Cargo.toml b/Cargo.toml index 76e111d..c58a1b5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,7 +9,7 @@ members = [ ] [workspace.package] -version = "0.34.4" +version = "0.34.5" [workspace.lints.clippy] diff --git a/opsqueue/src/consumer/server/mod.rs b/opsqueue/src/consumer/server/mod.rs index 412391e..1837109 100644 --- a/opsqueue/src/consumer/server/mod.rs +++ b/opsqueue/src/consumer/server/mod.rs @@ -198,8 +198,8 @@ impl Completer { () = cancellation_token.cancelled() => break, Some(msg) = self.mailbox.recv() => self.handle_message(msg).await, } - // Log some indication of progress every so often: self.count = self.count.saturating_add(1); + // Log some indication of progress every so often: if self.count.is_multiple_of(1000) { tracing::info!("Processed {} chunks", self.count); } @@ -237,6 +237,13 @@ impl Completer { } histogram!(crate::prometheus::CONSUMER_COMPLETE_CHUNK_DURATION) .record(start.elapsed()); + + // And while we have the connection already, + // let's make an extra WAL checkpoint every so often + if self.count.is_multiple_of(100) { + let _ = db::perform_explicit_wal_checkpoint(conn).await; + } + db_res?; Ok(()) } diff --git a/opsqueue/src/db/mod.rs b/opsqueue/src/db/mod.rs index c872bc8..034ceeb 100644 --- a/opsqueue/src/db/mod.rs +++ b/opsqueue/src/db/mod.rs @@ -67,6 +67,7 @@ use sqlx::{ use conn::{Conn, NoTransaction, Reader, Tx, Writer}; pub use magic::{False, True}; +use tokio::time::MissedTickBehavior; /// A [`Pool`] that can produce [`Writer`]s. pub type WriterPool = Pool; @@ -242,24 +243,12 @@ impl DBPools { self.write_pool.writer_conn().await } - /// Performas an explicit, non-passive WAL checkpoint - /// We use the 'RESTART' strategy, which will do the most work but will briefly block the writer *and* all readers - /// - /// c.f. https://www.sqlite.org/pragma.html#pragma_wal_checkpoint - pub async fn perform_explicit_wal_checkpoint(&self) -> sqlx::Result<()> { - let mut conn = self.writer_conn().await?; - let res: (i32, i32, i32) = sqlx::query_as("PRAGMA wal_checkpoint(RESTART);") - .fetch_one(conn.get_inner()) - .await?; - tracing::debug!("WAL checkpoint completed {res:?}"); - Ok(()) - } - pub async fn periodically_checkpoint_wal(&self) { const EXPLICIT_WAL_CHECK_INTERVAL: Duration = Duration::from_millis(100); let mut interval = tokio::time::interval(EXPLICIT_WAL_CHECK_INTERVAL); + interval.set_missed_tick_behavior(MissedTickBehavior::Skip); loop { - let _ = self.perform_explicit_wal_checkpoint().await; + let _ = self.write_pool.perform_explicit_wal_checkpoint().await; interval.tick().await; } } @@ -302,6 +291,23 @@ impl WriterPool { pub async fn writer_conn(&self) -> sqlx::Result> { self.acquire().await } + + pub async fn perform_explicit_wal_checkpoint(&self) -> sqlx::Result<()> { + let conn = self.writer_conn().await?; + perform_explicit_wal_checkpoint(conn).await + } +} + +/// Performs an explicit, non-passive WAL checkpoint +/// We use the 'RESTART' strategy, which will do the most work but will briefly block the writer *and* all readers +/// +/// c.f. https://www.sqlite.org/pragma.html#pragma_wal_checkpoint +pub async fn perform_explicit_wal_checkpoint(mut conn: impl WriterConnection) -> sqlx::Result<()> { + let res: (i32, i32, i32) = sqlx::query_as("PRAGMA wal_checkpoint(RESTART);") + .fetch_one(conn.get_inner()) + .await?; + tracing::debug!("WAL checkpoint completed {res:?}"); + Ok(()) } impl ReaderPool { diff --git a/opsqueue/src/producer/server.rs b/opsqueue/src/producer/server.rs index d32fee1..f6ef8b0 100644 --- a/opsqueue/src/producer/server.rs +++ b/opsqueue/src/producer/server.rs @@ -1,7 +1,7 @@ use std::sync::Arc; use crate::common::submission::{self, SubmissionId}; -use crate::db::DBPools; +use crate::db::{self, DBPools}; use axum::extract::{Path, State}; use axum::http::StatusCode; use axum::response::{IntoResponse, Response}; @@ -126,6 +126,11 @@ async fn insert_submission( ) .await?; + // As we already have access to the write checkpoint + // and submission inserts result in large DB mutations from time to time + // this is the moment to perform an extra WAL checkpoint + let _ = db::perform_explicit_wal_checkpoint(conn).await; + // We've done a new insert! Let's tell any waiting consumers! state.notify_on_insert.notify_waiters();