Skip to content

Commit 48767ec

Browse files
pmorris-devclaude
andcommitted
feat: add limits on observed table count and subscriptions
observe() and subscribe() accepted unbounded input that could exhaust memory and spawn unlimited tasks. Add validation: observe() rejects empty or >100 table lists, subscribe() rejects >100 subscriptions per database. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 155f1d9 commit 48767ec

3 files changed

Lines changed: 28 additions & 2 deletions

File tree

src/commands.rs

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -644,6 +644,16 @@ pub async fn observe(
644644
tables: Vec<String>,
645645
config: Option<ObserverConfigParams>,
646646
) -> Result<()> {
647+
const MAX_OBSERVED_TABLES: usize = 100;
648+
const MAX_CHANNEL_CAPACITY: usize = 10_000;
649+
650+
if tables.is_empty() || tables.len() > MAX_OBSERVED_TABLES {
651+
return Err(Error::InvalidConfig(format!(
652+
"tables count must be between 1 and {MAX_OBSERVED_TABLES}, got {}",
653+
tables.len()
654+
)));
655+
}
656+
647657
// Abort plugin-level subscription tasks before the crate-level
648658
// enable_observation() drops the old broker
649659
active_subs.remove_for_db(&db).await;
@@ -654,8 +664,6 @@ pub async fn observe(
654664
.get_mut(&db)
655665
.ok_or_else(|| Error::DatabaseNotLoaded(db.clone()))?;
656666

657-
const MAX_CHANNEL_CAPACITY: usize = 10_000;
658-
659667
let mut observer_config = sqlx_sqlite_observer::ObserverConfig::new().with_tables(tables);
660668

661669
if let Some(params) = config {
@@ -690,6 +698,13 @@ pub async fn subscribe(
690698
tables: Vec<String>,
691699
on_event: Channel<TableChangePayload>,
692700
) -> Result<String> {
701+
const MAX_SUBSCRIPTIONS_PER_DATABASE: usize = 100;
702+
703+
let sub_count = active_subs.count_for_db(&db).await;
704+
if sub_count >= MAX_SUBSCRIPTIONS_PER_DATABASE {
705+
return Err(Error::TooManySubscriptions(MAX_SUBSCRIPTIONS_PER_DATABASE));
706+
}
707+
693708
let instances = db_instances.inner.read().await;
694709

695710
let wrapper = instances

src/error.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,10 @@ pub enum Error {
4343
#[error("cannot load more than {0} databases")]
4444
TooManyDatabases(usize),
4545

46+
/// Too many subscriptions for a single database.
47+
#[error("cannot create more than {0} subscriptions per database")]
48+
TooManySubscriptions(usize),
49+
4650
/// Invalid configuration parameter.
4751
#[error("invalid configuration: {0}")]
4852
InvalidConfig(String),
@@ -83,6 +87,7 @@ impl Error {
8387
Error::DatabaseNotLoaded(_) => "DATABASE_NOT_LOADED".to_string(),
8488
Error::ObservationNotEnabled(_) => "OBSERVATION_NOT_ENABLED".to_string(),
8589
Error::TooManyDatabases(_) => "TOO_MANY_DATABASES".to_string(),
90+
Error::TooManySubscriptions(_) => "TOO_MANY_SUBSCRIPTIONS".to_string(),
8691
Error::InvalidConfig(_) => "INVALID_CONFIG".to_string(),
8792
Error::Other(_) => "ERROR".to_string(),
8893
}

src/subscriptions.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,12 @@ impl ActiveSubscriptions {
161161
}
162162
}
163163

164+
/// Count active subscriptions for a specific database.
165+
pub async fn count_for_db(&self, db_path: &str) -> usize {
166+
let subs = self.0.read().await;
167+
subs.values().filter(|sub| sub.db_path == db_path).count()
168+
}
169+
164170
/// Abort all subscriptions (for cleanup on app exit).
165171
pub async fn abort_all(&self) {
166172
let mut subs = self.0.write().await;

0 commit comments

Comments
 (0)