Skip to content

Commit 8ef0e7a

Browse files
committed
feat: stream request body to worker instead of buffering
Use from_hyper_parts_streaming() to pipe incoming HTTP body chunks directly to the JavaScript ReadableStream via an mpsc channel, instead of collecting the entire body in memory with req.collect().await. This enables true streaming uploads where the worker can process chunks as they arrive.
1 parent dce64fd commit 8ef0e7a

3 files changed

Lines changed: 29 additions & 13 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "openworkers-runner"
3-
version = "0.14.0"
3+
version = "0.14.1"
44
edition = "2024"
55
license = "MIT"
66
default-run = "openworkers-runner"

bin/main.rs

Lines changed: 27 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
use bytes::Bytes;
2-
use http_body_util::BodyExt;
32
use hyper::server::conn::http1;
43
use hyper::service::service_fn;
54
use hyper::{Request, Response};
@@ -463,17 +462,34 @@ async fn handle_worker_request(
463462

464463
let start = tokio::time::Instant::now();
465464

466-
// Collect request body (consumes req)
467-
let body_bytes = match req.collect().await {
468-
Ok(collected) => collected.to_bytes(),
469-
Err(e) => {
470-
error!("Failed to read request body: {}", e);
471-
return Ok(error_response(400, "Failed to read request body"));
472-
}
473-
};
465+
// Convert to our HttpRequest with streaming body (zero-copy)
466+
let (mut request, body_tx) =
467+
HttpRequest::from_hyper_parts_streaming(&method, &uri, &headers, "http", 16);
468+
469+
// Spawn task to pump body chunks from hyper into the request stream
470+
tokio::spawn(async move {
471+
use http_body_util::BodyExt;
474472

475-
// Convert to our HttpRequest using the extracted parts
476-
let mut request = HttpRequest::from_hyper_parts(&method, &uri, &headers, body_bytes, "http");
473+
let mut body = req.into_body();
474+
475+
while let Some(result) = body.frame().await {
476+
match result {
477+
Ok(frame) => {
478+
if let Some(data) = frame.data_ref()
479+
&& body_tx.send(Ok(data.clone())).await.is_err()
480+
{
481+
break;
482+
}
483+
}
484+
Err(e) => {
485+
let _ = body_tx
486+
.send(Err::<bytes::Bytes, String>(e.to_string()))
487+
.await;
488+
break;
489+
}
490+
}
491+
}
492+
});
477493

478494
// Add worker headers if not present
479495
if !request.headers.contains_key("x-worker-id") {

0 commit comments

Comments
 (0)