Skip to content

Commit a149af9

Browse files
committed
sub(feat): allow user to handle stream lag
- Previously we were just logging a warning when the number of changes in a tx exceeded the channel capacity. Now we report the lag in the change event to make it more obvious to the user and allow for the user to respond to this "error" condition.
1 parent d4069f3 commit a149af9

6 files changed

Lines changed: 226 additions & 41 deletions

File tree

crates/sqlx-sqlite-observer/README.md

Lines changed: 106 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,8 @@ This ensures subscribers **only receive notifications for committed changes**.
9595
### Core Types
9696

9797
* **`TableChange`**: Notification of a change to a database table
98+
* **`TableChangeEvent`**: Event yielded by `TableChangeStream`
99+
either `Change(TableChange)` or `Lagged(u64)`
98100
* **`ChangeOperation`**: Insert, Update, or Delete
99101
* **`ColumnValue`**: Typed column value (Null, Integer, Real, Text, Blob)
100102
* **`ObserverConfig`**: Configuration for table filtering and channel
@@ -111,7 +113,7 @@ This ensures subscribers **only receive notifications for committed changes**.
111113
* **`TableChangeStreamExt`**: Extension trait for converting receivers to
112114
streams
113115

114-
### SQLx SQLite Connection Manager Integration (feature: `conn-mgr`) <!-- COMING SOON -->
116+
### SQLx SQLite Connection Manager Integration (feature: `conn-mgr`)
115117

116118
* **`ObservableSqliteDatabase`**: Wrapper for `SqliteDatabase` with observation
117119
* **`ObservableWriteGuard`**: Write guard with hooks registered
@@ -179,7 +181,7 @@ meaningful/correct for non-integer or composite primary keys.
179181

180182
### Basic Usage
181183

182-
```rust,no_run
184+
```rust
183185
use sqlx::SqlitePool;
184186
use sqlx_sqlite_observer::{SqliteObserver, ObserverConfig, ColumnValue};
185187

@@ -219,10 +221,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
219221

220222
### Stream API
221223

222-
```rust,no_run
224+
```rust
223225
use futures::StreamExt;
224226
use sqlx::SqlitePool;
225-
use sqlx_sqlite_observer::{SqliteObserver, ObserverConfig};
227+
use sqlx_sqlite_observer::{
228+
SqliteObserver, ObserverConfig, TableChangeEvent,
229+
};
226230

