Skip to content

Commit 2fb37b9

Browse files
authored
Merge pull request #34 from pmorris-dev/sqlx-sqlite-observer-final-phase
Sqlx sqlite observer final phase
2 parents 6679954 + a149af9 commit 2fb37b9

10 files changed

Lines changed: 923 additions & 40 deletions

File tree

Cargo.lock

Lines changed: 15 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/sqlx-sqlite-observer/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ keywords = ["sqlite", "sqlx", "reactive", "observer", "database"]
1313
categories = ["database", "asynchronous"]
1414

1515
[features]
16+
conn-mgr = ["dep:sqlx-sqlite-conn-mgr"]
1617
# Bundle SQLite by default - preupdate hooks require SQLITE_ENABLE_PREUPDATE_HOOK
1718
# which most system SQLite libraries don't have enabled.
1819
default = ["bundled"]
@@ -28,6 +29,7 @@ regex = "1.12.3"
2829
sqlx = { version = "0.8.6", features = ["sqlite", "runtime-tokio"], default-features = false }
2930
# Required for preupdate_hook - SQLite must be compiled with SQLITE_ENABLE_PREUPDATE_HOOK
3031
libsqlite3-sys = { version = "0.30.1", features = ["preupdate_hook"] }
32+
sqlx-sqlite-conn-mgr = { version = "0.8.6", optional = true }
3133

3234
[dev-dependencies]
3335
tokio = { version = "1.49.0", features = ["full", "macros"] }

crates/sqlx-sqlite-observer/README.md

Lines changed: 106 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,8 @@ This ensures subscribers **only receive notifications for committed changes**.
9595
### Core Types
9696

9797
* **`TableChange`**: Notification of a change to a database table
98+
* **`TableChangeEvent`**: Event yielded by `TableChangeStream`
99+
either `Change(TableChange)` or `Lagged(u64)`
98100
* **`ChangeOperation`**: Insert, Update, or Delete
99101
* **`ColumnValue`**: Typed column value (Null, Integer, Real, Text, Blob)
100102
* **`ObserverConfig`**: Configuration for table filtering and channel
@@ -111,7 +113,7 @@ This ensures subscribers **only receive notifications for committed changes**.
111113
* **`TableChangeStreamExt`**: Extension trait for converting receivers to
112114
streams
113115

114-
### SQLx SQLite Connection Manager Integration (feature: `conn-mgr`) <!-- COMING SOON -->
116+
### SQLx SQLite Connection Manager Integration (feature: `conn-mgr`)
115117

116118
* **`ObservableSqliteDatabase`**: Wrapper for `SqliteDatabase` with observation
117119
* **`ObservableWriteGuard`**: Write guard with hooks registered
@@ -179,7 +181,7 @@ meaningful/correct for non-integer or composite primary keys.
179181

180182
### Basic Usage
181183

182-
```rust,no_run
184+
```rust
183185
use sqlx::SqlitePool;
184186
use sqlx_sqlite_observer::{SqliteObserver, ObserverConfig, ColumnValue};
185187

@@ -219,10 +221,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
219221

220222
### Stream API
221223

222-
```rust,no_run
224+
```rust
223225
use futures::StreamExt;
224226
use sqlx::SqlitePool;
225-
use sqlx_sqlite_observer::{SqliteObserver, ObserverConfig};
227+
use sqlx_sqlite_observer::{
228+
SqliteObserver, ObserverConfig, TableChangeEvent,
229+
};
226230

