Skip to content

Commit 6679954

Browse files
authored
Merge pull request #33 from pmorris-dev/sqlite-observer-phase-2
sub(feat): implement the base API and logic
2 parents 9bcb0c7 + 4623ec4 commit 6679954

11 files changed

Lines changed: 1570 additions & 15 deletions

File tree

Cargo.lock

Lines changed: 80 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/sqlx-sqlite-observer/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ bundled = ["libsqlite3-sys/bundled"]
2020

2121
[dependencies]
2222
tokio = { version = "1.49.0", features = ["sync"] }
23+
tokio-stream = { version = "0.1", features = ["sync"] }
2324
thiserror = "2.0.17"
2425
tracing = { version = "0.1.44", default-features = false, features = ["std", "release_max_level_off"] }
2526
parking_lot = "0.12.3"
@@ -30,3 +31,6 @@ libsqlite3-sys = { version = "0.30.1", features = ["preupdate_hook"] }
3031

3132
[dev-dependencies]
3233
tokio = { version = "1.49.0", features = ["full", "macros"] }
34+
futures = "0.3.31"
35+
tempfile = "3.24.0"
36+
tracing-subscriber = "0.3.22"

crates/sqlx-sqlite-observer/README.md

Lines changed: 99 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -98,14 +98,14 @@ This ensures subscribers **only receive notifications for committed changes**.
9898
* **`ChangeOperation`**: Insert, Update, or Delete
9999
* **`ColumnValue`**: Typed column value (Null, Integer, Real, Text, Blob)
100100
* **`ObserverConfig`**: Configuration for table filtering and channel
101-
capacity <!-- COMING SOON -->
101+
capacity
102102

103-
### Observer Types <!-- COMING SOON -->
103+
### Observer Types
104104

105105
* **`SqliteObserver`**: Main observer for `SqlitePool` connections
106106
* **`ObservableConnection`**: Connection wrapper with hooks registered
107107

108-
### Stream Types <!-- COMING SOON -->
108+
### Stream Types
109109

110110
* **`TableChangeStream`**: Async stream of table changes
111111
* **`TableChangeStreamExt`**: Extension trait for converting receivers to
@@ -177,19 +177,110 @@ meaningful/correct for non-integer or composite primary keys.
177177

178178
## Examples
179179

180-
> **Coming in Phase 2** - Full working examples will be added in a subsequent PR.
181-
182180
### Basic Usage
183181

184-
<!-- TODO: Add basic example showing SqliteObserver usage -->
182+
```rust,no_run
183+
use sqlx::SqlitePool;
184+
use sqlx_sqlite_observer::{SqliteObserver, ObserverConfig, ColumnValue};
185+
186+
#[tokio::main]
187+
async fn main() -> Result<(), Box<dyn std::error::Error>> {
188+
let pool = SqlitePool::connect("sqlite:mydb.db").await?;
189+
let observer = SqliteObserver::new(pool, ObserverConfig::default());
190+
191+
// Subscribe to changes on specific tables
192+
let mut rx = observer.subscribe(["users"]);
193+
194+
// Spawn a task to handle notifications
195+
tokio::spawn(async move {
196+
while let Ok(change) = rx.recv().await {
197+
println!(
198+
"Table {} row {} was {:?}",
199+
change.table,
200+
change.rowid.unwrap_or(-1),
201+
change.operation
202+
);
203+
if let Some(ColumnValue::Integer(id)) = change.primary_key.first() {
204+
println!(" PK: {}", id);
205+
}
206+
}
207+
});
208+
209+
// Use the observer to execute queries
210+
let mut conn = observer.acquire().await?;
211+
sqlx::query("INSERT INTO users (name) VALUES (?)")
212+
.bind("Alice")
213+
.execute(&mut **conn)
214+
.await?;
215+
216+
Ok(())
217+
}
218+
```
185219

186220
### Stream API
187221

188-
<!-- TODO: Add stream example showing TableChangeStream usage -->
222+
```rust,no_run
223+
use futures::StreamExt;
224+
use sqlx::SqlitePool;
225+
use sqlx_sqlite_observer::{SqliteObserver, ObserverConfig};
226+
227+
#[tokio::main]
228+
async fn main() -> Result<(), Box<dyn std::error::Error>> {
229+
let pool = SqlitePool::connect("sqlite:mydb.db").await?;
230+
let config = ObserverConfig::new().with_tables(["users", "posts"]);
231+
let observer = SqliteObserver::new(pool, config);
232+
233+
let mut stream = observer.subscribe_stream(["users"]);
234+
235+
while let Some(change) = stream.next().await {
236+
println!(
237+
"Table {} row {} was {:?}",
238+
change.table,
239+
change.rowid.unwrap_or(-1),
240+
change.operation
241+
);
242+
}
243+
244+
Ok(())
245+
}
246+
```
189247

