Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ members = [
]

[workspace.package]
version = "0.34.4"
version = "0.34.5"


[workspace.lints.clippy]
Expand Down
9 changes: 8 additions & 1 deletion opsqueue/src/consumer/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,8 +198,8 @@ impl Completer {
() = cancellation_token.cancelled() => break,
Some(msg) = self.mailbox.recv() => self.handle_message(msg).await,
}
// Log some indication of progress every so often:
self.count = self.count.saturating_add(1);
// Log some indication of progress every so often:
if self.count.is_multiple_of(1000) {
tracing::info!("Processed {} chunks", self.count);
}
Expand Down Expand Up @@ -237,6 +237,13 @@ impl Completer {
}
histogram!(crate::prometheus::CONSUMER_COMPLETE_CHUNK_DURATION)
.record(start.elapsed());

// And while we have the connection already,
// let's make an extra WAL checkpoint every so often
if self.count.is_multiple_of(100) {
let _ = db::perform_explicit_wal_checkpoint(conn).await;
}

db_res?;
Ok(())
}
Expand Down
34 changes: 20 additions & 14 deletions opsqueue/src/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ use sqlx::{
use conn::{Conn, NoTransaction, Reader, Tx, Writer};

pub use magic::{False, True};
use tokio::time::MissedTickBehavior;

/// A [`Pool`] that can produce [`Writer`]s.
pub type WriterPool = Pool<True>;
Expand Down Expand Up @@ -242,24 +243,12 @@ impl DBPools {
self.write_pool.writer_conn().await
}

/// Performas an explicit, non-passive WAL checkpoint
/// We use the 'RESTART' strategy, which will do the most work but will briefly block the writer *and* all readers
///
/// c.f. https://www.sqlite.org/pragma.html#pragma_wal_checkpoint
pub async fn perform_explicit_wal_checkpoint(&self) -> sqlx::Result<()> {
let mut conn = self.writer_conn().await?;
let res: (i32, i32, i32) = sqlx::query_as("PRAGMA wal_checkpoint(RESTART);")
.fetch_one(conn.get_inner())
.await?;
tracing::debug!("WAL checkpoint completed {res:?}");
Ok(())
}

pub async fn periodically_checkpoint_wal(&self) {
const EXPLICIT_WAL_CHECK_INTERVAL: Duration = Duration::from_millis(100);
let mut interval = tokio::time::interval(EXPLICIT_WAL_CHECK_INTERVAL);
interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
loop {
let _ = self.perform_explicit_wal_checkpoint().await;
let _ = self.write_pool.perform_explicit_wal_checkpoint().await;
interval.tick().await;
}
}
Expand Down Expand Up @@ -302,6 +291,23 @@ impl WriterPool {
pub async fn writer_conn(&self) -> sqlx::Result<Writer<NoTransaction>> {
self.acquire().await
}

pub async fn perform_explicit_wal_checkpoint(&self) -> sqlx::Result<()> {
let conn = self.writer_conn().await?;
perform_explicit_wal_checkpoint(conn).await
}
}

/// Performs an explicit, non-passive WAL checkpoint
/// We use the 'RESTART' strategy, which will do the most work but will briefly block the writer *and* all readers
///
/// c.f. https://www.sqlite.org/pragma.html#pragma_wal_checkpoint
pub async fn perform_explicit_wal_checkpoint(mut conn: impl WriterConnection) -> sqlx::Result<()> {
let res: (i32, i32, i32) = sqlx::query_as("PRAGMA wal_checkpoint(RESTART);")
.fetch_one(conn.get_inner())
.await?;
tracing::debug!("WAL checkpoint completed {res:?}");
Ok(())
}

impl ReaderPool {
Expand Down
7 changes: 6 additions & 1 deletion opsqueue/src/producer/server.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::sync::Arc;

use crate::common::submission::{self, SubmissionId};
use crate::db::DBPools;
use crate::db::{self, DBPools};
use axum::extract::{Path, State};
use axum::http::StatusCode;
use axum::response::{IntoResponse, Response};
Expand Down Expand Up @@ -126,6 +126,11 @@ async fn insert_submission(
)
.await?;

// As we already have access to the write checkpoint
// and submission inserts result in large DB mutations from time to time
// this is the moment to perform an extra WAL checkpoint
let _ = db::perform_explicit_wal_checkpoint(conn).await;

// We've done a new insert! Let's tell any waiting consumers!
state.notify_on_insert.notify_waiters();

Expand Down
Loading