227231
#[tokio::main]
228232
async fn main() -> Result<(), Box<dyn std::error::Error>> {
@@ -232,13 +236,20 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
232236

233237
let mut stream = observer.subscribe_stream(["users"]);
234238

235-
while let Some(change) = stream.next().await {
236-
println!(
237-
"Table {} row {} was {:?}",
238-
change.table,
239-
change.rowid.unwrap_or(-1),
240-
change.operation
241-
);
239+
while let Some(event) = stream.next().await {
240+
match event {
241+
TableChangeEvent::Change(change) => {
242+
println!(
243+
"Table {} row {} was {:?}",
244+
change.table,
245+
change.rowid.unwrap_or(-1),
246+
change.operation
247+
);
248+
}
249+
TableChangeEvent::Lagged(n) => {
250+
eprintln!("Missed {} notifications", n);
251+
}
252+
}
242253
}
243254

244255
Ok(())
@@ -247,7 +258,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
247258

248259
### Value Capture
249260

250-
```rust,no_run
261+
```rust
251262
use sqlx::SqlitePool;
252263
use sqlx_sqlite_observer::{SqliteObserver, ObserverConfig, ColumnValue};
253264

@@ -284,7 +295,37 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
284295

285296
### SQLx SQLite Connection Manager Integration
286297

287-
<!-- TODO: Add example showing ObservableSqliteDatabase usage -->
298+
```rust
299+
use std::sync::Arc;
300+
use sqlx_sqlite_conn_mgr::SqliteDatabase;
301+
use sqlx_sqlite_observer::{
302+
ObservableSqliteDatabase, ObserverConfig,
303+
};
304+
305+
#[tokio::main]
306+
async fn main() -> Result<(), Box<dyn std::error::Error>> {
307+
let db = SqliteDatabase::connect("mydb.db", None).await?;
308+
let config = ObserverConfig::new().with_tables(["users"]);
309+
let observable = ObservableSqliteDatabase::new(db, config);
310+
311+
let mut rx = observable.subscribe(["users"]);
312+
313+
// Write through the observable writer
314+
let mut writer = observable.acquire_writer().await?;
315+
sqlx::query("BEGIN").execute(&mut *writer).await?;
316+
sqlx::query("INSERT INTO users (name) VALUES (?)")
317+
.bind("Alice")
318+
.execute(&mut *writer)
319+
.await?;
320+
sqlx::query("COMMIT").execute(&mut *writer).await?;
321+
322+
// Notification arrives after commit
323+
let change = rx.recv().await?;
324+
println!("Changed: {}", change.table);
325+
326+
Ok(())
327+
}
328+
```
288329

289330
## Usage Notes
290331

@@ -301,6 +342,58 @@ let config = ObserverConfig::new()
301342
.with_channel_capacity(1000); // Handle large transactions
302343
```
303344

345+
### Handling Lag
346+
347+
When using the Stream API, the stream yields `TableChangeEvent` values.
348+
Most events are `Change` variants, but if a consumer falls behind, the
349+
stream yields a `Lagged(n)` event indicating how many notifications
350+
were missed.
351+
352+
```rust
353+
use futures::StreamExt;
354+
use sqlx_sqlite_observer::TableChangeEvent;
355+
# use sqlx_sqlite_observer::{SqliteObserver, ObserverConfig};
356+
# async fn example(observer: SqliteObserver) {
357+
358+
let mut stream = observer.subscribe_stream(["users"]);
359+
360+
while let Some(event) = stream.next().await {
361+
match event {
362+
TableChangeEvent::Change(change) => {
363+
// Process the change normally
364+
}
365+
TableChangeEvent::Lagged(n) => {
366+
// n notifications were missed — local state may be stale.
367+
// Re-query the database for current state.
368+
tracing::warn!("Missed {} change notifications", n);
369+
}
370+
}
371+
}
372+
# }
373+
```
374+
375+
**When does lag happen?** The broadcast channel has a fixed capacity
376+
(default 256). Lag occurs when the oldest unread messages are
377+
overwritten. This can happen in two ways:
378+
379+
* A subscriber processes changes slower than they arrive
380+
* A single transaction contains more mutating statements than the
381+
channel capacity, causing messages to be overwritten before the
382+
consumer reads them
383+
384+
This is rare under normal conditions but can occur during bulk
385+
writes or large transactions.
386+
387+
**How to prevent it:**
388+
389+
* Increase `channel_capacity` via `ObserverConfig::with_channel_capacity`
390+
* Process changes faster (avoid blocking in the stream consumer)
391+
* Use a dedicated task for stream consumption
392+
393+
**Note:** The `broadcast::Receiver` API (from `subscribe()`) surfaces
394+
lag as `RecvError::Lagged(n)` — the same information, just through
395+
the raw tokio broadcast channel interface rather than the stream.
396+
304397
### Disabling Value Capture
305398

306399
By default, `TableChange` includes `old_values` and `new_values` with the actual

crates/sqlx-sqlite-observer/src/change.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,30 @@ impl ColumnValue {
100100
}
101101
}
102102

103+
/// Event yielded by [`TableChangeStream`](crate::stream::TableChangeStream).
104+
///
105+
/// Most events are `Change` variants containing the actual table change data.
106+
/// A `Lagged` event indicates the consumer fell behind and missed some
107+
/// notifications — consider increasing
108+
/// [`channel_capacity`](crate::config::ObserverConfig::channel_capacity).
109+
#[derive(Debug, Clone)]
110+
pub enum TableChangeEvent {
111+
/// A table change notification.
112+
Change(TableChange),
113+
/// The stream fell behind and missed `n` change notifications.
114+
///
115+
/// This can happen when:
116+
/// - The consumer is processing changes too slowly relative to the
117+
/// rate of database writes.
118+
/// - A single transaction contains more mutating statements than the
119+
/// channel capacity, causing older messages to be overwritten before
120+
/// the consumer reads them.
121+
///
122+
/// When this happens, the consumer should assume its local state may
123+
/// be stale and re-query the database for the current state.
124+
Lagged(u64),
125+
}
126+
103127
/// Notification of a change to a database table.
104128
///
105129
/// Contains the table name, operation type, affected rowid, and the

0 commit comments

Comments
 (0)