Skip to content

Commit c56155e

Browse files
committed
fix: make binding limiters per-request for warm isolate safety
Move BindingLimiters into RequestState behind Arc so each request gets its own fresh counters. Prevents cross-request limiter interference during interleaved execution on the same thread. Pattern: self.limiters() clones the Arc (cheap), no MutexGuard across await.
1 parent 7da9a7e commit c56155e

1 file changed

Lines changed: 71 additions & 27 deletions

File tree

src/ops.rs

Lines changed: 71 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -119,10 +119,12 @@ pub type DbPool = sqlx::Pool<sqlx::Postgres>;
119119
/// Per-request state that changes between requests for context reuse.
120120
///
121121
/// When warm isolates are enabled, the same `RunnerOperations` handle persists
122-
/// across requests. Only `log_tx` and `span` change per-request.
122+
/// across requests. Each request gets its own limiters (fresh counters) to
123+
/// avoid cross-request interference during interleaved execution.
123124
pub struct RequestState {
124125
pub log_tx: Option<LogTx>,
125126
pub span: tracing::Span,
127+
pub limiters: Arc<BindingLimiters>,
126128
}
127129

128130
/// Runner's implementation of OperationsHandler
@@ -144,8 +146,6 @@ pub struct RunnerOperations {
144146
pub bindings: BindingConfigs,
145147
/// Database pool for KV operations
146148
pub db_pool: Option<DbPool>,
147-
/// Binding limiters (rate limiting for fetch, KV, database, storage)
148-
pub limiters: BindingLimiters,
149149
}
150150

151151
impl RunnerOperations {
@@ -157,19 +157,13 @@ impl RunnerOperations {
157157
request_state: Mutex::new(RequestState {
158158
log_tx: None,
159159
span: tracing::Span::none(),
160+
limiters: Arc::new(BindingLimiters::default()),
160161
}),
161162
bindings: BindingConfigs::new(),
162163
db_pool: None,
163-
limiters: BindingLimiters::default(),
164164
}
165165
}
166166

