Skip to content

Commit d4069f3

Browse files
committed
feat: add integration with conn-mgr crate
- we wrapped SqliteDatabase and WriteGuard and, otherwise implemented seamless interop between observer and conn-mgr
1 parent 6679954 commit d4069f3

6 files changed

Lines changed: 699 additions & 1 deletion

File tree

Cargo.lock

Lines changed: 15 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/sqlx-sqlite-observer/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ keywords = ["sqlite", "sqlx", "reactive", "observer", "database"]
1313
categories = ["database", "asynchronous"]
1414

1515
[features]
16+
conn-mgr = ["dep:sqlx-sqlite-conn-mgr"]
1617
# Bundle SQLite by default - preupdate hooks require SQLITE_ENABLE_PREUPDATE_HOOK
1718
# which most system SQLite libraries don't have enabled.
1819
default = ["bundled"]
@@ -28,6 +29,7 @@ regex = "1.12.3"
2829
sqlx = { version = "0.8.6", features = ["sqlite", "runtime-tokio"], default-features = false }
2930
# Required for preupdate_hook - SQLite must be compiled with SQLITE_ENABLE_PREUPDATE_HOOK
3031
libsqlite3-sys = { version = "0.30.1", features = ["preupdate_hook"] }
32+
sqlx-sqlite-conn-mgr = { version = "0.8.6", optional = true }
3133

