Skip to content

Commit dd891a3

Browse files
pmorris-devclaude
andcommitted
fix: cap observer channel_capacity to prevent memory exhaustion
observe() passed user-supplied channel_capacity directly to tokio::sync::broadcast::channel() without bounds. A capacity of 0 panics, and a very large value pre-allocates an enormous ring buffer. Add an assertion in ObservationBroker::new() to reject zero capacity at the crate level, preventing a panic regardless of how the crate is used. In the plugin layer, validate that channel_capacity is between 1 and 10,000 before applying it, returning an INVALID_CONFIG error otherwise. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 272bdf6 commit dd891a3

4 files changed

Lines changed: 20 additions & 0 deletions

File tree

crates/sqlx-sqlite-observer/src/broker.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,12 @@ pub struct ObservationBroker {
5959

6060
impl ObservationBroker {
6161
/// Creates a new broker with the specified broadcast channel capacity.
62+
///
63+
/// # Panics
64+
///
65+
/// Panics if `channel_capacity` is 0.
6266
pub fn new(channel_capacity: usize, capture_values: bool) -> Arc<Self> {
67+
assert!(channel_capacity > 0, "channel_capacity must be at least 1");
6368
let (change_tx, _) = broadcast::channel(channel_capacity);
6469
Arc::new(Self {
6570
buffer: Mutex::new(Vec::new()),

crates/sqlx-sqlite-observer/src/config.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,9 @@ impl ObserverConfig {
9090

9191
/// Sets the broadcast channel capacity for change notifications.
9292
///
93+
/// Capacity must be at least 1. A capacity of 0 will cause a panic when the
94+
/// observer is initialized.
95+
///
9396
/// See [`channel_capacity`](Self::channel_capacity) for details on sizing.
9497
pub fn with_channel_capacity(mut self, capacity: usize) -> Self {
9598
self.channel_capacity = capacity;

src/commands.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -647,10 +647,17 @@ pub async fn observe(
647647
.get_mut(&db)
648648
.ok_or_else(|| Error::DatabaseNotLoaded(db.clone()))?;
649649

650+
const MAX_CHANNEL_CAPACITY: usize = 10_000;
651+
650652
let mut observer_config = sqlx_sqlite_observer::ObserverConfig::new().with_tables(tables);
651653

652654
if let Some(params) = config {
653655
if let Some(capacity) = params.channel_capacity {
656+
if capacity == 0 || capacity > MAX_CHANNEL_CAPACITY {
657+
return Err(Error::InvalidConfig(format!(
658+
"channel_capacity must be between 1 and {MAX_CHANNEL_CAPACITY}, got {capacity}"
659+
)));
660+
}
654661
observer_config = observer_config.with_channel_capacity(capacity);
655662
}
656663
if let Some(capture) = params.capture_values {

src/error.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,10 @@ pub enum Error {
3939
#[error("observation not enabled for database: {0}")]
4040
ObservationNotEnabled(String),
4141

42+
/// Invalid configuration parameter.
43+
#[error("invalid configuration: {0}")]
44+
InvalidConfig(String),
45+
4246
/// Generic error for operations that don't fit other categories.
4347
#[error("{0}")]
4448
Other(String),
@@ -74,6 +78,7 @@ impl Error {
7478
Error::PathTraversal(_) => "PATH_TRAVERSAL".to_string(),
7579
Error::DatabaseNotLoaded(_) => "DATABASE_NOT_LOADED".to_string(),
7680
Error::ObservationNotEnabled(_) => "OBSERVATION_NOT_ENABLED".to_string(),
81+
Error::InvalidConfig(_) => "INVALID_CONFIG".to_string(),
7782
Error::Other(_) => "ERROR".to_string(),
7883
}
7984
}

0 commit comments

Comments
 (0)