167-
/// Create with custom limiters based on RuntimeLimits
168-
pub fn with_limiters(mut self, limiters: BindingLimiters) -> Self {
169-
self.limiters = limiters;
170-
self
171-
}
172-
173167
/// Attach tracing span for context propagation
174168
pub fn with_span(self, span: tracing::Span) -> Self {
175169
self.request_state.lock().unwrap().span = span;
@@ -201,20 +195,27 @@ impl RunnerOperations {
201195
self
202196
}
203197

204-
/// Swap per-request state (log handler + tracing span) for warm context reuse.
198+
/// Swap per-request state for warm context reuse.
205199
///
206200
/// Called when an existing ops handle is reused for a new request.
201+
/// Creates fresh limiters so each request has its own counters.
207202
pub fn update_request(&self, log_tx: LogTx, span: tracing::Span) {
208203
let mut state = self.request_state.lock().unwrap();
209204
state.log_tx = Some(log_tx);
210205
state.span = span;
206+
state.limiters = Arc::new(BindingLimiters::default());
211207
}
212208

213209
/// Get a clone of the current span
214210
fn span(&self) -> tracing::Span {
215211
self.request_state.lock().unwrap().span.clone()
216212
}
217213

214+
/// Get the current request's limiters (cheap Arc clone)
215+
fn limiters(&self) -> Arc<BindingLimiters> {
216+
self.request_state.lock().unwrap().limiters.clone()
217+
}
218+
218219
/// Execute a binding fetch with the given config (shared by assets and storage).
219220
fn do_binding_fetch(
220221
&self,
@@ -228,12 +229,9 @@ impl RunnerOperations {
228229
Box::pin(
229230
async move {
230231
// Acquire fetch limiter permit
231-
let _guard = self
232-
.limiters
233-
.fetch
234-
.acquire()
235-
.await
236-
.map_err(|e| e.to_string())?;
232+
let limiters = self.limiters();
233+
234+
let _guard = limiters.fetch.acquire().await.map_err(|e| e.to_string())?;
237235

238236
self.stats.fetch_count.fetch_add(1, Ordering::Relaxed);
239237

@@ -304,12 +302,9 @@ impl OperationsHandler for RunnerOperations {
304302
Box::pin(
305303
async move {
306304
// Acquire fetch limiter permit (blocks if at concurrent limit, errors if total exceeded)
307-
let _guard = self
308-
.limiters
309-
.fetch
310-
.acquire()
311-
.await
312-
.map_err(|e| e.to_string())?;
305+
let limiters = self.limiters();
306+
307+
let _guard = limiters.fetch.acquire().await.map_err(|e| e.to_string())?;
313308

314309
self.stats.fetch_count.fetch_add(1, Ordering::Relaxed);
315310

@@ -401,7 +396,9 @@ impl OperationsHandler for RunnerOperations {
401396
Box::pin(
402397
async move {
403398
// Acquire storage limiter permit
404-
if let Err(e) = self.limiters.storage.acquire().await {
399+
let limiters = self.limiters();
400+
401+
if let Err(e) = limiters.storage.acquire().await {
405402
return StorageResult::Error(e.to_string());
406403
}
407404

@@ -472,7 +469,9 @@ impl OperationsHandler for RunnerOperations {
472469
Box::pin(
473470
async move {
474471
// Acquire KV limiter permit
475-
if let Err(e) = self.limiters.kv.acquire().await {
472+
let limiters = self.limiters();
473+
474+
if let Err(e) = limiters.kv.acquire().await {
476475
return KvResult::Error(e.to_string());
477476
}
478477

@@ -525,14 +524,16 @@ impl OperationsHandler for RunnerOperations {
525524
Box::pin(
526525
async move {
527526
// Acquire database limiter permit - hold guard until query completes
528-
let _guard = match self.limiters.database.acquire().await {
527+
let limiters = self.limiters();
528+
529+
let _guard = match limiters.database.acquire().await {
529530
Ok(guard) => guard,
530531
Err(e) => return DatabaseResult::Error(e.to_string()),
531532
};
532533

533534
tracing::debug!(
534535
"[ops] database limiter acquired, available permits: {}",
535-
self.limiters.database.available_permits()
536+
limiters.database.available_permits()
536537
);
537538

538539
match op {
@@ -696,6 +697,7 @@ impl OperationsHandler for RunnerOperations {
696697
#[cfg(test)]
697698
mod tests {
698699
use super::*;
700+
use futures::FutureExt;
699701

700702
#[test]
701703
fn test_runner_operations_builder() {
@@ -732,4 +734,46 @@ mod tests {
732734
assert_eq!(ops.stats.fetch_bytes_in.load(Ordering::Relaxed), 0);
733735
assert_eq!(ops.stats.fetch_bytes_out.load(Ordering::Relaxed), 0);
734736
}
737+
738+
/// Verify that update_request gives each request fresh limiters.
739+
///
740+
/// Simulates warm reuse: request A increments a limiter, then
741+
/// update_request is called for request B. B must have its own
742+
/// fresh counters, and A's counters must be unaffected.
743+
#[test]
744+
fn test_update_request_creates_fresh_limiters() {
745+
let ops = RunnerOperations::new();
746+
let (tx_a, _rx_a) = std::sync::mpsc::channel::<LogEvent>();
747+
let (tx_b, _rx_b) = std::sync::mpsc::channel::<LogEvent>();
748+
749+
// Request A grabs its limiters and simulates some usage
750+
let limiters_a = ops.limiters();
751+
limiters_a.fetch.reset(); // ensure clean
752+
assert_eq!(limiters_a.fetch.count(), 0);
753+
754+
// Simulate 5 fetch calls by request A
755+
for _ in 0..5 {
756+
limiters_a.fetch.acquire().now_or_never().unwrap().unwrap();
757+
}
758+
759+
assert_eq!(limiters_a.fetch.count(), 5);
760+
761+
// Warm reuse: new request B arrives
762+
ops.update_request(tx_b, tracing::Span::none());
763+
let limiters_b = ops.limiters();
764+
765+
// B must have fresh counters
766+
assert_eq!(
767+
limiters_b.fetch.count(),
768+
0,
769+
"request B should have fresh limiters"
770+
);
771+
772+
// A's counters must be untouched (different Arc)
773+
assert_eq!(
774+
limiters_a.fetch.count(),
775+
5,
776+
"request A's limiters should be unaffected"
777+
);
778+
}
735779
}

0 commit comments

Comments
 (0)