3234
[dev-dependencies]
3335
tokio = { version = "1.49.0", features = ["full", "macros"] }
Lines changed: 335 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,335 @@
1+
//! Integration with sqlx-sqlite-conn-mgr crate.
2+
//!
3+
//! This module provides observation capabilities for databases managed by
4+
//! `sqlx-sqlite-conn-mgr`. Enable with the `conn-mgr` feature.
5+
//!
6+
//! Uses SQLite's native hooks for transaction-safe change tracking. Changes
7+
//! are buffered during transactions and only published after commit.
8+
//!
9+
//! # Example
10+
//!
11+
//! ```no_run
12+
//! use std::sync::Arc;
13+
//! use sqlx_sqlite_conn_mgr::SqliteDatabase;
14+
//! use sqlx_sqlite_observer::{ObservableSqliteDatabase, ObserverConfig};
15+
//!
16+
//! #[tokio::main]
17+
//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
18+
//! let db = SqliteDatabase::connect("mydb.db", None).await?;
19+
//! let config = ObserverConfig::new().with_tables(["users", "posts"]);
20+
//! let observable = ObservableSqliteDatabase::new(db, config);
21+
//!
22+
//! let mut rx = observable.subscribe(["users"]);
23+
//!
24+
//! // Use observable writer for tracked changes
25+
//! let mut writer = observable.acquire_writer().await?;
26+
//! sqlx::query("BEGIN").execute(&mut *writer).await?;
27+
//! sqlx::query("INSERT INTO users (name) VALUES (?)")
28+
//! .bind("Alice")
29+
//! .execute(&mut *writer)
30+
//! .await?;
31+
//!
32+
//! sqlx::query("COMMIT").execute(&mut *writer).await?;
33+
//! // Changes publish on commit!
34+
//!
35+
//! // Read pool works as normal (no observation needed for reads)
36+
//! let rows = sqlx::query("SELECT * FROM users")
37+
//! .fetch_all(observable.read_pool()?)
38+
//! .await?;
39+
//!
40+
//! Ok(())
41+
//! }
42+
//! ```
43+
44+
use std::ops::{Deref, DerefMut};
45+
use std::sync::Arc;
46+
47+
use libsqlite3_sys::sqlite3;
48+
use sqlx::sqlite::SqliteConnection;
49+
use sqlx::{Pool, Sqlite};
50+
use sqlx_sqlite_conn_mgr::{SqliteDatabase, WriteGuard};
51+
use tokio::sync::broadcast;
52+
use tracing::{debug, trace, warn};
53+
54+
use crate::Result;
55+
use crate::broker::ObservationBroker;
56+
use crate::change::TableChange;
57+
use crate::config::ObserverConfig;
58+
use crate::hooks;
59+
use crate::schema::query_table_info;
60+
use crate::stream::TableChangeStream;
61+
62+
/// Wrapper around `SqliteDatabase` that provides change observation.
63+
///
64+
/// This type integrates with `sqlx-sqlite-conn-mgr` to observe changes made
65+
/// through the write connection while leaving read operations unaffected.
66+
/// Uses SQLite's native hooks for transaction-safe notifications.
67+
pub struct ObservableSqliteDatabase {
68+
db: Arc<SqliteDatabase>,
69+
broker: Arc<ObservationBroker>,
70+
}
71+
72+
impl ObservableSqliteDatabase {
73+
/// Create a new observable database wrapper.
74+
///
75+
/// # Arguments
76+
///
77+
/// * `db` - The `SqliteDatabase` instance to observe
78+
/// * `config` - Observer configuration specifying which tables to track
79+
pub fn new(db: Arc<SqliteDatabase>, config: ObserverConfig) -> Self {
80+
let broker = ObservationBroker::new(config.channel_capacity, config.capture_values);
81+
82+
if !config.tables.is_empty() {
83+
broker.observe_tables(config.tables.iter().map(String::as_str));
84+
}
85+
86+
Self { db, broker }
87+
}
88+
89+
/// Subscribe to change notifications.
90+
///
91+
/// Returns a broadcast receiver that will receive `TableChange` events
92+
/// when observable tables are modified and transactions commit.
93+
pub fn subscribe<I, S>(&self, tables: I) -> broadcast::Receiver<TableChange>
94+
where
95+
I: IntoIterator<Item = S>,
96+
S: Into<String>,
97+
{
98+
let tables: Vec<String> = tables.into_iter().map(Into::into).collect();
99+
if !tables.is_empty() {
100+
self
101+
.broker
102+
.observe_tables(tables.iter().map(String::as_str));
103+
}
104+
self.broker.subscribe()
105+
}
106+
107+
/// Subscribe and get a `Stream` for easier async iteration.
108+
pub fn subscribe_stream<I, S>(&self, tables: I) -> TableChangeStream
109+
where
110+
I: IntoIterator<Item = S>,
111+
S: Into<String>,
112+
{
113+
use crate::stream::TableChangeStreamExt;
114+
let tables: Vec<String> = tables.into_iter().map(Into::into).collect();
115+
// Register tables for observation (uses references, avoids clone)
116+
if !tables.is_empty() {
117+
self
118+
.broker
119+
.observe_tables(tables.iter().map(String::as_str));
120+
}
121+
let rx = self.broker.subscribe();
122+
let stream = rx.into_stream();
123+
if tables.is_empty() {
124+
stream
125+
} else {
126+
stream.filter_tables(tables)
127+
}
128+
}
129+
130+
/// Get a reference to the read-only connection pool.
131+
///
132+
/// Read operations don't need observation since they don't modify data.
133+
/// However, this pool is also used internally to query table schema
134+
/// information (primary key columns, WITHOUT ROWID status) when tables
135+
/// are first observed.
136+
pub fn read_pool(&self) -> sqlx_sqlite_conn_mgr::Result<&Pool<Sqlite>> {
137+
self.db.read_pool()
138+
}
139+
140+
/// Acquire an observable write guard.
141+
///
142+
/// The returned `ObservableWriteGuard` has observation hooks registered.
143+
/// Changes are published to subscribers when transactions commit.
144+
///
145+
/// On first acquisition for each table, queries the schema to determine
146+
/// primary key columns and WITHOUT ROWID status.
147+
pub async fn acquire_writer(&self) -> Result<ObservableWriteGuard> {
148+
let writer = self
149+
.db
150+
.acquire_writer()
151+
.await
152+
.map_err(crate::error::Error::ConnMgr)?;
153+
154+
let mut observable = ObservableWriteGuard {
155+
writer: Some(writer),
156+
hooks_registered: false,
157+
raw_db: None,
158+
};
159+
160+
// Query table info for any observed tables that don't have it yet
161+
self.ensure_table_info().await?;
162+
163+
observable.register_hooks(Arc::clone(&self.broker)).await?;
164+
Ok(observable)
165+
}
166+
167+
/// Ensures TableInfo is set for all observed tables.
168+
///
169+
/// Uses the read pool to query schema information, respecting conn-mgr's
170+
/// requirement that all connections be acquired through it.
171+
async fn ensure_table_info(&self) -> Result<()> {
172+
let observed = self.broker.get_observed_tables();
173+
174+
// Collect tables that need schema info
175+
let tables_to_query: Vec<String> = observed
176+
.into_iter()
177+
.filter(|table| self.broker.get_table_info(table).is_none())
178+
.collect();
179+
180+
if tables_to_query.is_empty() {
181+
return Ok(());
182+
}
183+
184+
// Use read pool to query schema
185+
let pool = self.db.read_pool().map_err(crate::error::Error::ConnMgr)?;
186+
let mut conn = pool.acquire().await.map_err(crate::error::Error::Sqlx)?;
187+
188+
for table in tables_to_query {
189+
match query_table_info(&mut conn, &table).await {
190+
Ok(Some(info)) => {
191+
debug!(table = %table, pk_columns = ?info.pk_columns, without_rowid = info.without_rowid, "Queried table info");
192+
self.broker.set_table_info(&table, info);
193+
}
194+
Ok(None) => {
195+
warn!(table = %table, "Table not found in schema");
196+
}
197+
Err(e) => {
198+
warn!(table = %table, error = %e, "Failed to query table info");
199+
}
200+
}
201+
}
202+
203+
Ok(())
204+
}
205+
206+
/// Get the underlying `SqliteDatabase`.
207+
pub fn inner(&self) -> &Arc<SqliteDatabase> {
208+
&self.db
209+
}
210+
211+
/// Get the list of currently observed tables.
212+
pub fn observed_tables(&self) -> Vec<String> {
213+
self.broker.get_observed_tables()
214+
}
215+
216+
/// Returns a reference to the underlying observation broker.
217+
pub fn broker(&self) -> &Arc<ObservationBroker> {
218+
&self.broker
219+
}
220+
}
221+
222+
impl Clone for ObservableSqliteDatabase {
223+
fn clone(&self) -> Self {
224+
Self {
225+
db: Arc::clone(&self.db),
226+
broker: Arc::clone(&self.broker),
227+
}
228+
}
229+
}
230+
231+
/// RAII guard for observable write access to the database.
232+
///
233+
/// This guard wraps a `WriteGuard` from `sqlx-sqlite-conn-mgr` and adds
234+
/// change tracking via SQLite hooks. Changes are published to subscribers
235+
/// when transactions commit.
236+
#[must_use = "if unused, the write lock is immediately released"]
237+
pub struct ObservableWriteGuard {
238+
writer: Option<WriteGuard>,
239+
hooks_registered: bool,
240+
/// Raw sqlite3 pointer, cached during register_hooks so we can
241+
/// call unregister_hooks synchronously in Drop without needing
242+
/// the async lock_handle.
243+
raw_db: Option<*mut sqlite3>,
244+
}
245+
246+
// SAFETY: The raw_db pointer is only used for hook registration/unregistration
247+
// and is always accessed from the same logical owner. The underlying sqlite3
248+
// connection is already Send via sqlx's PoolConnection.
249+
unsafe impl Send for ObservableWriteGuard {}
250+
251+
impl ObservableWriteGuard {
252+
fn writer_mut(&mut self) -> &mut WriteGuard {
253+
self.writer.as_mut().expect("writer already taken")
254+
}
255+
256+
/// Registers SQLite observation hooks on this writer.
257+
async fn register_hooks(&mut self, broker: Arc<ObservationBroker>) -> Result<()> {
258+
if self.hooks_registered {
259+
return Ok(());
260+
}
261+
262+
debug!("Registering SQLite observation hooks on WriteGuard");
263+
264+
let writer = self.writer.as_mut().expect("writer already taken");
265+
266+
// Get raw SQLite handle
267+
let mut handle = writer
268+
.lock_handle()
269+
.await
270+
.map_err(|e| crate::Error::Database(format!("Failed to lock connection handle: {}", e)))?;
271+
272+
let db: *mut sqlite3 = handle.as_raw_handle().as_ptr();
273+
274+
unsafe {
275+
hooks::register_hooks(db, broker)?;
276+
}
277+
278+
// Cache the raw pointer so Drop can call unregister_hooks synchronously.
279+
// SAFETY: The pointer remains valid for the lifetime of the WriteGuard,
280+
// which we own via self.writer.
281+
self.raw_db = Some(db);
282+
self.hooks_registered = true;
283+
Ok(())
284+
}
285+
286+
/// Consumes this wrapper and returns the underlying write guard.
287+
///
288+
/// Hooks are unregistered before returning the guard, so it can be
289+
/// safely used without observation.
290+
pub fn into_inner(mut self) -> WriteGuard {
291+
// Unregister hooks before returning the writer to prevent
292+
// use-after-free if the broker is dropped before the connection is reused.
293+
if self.hooks_registered
294+
&& let Some(db) = self.raw_db
295+
{
296+
unsafe {
297+
crate::hooks::unregister_hooks(db);
298+
}
299+
trace!("Hooks unregistered before returning inner WriteGuard");
300+
}
301+
self.hooks_registered = false;
302+
self.raw_db = None;
303+
self.writer.take().expect("writer already taken")
304+
}
305+
}
306+
307+
impl Drop for ObservableWriteGuard {
308+
fn drop(&mut self) {
309+
if self.hooks_registered
310+
&& let Some(db) = self.raw_db
311+
{
312+
// SAFETY: db was obtained from lock_handle during register_hooks and
313+
// remains valid because we still own the WriteGuard (self.writer).
314+
// The writer has not been taken (into_inner clears hooks_registered).
315+
unsafe {
316+
hooks::unregister_hooks(db);
317+
}
318+
trace!("ObservableWriteGuard dropped, hooks unregistered");
319+
}
320+
}
321+
}
322+
323+
impl Deref for ObservableWriteGuard {
324+
type Target = SqliteConnection;
325+
326+
fn deref(&self) -> &Self::Target {
327+
self.writer.as_ref().expect("writer already taken")
328+
}
329+
}
330+
331+
impl DerefMut for ObservableWriteGuard {
332+
fn deref_mut(&mut self) -> &mut Self::Target {
333+
self.writer_mut()
334+
}
335+
}

0 commit comments

Comments
 (0)