Skip to content

Commit 9a69a4c

Browse files
committed
feat: stream request body for chunked transfers
Only stream when Transfer-Encoding: chunked is present, otherwise collect the full body as before (no behavior change for normal requests). Guards against abuse: - 5s idle timeout between chunks - 1MB max body size - Pump task aborted when response is sent
1 parent 8ef0e7a commit 9a69a4c

3 files changed

Lines changed: 69 additions & 23 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "openworkers-runner"
3-
version = "0.14.1"
3+
version = "0.14.2"
44
edition = "2024"
55
license = "MIT"
66
default-run = "openworkers-runner"

bin/main.rs

Lines changed: 67 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -462,34 +462,75 @@ async fn handle_worker_request(
462462

463463
let start = tokio::time::Instant::now();
464464

465-
// Convert to our HttpRequest with streaming body (zero-copy)
466-
let (mut request, body_tx) =
467-
HttpRequest::from_hyper_parts_streaming(&method, &uri, &headers, "http", 16);
465+
let is_chunked = headers
466+
.get("transfer-encoding")
467+
.and_then(|v| v.to_str().ok())
468+
.is_some_and(|v| v.eq_ignore_ascii_case("chunked"));
468469

469-
// Spawn task to pump body chunks from hyper into the request stream
470-
tokio::spawn(async move {
471-
use http_body_util::BodyExt;
470+
let mut pump_handle: Option<tokio::task::JoinHandle<()>> = None;
471+
472+
let mut request = if is_chunked {
473+
// Streaming path: pipe body chunks to the worker via mpsc channel.
474+
// Guards: 30s idle timeout per chunk, 10MB max total body size.
475+
const CHUNK_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(5);
476+
const MAX_BODY_SIZE: usize = 1024 * 1024;
477+
478+
let (req_obj, body_tx) =
479+
HttpRequest::from_hyper_parts_streaming(&method, &uri, &headers, "http", 16);
472480

473-
let mut body = req.into_body();
481+
pump_handle = Some(tokio::spawn(async move {
482+
use http_body_util::BodyExt;
474483

475-
while let Some(result) = body.frame().await {
476-
match result {
477-
Ok(frame) => {
478-
if let Some(data) = frame.data_ref()
479-
&& body_tx.send(Ok(data.clone())).await.is_err()
480-
{
484+
let mut body = req.into_body();
485+
let mut total = 0usize;
486+
487+
loop {
488+
let frame = match tokio::time::timeout(CHUNK_TIMEOUT, body.frame()).await {
489+
Ok(Some(Ok(frame))) => frame,
490+
Ok(Some(Err(e))) => {
491+
let _ = body_tx
492+
.send(Err::<bytes::Bytes, String>(e.to_string()))
493+
.await;
481494
break;
482495
}
483-
}
484-
Err(e) => {
485-
let _ = body_tx
486-
.send(Err::<bytes::Bytes, String>(e.to_string()))
487-
.await;
488-
break;
496+
Ok(None) => break, // body complete
497+
Err(_) => break, // timeout — stop pumping
498+
};
499+
500+
if let Some(data) = frame.data_ref() {
501+
total += data.len();
502+
503+
if total > MAX_BODY_SIZE {
504+
let _ = body_tx
505+
.send(Err::<bytes::Bytes, String>(
506+
"Request body too large".to_string(),
507+
))
508+
.await;
509+
break;
510+
}
511+
512+
if body_tx.send(Ok(data.clone())).await.is_err() {
513+
break; // receiver dropped
514+
}
489515
}
490516
}
491-
}
492-
});
517+
}));
518+
519+
req_obj
520+
} else {
521+
// Buffered path: collect entire body before processing.
522+
use http_body_util::BodyExt;
523+
524+
let body_bytes = match req.into_body().collect().await {
525+
Ok(collected) => collected.to_bytes(),
526+
Err(e) => {
527+
error!("Failed to read request body: {}", e);
528+
return Ok(error_response(400, "Failed to read request body"));
529+
}
530+
};
531+
532+
HttpRequest::from_hyper_parts(&method, &uri, &headers, body_bytes, "http")
533+
};
493534

494535
// Add worker headers if not present
495536
if !request.headers.contains_key("x-worker-id") {
@@ -597,6 +638,11 @@ async fn handle_worker_request(
597638
}
598639
};
599640

641+
// Abort body pump if still running (free the connection)
642+
if let Some(handle) = pump_handle {
643+
handle.abort();
644+
}
645+
600646
debug!("handle_request done in {}ms", start.elapsed().as_millis());
601647

602648
// Record response status code and metrics

0 commit comments

Comments
 (0)