227231
#[tokio::main]
228232
async fn main() -> Result<(), Box<dyn std::error::Error>> {
@@ -232,13 +236,20 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
232236

233237
let mut stream = observer.subscribe_stream(["users"]);
234238

235-
while let Some(change) = stream.next().await {
236-
println!(
237-
"Table {} row {} was {:?}",
238-
change.table,
239-
change.rowid.unwrap_or(-1),
240-
change.operation
241-
);
239+
while let Some(event) = stream.next().await {
240+
match event {
241+
TableChangeEvent::Change(change) => {
242+
println!(
243+
"Table {} row {} was {:?}",
244+
change.table,
245+
change.rowid.unwrap_or(-1),
246+
change.operation
247+
);
248+
}
249+
TableChangeEvent::Lagged(n) => {
250+
eprintln!("Missed {} notifications", n);
251+
}
252+
}
242253
}
243254

244255
Ok(())
@@ -247,7 +258,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
247258

248259
### Value Capture
249260

250-
```rust,no_run
261+
```rust
251262
use sqlx::SqlitePool;
252263
use sqlx_sqlite_observer::{SqliteObserver, ObserverConfig, ColumnValue};
253264

@@ -284,7 +295,37 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
284295

285296
### SQLx SQLite Connection Manager Integration
286297

287-
<!-- TODO: Add example showing ObservableSqliteDatabase usage -->
298+
```rust
299+
use std::sync::Arc;
300+
use sqlx_sqlite_conn_mgr::SqliteDatabase;
301+
use sqlx_sqlite_observer::{
302+
ObservableSqliteDatabase, ObserverConfig,
303+
};
304+
305+
#[tokio::main]
306+
async fn main() -> Result<(), Box<dyn std::error::Error>> {
307+
let db = SqliteDatabase::connect("mydb.db", None).await?;
308+
let config = ObserverConfig::new().with_tables(["users"]);
309+
let observable = ObservableSqliteDatabase::new(db, config);
310+
311+
let mut rx = observable.subscribe(["users"]);
312+
313+
// Write through the observable writer
314+
let mut writer = observable.acquire_writer().await?;
315+
sqlx::query("BEGIN").execute(&mut *writer).await?;
316+
sqlx::query("INSERT INTO users (name) VALUES (?)")
317+
.bind("Alice")
318+
.execute(&mut *writer)
319+
.await?;
320+
sqlx::query("COMMIT").execute(&mut *writer).await?;
321+
322+
// Notification arrives after commit
323+
let change = rx.recv().await?;
324+
println!("Changed: {}", change.table);
325+
326+
Ok(())
327+
}
328+
```
288329

289330
## Usage Notes
290331

@@ -301,6 +342,58 @@ let config = ObserverConfig::new()
301342
.with_channel_capacity(1000); // Handle large transactions
302343
```
303344

345+
### Handling Lag
346+
347+
When using the Stream API, the stream yields `TableChangeEvent` values.
348+
Most events are `Change` variants, but if a consumer falls behind, the
349+
stream yields a `Lagged(n)` event indicating how many notifications
350+
were missed.
351+
352+
```rust
353+
use futures::StreamExt;
354+
use sqlx_sqlite_observer::TableChangeEvent;
355+
# use sqlx_sqlite_observer::{SqliteObserver, ObserverConfig};
356+
# async fn example(observer: SqliteObserver) {
357+
358+
let mut stream = observer.subscribe_stream(["users"]);
359+
360+
while let Some(event) = stream.next().await {
361+
match event {
362+
TableChangeEvent::Change(change) => {
363+
// Process the change normally
364+
}
365+
TableChangeEvent::Lagged(n) => {
366+
// n notifications were missed — local state may be stale.
367+
// Re-query the database for current state.
368+
tracing::warn!("Missed {} change notifications", n);
369+
}
370+
}
371+
}
372+
# }
373+
```
374+
375+
**When does lag happen?** The broadcast channel has a fixed capacity
376+
(default 256). Lag occurs when the oldest unread messages are
377+
overwritten. This can happen in two ways:
378+
379+
* A subscriber processes changes slower than they arrive
380+
* A single transaction contains more mutating statements than the
381+
channel capacity, causing messages to be overwritten before the
382+
consumer reads them
383+
384+
This is rare under normal conditions but can occur during bulk
385+
writes or large transactions.
386+
387+
**How to prevent it:**
388+
389+
* Increase `channel_capacity` via `ObserverConfig::with_channel_capacity`
390+
* Process changes faster (avoid blocking in the stream consumer)
391+
* Use a dedicated task for stream consumption
392+
393+
**Note:** The `broadcast::Receiver` API (from `subscribe()`) surfaces
394+
lag as `RecvError::Lagged(n)` — the same information, just through
395+
the raw tokio broadcast channel interface rather than the stream.
396+
304397
### Disabling Value Capture
305398

306399
By default, `TableChange` includes `old_values` and `new_values` with the actual

crates/sqlx-sqlite-observer/src/change.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,30 @@ impl ColumnValue {
100100
}
101101
}
102102

103+
/// Event yielded by [`TableChangeStream`](crate::stream::TableChangeStream).
104+
///
105+
/// Most events are `Change` variants containing the actual table change data.
106+
/// A `Lagged` event indicates the consumer fell behind and missed some
107+
/// notifications — consider increasing
108+
/// [`channel_capacity`](crate::config::ObserverConfig::channel_capacity).
109+
#[derive(Debug, Clone)]
110+
pub enum TableChangeEvent {
111+
/// A table change notification.
112+
Change(TableChange),
113+
/// The stream fell behind and missed `n` change notifications.
114+
///
115+
/// This can happen when:
116+
/// - The consumer is processing changes too slowly relative to the
117+
/// rate of database writes.
118+
/// - A single transaction contains more mutating statements than the
119+
/// channel capacity, causing older messages to be overwritten before
120+
/// the consumer reads them.
121+
///
122+
/// When this happens, the consumer should assume its local state may
123+
/// be stale and re-query the database for the current state.
124+
Lagged(u64),
125+
}
126+
103127
/// Notification of a change to a database table.
104128
///
105129
/// Contains the table name, operation type, affected rowid, and the

crates/sqlx-sqlite-observer/src/lib.rs

Lines changed: 24 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,9 @@
8181
//! ```rust,no_run
8282
//! use futures::StreamExt;
8383
//! use sqlx::SqlitePool;
84-
//! use sqlx_sqlite_observer::{ChangeOperation, SqliteObserver, ObserverConfig, TableChangeStreamExt};
84+
//! use sqlx_sqlite_observer::{
85+
//! SqliteObserver, ObserverConfig, TableChangeEvent, TableChangeStreamExt,
86+
//! };
8587
//!
8688
//! #[tokio::main]
8789
//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
@@ -92,20 +94,26 @@
9294
//! // Get a Stream instead of broadcast::Receiver
9395
//! let mut stream = observer.subscribe_stream(["users"]);
9496
//!
95-
//! // Use standard Stream combinators
96-
//! while let Some(change) = stream.next().await {
97-
//! println!(
98-
//! "Table {} row {} was {:?}",
99-
//! change.table,
100-
//! change.rowid.unwrap_or(-1),
101-
//! change.operation
102-
//! );
103-
//! // Access typed column values
104-
//! if let Some(old) = &change.old_values {
105-
//! println!(" Old values: {:?}", old);
106-
//! }
107-
//! if let Some(new) = &change.new_values {
108-
//! println!(" New values: {:?}", new);
97+
//! // Stream yields TableChangeEvent variants
98+
//! while let Some(event) = stream.next().await {
99+
//! match event {
100+
//! TableChangeEvent::Change(change) => {
101+
//! println!(
102+
//! "Table {} row {} was {:?}",
103+
//! change.table,
104+
//! change.rowid.unwrap_or(-1),
105+
//! change.operation
106+
//! );
107+
//! if let Some(old) = &change.old_values {
108+
//! println!(" Old values: {:?}", old);
109+
//! }
110+
//! if let Some(new) = &change.new_values {
111+
//! println!(" New values: {:?}", new);
112+
//! }
113+
//! }
114+
//! TableChangeEvent::Lagged(n) => {
115+
//! eprintln!("Missed {} notifications, re-query state", n);
116+
//! }
109117
//! }
110118
//! }
111119
//!
@@ -127,7 +135,7 @@ pub mod stream;
127135
pub mod conn_mgr;
128136

129137
pub use broker::ObservationBroker;
130-
pub use change::{ChangeOperation, ColumnValue, TableChange, TableInfo};
138+
pub use change::{ChangeOperation, ColumnValue, TableChange, TableChangeEvent, TableInfo};
131139
pub use config::ObserverConfig;
132140
pub use connection::ObservableConnection;
133141
pub use error::Error;

crates/sqlx-sqlite-observer/src/stream.rs

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use tokio_stream::Stream;
66
use tokio_stream::wrappers::BroadcastStream;
77
use tracing::warn;
88

9-
use crate::change::TableChange;
9+
use crate::change::{TableChange, TableChangeEvent};
1010

1111
/// A filtered stream of table change notifications.
1212
///
@@ -32,7 +32,7 @@ impl TableChangeStream {
3232
}
3333

3434
impl Stream for TableChangeStream {
35-
type Item = TableChange;
35+
type Item = TableChangeEvent;
3636

3737
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
3838
loop {
@@ -46,15 +46,17 @@ impl Stream for TableChangeStream {
4646
{
4747
continue;
4848
}
49-
return Poll::Ready(Some(change));
49+
return Poll::Ready(Some(TableChangeEvent::Change(change)));
5050
}
51-
Poll::Ready(Some(Err(err))) => {
52-
// Lagged error - missed some messages due to slow consumption
51+
Poll::Ready(Some(Err(
52+
tokio_stream::wrappers::errors::BroadcastStreamRecvError::Lagged(count),
53+
))) => {
5354
warn!(
54-
error = %err,
55-
"Stream lagged — missed change notifications. Consider increasing channel_capacity."
55+
missed = count,
56+
"Stream lagged — missed change notifications. \
57+
Consider increasing channel_capacity."
5658
);
57-
continue;
59+
return Poll::Ready(Some(TableChangeEvent::Lagged(count)));
5860
}
5961
Poll::Ready(None) => return Poll::Ready(None),
6062
Poll::Pending => return Poll::Pending,

crates/sqlx-sqlite-observer/tests/conn_mgr_tests.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -331,6 +331,13 @@ async fn test_stream_receives_notifications() {
331331
let result = timeout(Duration::from_millis(100), stream.next()).await;
332332
assert!(result.is_ok(), "Stream receives notification");
333333

334-
let change = result.unwrap().unwrap();
335-
assert_eq!(change.table, "users");
334+
let event = result.unwrap().unwrap();
335+
match event {
336+
sqlx_sqlite_observer::TableChangeEvent::Change(change) => {
337+
assert_eq!(change.table, "users");
338+
}
339+
sqlx_sqlite_observer::TableChangeEvent::Lagged(_) => {
340+
panic!("Expected Change event, got Lagged");
341+
}
342+
}
336343
}

crates/sqlx-sqlite-observer/tests/integration_tests.rs

Lines changed: 53 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -391,8 +391,15 @@ async fn test_stream_receives_notifications() {
391391
let result = timeout(Duration::from_millis(100), stream.next()).await;
392392
assert!(result.is_ok(), "Stream receives notification");
393393

394-
let change = result.unwrap().unwrap();
395-
assert_eq!(change.table, "users");
394+
let event = result.unwrap().unwrap();
395+
match event {
396+
sqlx_sqlite_observer::TableChangeEvent::Change(change) => {
397+
assert_eq!(change.table, "users");
398+
}
399+
sqlx_sqlite_observer::TableChangeEvent::Lagged(_) => {
400+
panic!("Expected Change event, got Lagged");
401+
}
402+
}
396403
}
397404

398405
#[tokio::test]
@@ -424,6 +431,50 @@ async fn test_stream_filters_tables() {
424431
assert!(result.is_err(), "Stream filters out non-subscribed tables");
425432
}
426433

434+
#[tokio::test]
435+
async fn test_stream_lag_when_capacity_exceeded() {
436+
let pool = setup_test_db().await;
437+
let config = ObserverConfig::new()
438+
.with_tables(["users"])
439+
.with_channel_capacity(2);
440+
441+
let observer = SqliteObserver::new(pool, config);
442+
443+
let mut stream = observer.subscribe_stream(["users"]);
444+
let mut conn = observer.acquire().await.unwrap();
445+
446+
// Insert more rows than the channel capacity in a single transaction
447+
sqlx::query("BEGIN").execute(&mut **conn).await.unwrap();
448+
for i in 0..5 {
449+
sqlx::query("INSERT INTO users (name) VALUES (?)")
450+
.bind(format!("User{}", i))
451+
.execute(&mut **conn)
452+
.await
453+
.unwrap();
454+
}
455+
sqlx::query("COMMIT").execute(&mut **conn).await.unwrap();
456+
457+
let mut saw_lagged = false;
458+
let mut saw_change = false;
459+
460+
// Drain all available events
461+
while let Ok(Some(event)) = timeout(Duration::from_millis(100), stream.next()).await {
462+
match event {
463+
sqlx_sqlite_observer::TableChangeEvent::Lagged(n) => {
464+
assert!(n > 0, "Lagged count should be > 0");
465+
saw_lagged = true;
466+
}
467+
sqlx_sqlite_observer::TableChangeEvent::Change(change) => {
468+
assert_eq!(change.table, "users");
469+
saw_change = true;
470+
}
471+
}
472+
}
473+
474+
assert!(saw_lagged, "Expected at least one Lagged event");
475+
assert!(saw_change, "Expected at least one Change event");
476+
}
477+
427478
// ============================================================================
428479
// Value Capture
429480
// ============================================================================

0 commit comments

Comments
 (0)