190248
### Value Capture
191249

192-
<!-- TODO: Add example showing old/new column value access -->
250+
```rust,no_run
251+
use sqlx::SqlitePool;
252+
use sqlx_sqlite_observer::{SqliteObserver, ObserverConfig, ColumnValue};
253+
254+
#[tokio::main]
255+
async fn main() -> Result<(), Box<dyn std::error::Error>> {
256+
let pool = SqlitePool::connect("sqlite:mydb.db").await?;
257+
let config = ObserverConfig::new().with_tables(["users"]);
258+
let observer = SqliteObserver::new(pool, config);
259+
260+
let mut rx = observer.subscribe(["users"]);
261+
let change = rx.recv().await?;
262+
263+
// Access old/new column values
264+
if let Some(old) = &change.old_values {
265+
println!("Old values: {:?}", old);
266+
}
267+
if let Some(new) = &change.new_values {
268+
println!("New values: {:?}", new);
269+
}
270+
271+
// Disable value capture for lower memory usage
272+
let config = ObserverConfig::new()
273+
.with_tables(["users"])
274+
.with_capture_values(false);
275+
let observer = SqliteObserver::new(
276+
SqlitePool::connect("sqlite:mydb.db").await?,
277+
config,
278+
);
279+
// old_values and new_values will be None
280+
281+
Ok(())
282+
}
283+
```
193284

194285
### SQLx SQLite Connection Manager Integration
195286

@@ -204,8 +295,6 @@ buffered. All changes in a transaction are delivered at once on commit. If your
204295
transaction contains more mutating statements than this capacity, **messages
205296
will be dropped**.
206297

207-
<!-- COMING SOON -->
208-
209298
```rust
210299
let config = ObserverConfig::new()
211300
.with_tables(["users", "posts"])
@@ -217,8 +306,6 @@ let config = ObserverConfig::new()
217306
By default, `TableChange` includes `old_values` and `new_values` with the actual
218307
column data. Disable this for lower memory usage if you only need row IDs:
219308

220-
<!-- COMING SOON -->
221-
222309
```rust
223310
let config = ObserverConfig::new()
224311
.with_tables(["users"])

crates/sqlx-sqlite-observer/src/broker.rs

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,40 @@ impl ObservationBroker {
9191
self.table_info.write().insert(table.to_string(), info);
9292
}
9393

94+
/// Registers multiple tables for observation without schema info.
95+
///
96+
/// This is a two-phase registration: tables are marked for observation immediately,
97+
/// but primary key extraction will return empty `Vec` until [`set_table_info`] is
98+
/// called for each table. This is useful when you want to register tables before
99+
/// their schema is known (e.g., before the first connection is acquired).
100+
///
101+
/// **Prefer [`observe_table`] when schema info is available**, as it atomically
102+
/// registers the table and sets schema info in one call.
103+
///
104+
/// [`set_table_info`]: Self::set_table_info
105+
/// [`observe_table`]: Self::observe_table
106+
pub fn observe_tables<I, S>(&self, tables: I)
107+
where
108+
I: IntoIterator<Item = S>,
109+
S: AsRef<str>,
110+
{
111+
let mut observed = self.observed_tables.write();
112+
for table in tables {
113+
let table_name = table.as_ref().to_string();
114+
trace!(table = %table_name, "Observing table");
115+
observed.insert(table_name);
116+
}
117+
}
118+
119+
/// Sets the schema information for an observed table.
120+
///
121+
/// This information is used to extract primary key values and determine
122+
/// whether the rowid is meaningful for the table.
123+
pub fn set_table_info(&self, table: &str, info: TableInfo) {
124+
trace!(table = %table, pk_columns = ?info.pk_columns, without_rowid = info.without_rowid, "Setting table info");
125+
self.table_info.write().insert(table.to_string(), info);
126+
}
127+
94128
/// Gets the schema information for an observed table.
95129
pub fn get_table_info(&self, table: &str) -> Option<TableInfo> {
96130
self.table_info.read().get(table).cloned()

0 commit comments

Comments
 (0)