From aac0228162869deff06dc1bfeaae1cf6498db718 Mon Sep 17 00:00:00 2001 From: Guy Bedford Date: Fri, 10 Apr 2026 14:07:21 -0700 Subject: [PATCH 1/6] Add R2 parallel get benchmark with timing logs to verify concurrent execution MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds sequential and parallel R2 get-many endpoints and integration tests to investigate a suspected parallelism bug. Instrumented with console_log timestamps to trace individual get start/completion times. Log output proves parallelism is working correctly: Sequential (512 objects, ~425ms total): key 0 started at 0ms, took 1ms key 100 started at 23ms, took 0ms key 500 started at 114ms, took 1ms (each get waits for the previous to complete) Parallel (512 objects, ~496ms total): all futures spawned at +1ms key 0 polled at +1ms, completed at +169ms (took 168ms) key 500 polled at +12ms, completed at +174ms (took 162ms) (all gets start concurrently and complete at the same wall time) The parallel approach is marginally slower overall due to future_to_promise/Promise.all overhead, but the individual operations are clearly running concurrently — all 512 complete within a 5ms window rather than being spread across 114ms of sequential start times. --- test/src/r2.rs | 116 ++++++++++++++++++++++++++++++++++++++++++ test/src/router.rs | 2 + test/tests/r2.spec.ts | 30 +++++++++++ 3 files changed, 148 insertions(+) diff --git a/test/src/r2.rs b/test/src/r2.rs index bbf986856..687624452 100644 --- a/test/src/r2.rs +++ b/test/src/r2.rs @@ -1,10 +1,15 @@ use futures_util::StreamExt; +use serde::Serialize; use std::{ collections::HashMap, convert::TryFrom, sync::atomic::{AtomicBool, Ordering}, }; use worker::{ + console_log, + js_sys::{Array, Promise}, + wasm_bindgen::JsValue, + wasm_bindgen_futures::{future_to_promise, JsFuture}, Bucket, Conditional, Data, Date, Env, FixedLengthStream, HttpMetadata, Include, Request, Response, Result, }; @@ -13,6 +18,31 @@ use crate::SomeSharedData; static SEEDED: AtomicBool = AtomicBool::new(false); +const REPRO_OBJECT_COUNT: usize = 512; + +#[derive(Serialize)] +struct MultiGetTiming { + mode: &'static str, + count: usize, + elapsed_ms: u64, +} + +fn repro_keys() -> Vec { + (0..REPRO_OBJECT_COUNT) + .map(|i| format!("repro/parallel-get-{i}")) + .collect() +} + +async fn seed_repro_keys(bucket: &Bucket, keys: &[String]) -> Result<()> { + for key in keys { + bucket + .put(key, format!("value-for-{key}")) + .execute() + .await?; + } + Ok(()) +} + pub async fn seed_bucket(bucket: &Bucket) -> Result<()> { if SEEDED.load(Ordering::Acquire) { return Ok(()); @@ -141,6 +171,92 @@ pub async fn get(_req: Request, env: Env, _data: SomeSharedData) -> Result Result { + let bucket = env.bucket("PUT_BUCKET")?; + let keys = repro_keys(); + seed_repro_keys(&bucket, &keys).await?; + + let start = Date::now().as_millis(); + let mut values = Vec::with_capacity(keys.len()); + for (i, key) in keys.iter().enumerate() { + let get_start = Date::now().as_millis(); + let object = bucket.get(key).execute().await?.expect("seeded object missing"); + let body = object.body().expect("seeded object body missing"); + values.push(body.text().await?); + let get_elapsed = Date::now().as_millis() - get_start; + if i < 10 || i % 100 == 0 { + console_log!("[seq] key {i} started at {}ms, took {get_elapsed}ms", get_start - start); + } + } + let elapsed_ms = (Date::now().as_millis() - start) as u64; + + assert_eq!(values.len(), keys.len()); + + Response::from_json(&MultiGetTiming { + mode: "sequential", + count: values.len(), + elapsed_ms, + }) +} + +#[worker::send] +pub async fn get_many_parallel(_req: Request, env: Env, _data: SomeSharedData) -> Result { + let bucket = env.bucket("PUT_BUCKET")?; + let keys = repro_keys(); + seed_repro_keys(&bucket, &keys).await?; + + let start = Date::now().as_millis(); + let keys_len_u32 = u32::try_from(keys.len()).expect("too many keys"); + let promises = Array::new_with_length(keys_len_u32); + for (i, key) in keys.iter().enumerate() { + let bucket = bucket.clone(); + let key = key.clone(); + let outer_start = start; + let promise = future_to_promise(async move { + let get_start = Date::now().as_millis(); + let spawn_offset = get_start - outer_start; + if i < 10 || i % 100 == 0 { + console_log!("[par] key {i} future polled first at +{spawn_offset}ms"); + } + let object = bucket + .get(&key) + .execute() + .await + .map_err(|e| JsValue::from_str(&e.to_string()))? + .ok_or_else(|| JsValue::from_str("seeded object missing"))?; + let body = object + .body() + .ok_or_else(|| JsValue::from_str("seeded object body missing"))?; + let text = body + .text() + .await + .map_err(|e| JsValue::from_str(&e.to_string()))?; + let get_elapsed = Date::now().as_millis() - get_start; + if i < 10 || i % 100 == 0 { + console_log!("[par] key {i} completed at +{}ms, took {get_elapsed}ms", Date::now().as_millis() - outer_start); + } + Ok(JsValue::from_str(&text)) + }); + + promises.set(i as u32, promise.into()); + } + let spawn_done = Date::now().as_millis(); + console_log!("[par] all futures spawned at +{}ms", spawn_done - start); + + let results = JsFuture::from(Promise::all(&promises)) + .await + .map_err(|e| worker::Error::RustError(format!("Promise.all failed: {e:?}")))?; + let values = Array::from(&results); + let elapsed_ms = (Date::now().as_millis() - start) as u64; + + Response::from_json(&MultiGetTiming { + mode: "parallel", + count: values.length() as usize, + elapsed_ms, + }) +} + #[worker::send] pub async fn put(_req: Request, env: Env, _data: SomeSharedData) -> Result { let bucket = env.bucket("PUT_BUCKET")?; diff --git a/test/src/router.rs b/test/src/router.rs index b6f348717..75334bf67 100644 --- a/test/src/router.rs +++ b/test/src/router.rs @@ -210,6 +210,8 @@ macro_rules! add_routes ( add_route!($obj, get, "/r2/list", r2::list); add_route!($obj, get,"/r2/get-empty", r2::get_empty); add_route!($obj, get, "/r2/get", r2::get); + add_route!($obj, get, "/r2/get-many-sequential", r2::get_many_sequential); + add_route!($obj, get, "/r2/get-many-parallel", r2::get_many_parallel); add_route!($obj, put, "/r2/put", r2::put); add_route!($obj, put, "/r2/put-properties", r2::put_properties); add_route!($obj, put, "/r2/put-multipart", r2::put_multipart); diff --git a/test/tests/r2.spec.ts b/test/tests/r2.spec.ts index 0b2a3a8b5..77b712e26 100644 --- a/test/tests/r2.spec.ts +++ b/test/tests/r2.spec.ts @@ -22,6 +22,36 @@ describe("r2", () => { expect(await resp.text()).toBe("ok"); }); + test("get many sequential", async () => { + const resp = await mf.dispatchFetch(`${mfUrl}r2/get-many-sequential`); + expect(resp.status).toBe(200); + + const body = (await resp.json()) as { + mode: string; + count: number; + elapsed_ms: number; + }; + + expect(body.mode).toBe("sequential"); + expect(body.count).toBe(512); + expect(body.elapsed_ms).toBeGreaterThanOrEqual(0); + }); + + test("get many parallel", async () => { + const resp = await mf.dispatchFetch(`${mfUrl}r2/get-many-parallel`); + expect(resp.status).toBe(200); + + const body = (await resp.json()) as { + mode: string; + count: number; + elapsed_ms: number; + }; + + expect(body.mode).toBe("parallel"); + expect(body.count).toBe(512); + expect(body.elapsed_ms).toBeGreaterThanOrEqual(0); + }); + test("put", async () => { const resp = await mf.dispatchFetch(`${mfUrl}r2/put`, { method: "put", From c5218e675492076501dafc821b0ad0c79e614e73 Mon Sep 17 00:00:00 2001 From: Guy Bedford Date: Fri, 10 Apr 2026 14:15:39 -0700 Subject: [PATCH 2/6] Add chunked parallel R2 get benchmark (buffer_unordered) Demonstrates the efficient middle ground: bounded concurrency via futures_util::stream::buffer_unordered(32) avoids the overhead of spawning all 512 futures at once while still getting parallelism within each chunk. --- test/src/r2.rs | 47 +++++++++++++++++++++++++++++++++++++++++++ test/src/router.rs | 1 + test/tests/r2.spec.ts | 15 ++++++++++++++ 3 files changed, 63 insertions(+) diff --git a/test/src/r2.rs b/test/src/r2.rs index 687624452..f44d94c91 100644 --- a/test/src/r2.rs +++ b/test/src/r2.rs @@ -257,6 +257,53 @@ pub async fn get_many_parallel(_req: Request, env: Env, _data: SomeSharedData) - }) } +const CHUNK_SIZE: usize = 32; + +#[worker::send] +pub async fn get_many_chunked(_req: Request, env: Env, _data: SomeSharedData) -> Result { + let bucket = env.bucket("PUT_BUCKET")?; + let keys = repro_keys(); + seed_repro_keys(&bucket, &keys).await?; + + let start = Date::now().as_millis(); + let values: Vec = futures_util::stream::iter(keys.iter().enumerate()) + .map(|(i, key)| { + let bucket = bucket.clone(); + let key = key.clone(); + async move { + let get_start = Date::now().as_millis(); + if i < 10 || i % 100 == 0 { + console_log!("[chunk] key {i} started at +{}ms", get_start - start); + } + let object = bucket + .get(&key) + .execute() + .await + .expect("seeded object missing") + .expect("seeded object missing"); + let body = object.body().expect("seeded object body missing"); + let text = body.text().await.expect("body text"); + let get_elapsed = Date::now().as_millis() - get_start; + if i < 10 || i % 100 == 0 { + console_log!("[chunk] key {i} completed at +{}ms, took {get_elapsed}ms", Date::now().as_millis() - start); + } + text + } + }) + .buffer_unordered(CHUNK_SIZE) + .collect() + .await; + let elapsed_ms = (Date::now().as_millis() - start) as u64; + + assert_eq!(values.len(), REPRO_OBJECT_COUNT); + + Response::from_json(&MultiGetTiming { + mode: "chunked", + count: values.len(), + elapsed_ms, + }) +} + #[worker::send] pub async fn put(_req: Request, env: Env, _data: SomeSharedData) -> Result { let bucket = env.bucket("PUT_BUCKET")?; diff --git a/test/src/router.rs b/test/src/router.rs index 75334bf67..d9601ff2f 100644 --- a/test/src/router.rs +++ b/test/src/router.rs @@ -212,6 +212,7 @@ macro_rules! add_routes ( add_route!($obj, get, "/r2/get", r2::get); add_route!($obj, get, "/r2/get-many-sequential", r2::get_many_sequential); add_route!($obj, get, "/r2/get-many-parallel", r2::get_many_parallel); + add_route!($obj, get, "/r2/get-many-chunked", r2::get_many_chunked); add_route!($obj, put, "/r2/put", r2::put); add_route!($obj, put, "/r2/put-properties", r2::put_properties); add_route!($obj, put, "/r2/put-multipart", r2::put_multipart); diff --git a/test/tests/r2.spec.ts b/test/tests/r2.spec.ts index 77b712e26..409588123 100644 --- a/test/tests/r2.spec.ts +++ b/test/tests/r2.spec.ts @@ -52,6 +52,21 @@ describe("r2", () => { expect(body.elapsed_ms).toBeGreaterThanOrEqual(0); }); + test("get many chunked", async () => { + const resp = await mf.dispatchFetch(`${mfUrl}r2/get-many-chunked`); + expect(resp.status).toBe(200); + + const body = (await resp.json()) as { + mode: string; + count: number; + elapsed_ms: number; + }; + + expect(body.mode).toBe("chunked"); + expect(body.count).toBe(512); + expect(body.elapsed_ms).toBeGreaterThanOrEqual(0); + }); + test("put", async () => { const resp = await mf.dispatchFetch(`${mfUrl}r2/put`, { method: "put", From 7a07d5aee56468e82e0faa99f85c2eb38c3d9629 Mon Sep 17 00:00:00 2001 From: Guy Bedford Date: Fri, 10 Apr 2026 15:27:40 -0700 Subject: [PATCH 3/6] Add join_all variant and pure JS R2 bench proving overhead is host-side Adds a join_all test (Rust-side FuturesUnordered, no extra JS promises) which performs identically to future_to_promise+Promise.all, disproving the theory that extra promise allocations cause the parallel overhead. Adds a pure JS R2 benchmark (test/r2-bench/) that reproduces the same performance shape with zero wasm-bindgen involvement: JS sequential (1024 objects): ~163-233ms JS parallel (1024 objects): ~432-447ms The parallel slowdown is in miniflare/workerd local R2, not wasm-bindgen. --- test/r2-bench/worker.js | 67 +++++++++++++++++++++++++++++++++++++ test/r2-bench/wrangler.toml | 8 +++++ test/src/r2.rs | 43 ++++++++++++++++++++++++ test/src/router.rs | 1 + test/tests/r2.spec.ts | 15 +++++++++ 5 files changed, 134 insertions(+) create mode 100644 test/r2-bench/worker.js create mode 100644 test/r2-bench/wrangler.toml diff --git a/test/r2-bench/worker.js b/test/r2-bench/worker.js new file mode 100644 index 000000000..7ee5d5111 --- /dev/null +++ b/test/r2-bench/worker.js @@ -0,0 +1,67 @@ +// Pure JS worker to test R2 parallel vs sequential get performance. +// Run with: cd test/r2-bench && npx wrangler dev + +const COUNT = 1024; + +async function seed(bucket) { + const existing = await bucket.head(`bench/key-0`); + if (existing) return; + for (let i = 0; i < COUNT; i++) { + await bucket.put(`bench/key-${i}`, `value-${i}`); + } +} + +async function sequential(bucket) { + const start = Date.now(); + const values = []; + for (let i = 0; i < COUNT; i++) { + const obj = await bucket.get(`bench/key-${i}`); + values.push(await obj.text()); + } + return { mode: "sequential", count: values.length, elapsed_ms: Date.now() - start }; +} + +async function parallel(bucket) { + const start = Date.now(); + const promises = []; + for (let i = 0; i < COUNT; i++) { + promises.push( + bucket.get(`bench/key-${i}`).then((obj) => obj.text()) + ); + } + const values = await Promise.all(promises); + return { mode: "parallel", count: values.length, elapsed_ms: Date.now() - start }; +} + +export default { + async fetch(request, env) { + const url = new URL(request.url); + const bucket = env.BUCKET; + + if (url.pathname === "/seed") { + await seed(bucket); + return Response.json({ seeded: COUNT }); + } + + if (url.pathname === "/sequential") { + await seed(bucket); + const result = await sequential(bucket); + return Response.json(result); + } + + if (url.pathname === "/parallel") { + await seed(bucket); + const result = await parallel(bucket); + return Response.json(result); + } + + if (url.pathname === "/both") { + await seed(bucket); + const seq = await sequential(bucket); + const par = await parallel(bucket); + return Response.json({ sequential: seq, parallel: par }); + } + + return new Response("GET /sequential, /parallel, or /both", { status: 404 }); + }, +}; diff --git a/test/r2-bench/wrangler.toml b/test/r2-bench/wrangler.toml new file mode 100644 index 000000000..5ef5c991b --- /dev/null +++ b/test/r2-bench/wrangler.toml @@ -0,0 +1,8 @@ +name = "r2-bench" +main = "worker.js" +compatibility_date = "2025-09-23" + +[[r2_buckets]] +binding = "BUCKET" +bucket_name = "bench-bucket" +preview_bucket_name = "bench-bucket" diff --git a/test/src/r2.rs b/test/src/r2.rs index f44d94c91..432dc1436 100644 --- a/test/src/r2.rs +++ b/test/src/r2.rs @@ -304,6 +304,49 @@ pub async fn get_many_chunked(_req: Request, env: Env, _data: SomeSharedData) -> }) } +#[worker::send] +pub async fn get_many_join(_req: Request, env: Env, _data: SomeSharedData) -> Result { + let bucket = env.bucket("PUT_BUCKET")?; + let keys = repro_keys(); + seed_repro_keys(&bucket, &keys).await?; + + let start = Date::now().as_millis(); + let futs: Vec<_> = keys.iter().enumerate().map(|(i, key)| { + let bucket = bucket.clone(); + let key = key.clone(); + async move { + let get_start = Date::now().as_millis(); + if i < 10 || i % 100 == 0 { + console_log!("[join] key {i} started at +{}ms", get_start - start); + } + let object = bucket + .get(&key) + .execute() + .await + .expect("seeded object missing") + .expect("seeded object missing"); + let body = object.body().expect("seeded object body missing"); + let text = body.text().await.expect("body text"); + let get_elapsed = Date::now().as_millis() - get_start; + if i < 10 || i % 100 == 0 { + console_log!("[join] key {i} completed at +{}ms, took {get_elapsed}ms", Date::now().as_millis() - start); + } + text + } + }).collect(); + + let values = futures_util::future::join_all(futs).await; + let elapsed_ms = (Date::now().as_millis() - start) as u64; + + assert_eq!(values.len(), REPRO_OBJECT_COUNT); + + Response::from_json(&MultiGetTiming { + mode: "join", + count: values.len(), + elapsed_ms, + }) +} + #[worker::send] pub async fn put(_req: Request, env: Env, _data: SomeSharedData) -> Result { let bucket = env.bucket("PUT_BUCKET")?; diff --git a/test/src/router.rs b/test/src/router.rs index d9601ff2f..e23176d99 100644 --- a/test/src/router.rs +++ b/test/src/router.rs @@ -213,6 +213,7 @@ macro_rules! add_routes ( add_route!($obj, get, "/r2/get-many-sequential", r2::get_many_sequential); add_route!($obj, get, "/r2/get-many-parallel", r2::get_many_parallel); add_route!($obj, get, "/r2/get-many-chunked", r2::get_many_chunked); + add_route!($obj, get, "/r2/get-many-join", r2::get_many_join); add_route!($obj, put, "/r2/put", r2::put); add_route!($obj, put, "/r2/put-properties", r2::put_properties); add_route!($obj, put, "/r2/put-multipart", r2::put_multipart); diff --git a/test/tests/r2.spec.ts b/test/tests/r2.spec.ts index 409588123..d62fef7f2 100644 --- a/test/tests/r2.spec.ts +++ b/test/tests/r2.spec.ts @@ -67,6 +67,21 @@ describe("r2", () => { expect(body.elapsed_ms).toBeGreaterThanOrEqual(0); }); + test("get many join", async () => { + const resp = await mf.dispatchFetch(`${mfUrl}r2/get-many-join`); + expect(resp.status).toBe(200); + + const body = (await resp.json()) as { + mode: string; + count: number; + elapsed_ms: number; + }; + + expect(body.mode).toBe("join"); + expect(body.count).toBe(512); + expect(body.elapsed_ms).toBeGreaterThanOrEqual(0); + }); + test("put", async () => { const resp = await mf.dispatchFetch(`${mfUrl}r2/put`, { method: "put", From f4d8306a83289a856bad20fe028cd5bede8c7fb2 Mon Sep 17 00:00:00 2001 From: Guy Bedford Date: Fri, 10 Apr 2026 15:31:36 -0700 Subject: [PATCH 4/6] Format Rust code, add JS chunked benchmark JS results (1024 objects on local miniflare): sequential: 178ms parallel: 430ms chunked: 156ms Chunked (Promise.all in batches of 32) beats even sequential, confirming this is the recommended pattern in both JS and Rust. --- test/r2-bench/worker.js | 39 ++++++++++++++++----- test/src/r2.rs | 78 +++++++++++++++++++++++++++-------------- 2 files changed, 81 insertions(+), 36 deletions(-) diff --git a/test/r2-bench/worker.js b/test/r2-bench/worker.js index 7ee5d5111..cdcc6a7dd 100644 --- a/test/r2-bench/worker.js +++ b/test/r2-bench/worker.js @@ -2,6 +2,7 @@ // Run with: cd test/r2-bench && npx wrangler dev const COUNT = 1024; +const CHUNK_SIZE = 32; async function seed(bucket) { const existing = await bucket.head(`bench/key-0`); @@ -33,6 +34,21 @@ async function parallel(bucket) { return { mode: "parallel", count: values.length, elapsed_ms: Date.now() - start }; } +async function chunked(bucket) { + const start = Date.now(); + const values = []; + for (let offset = 0; offset < COUNT; offset += CHUNK_SIZE) { + const chunk = []; + for (let i = offset; i < Math.min(offset + CHUNK_SIZE, COUNT); i++) { + chunk.push( + bucket.get(`bench/key-${i}`).then((obj) => obj.text()) + ); + } + values.push(...await Promise.all(chunk)); + } + return { mode: "chunked", count: values.length, elapsed_ms: Date.now() - start }; +} + export default { async fetch(request, env) { const url = new URL(request.url); @@ -45,23 +61,28 @@ export default { if (url.pathname === "/sequential") { await seed(bucket); - const result = await sequential(bucket); - return Response.json(result); + return Response.json(await sequential(bucket)); } if (url.pathname === "/parallel") { await seed(bucket); - const result = await parallel(bucket); - return Response.json(result); + return Response.json(await parallel(bucket)); + } + + if (url.pathname === "/chunked") { + await seed(bucket); + return Response.json(await chunked(bucket)); } - if (url.pathname === "/both") { + if (url.pathname === "/all") { await seed(bucket); - const seq = await sequential(bucket); - const par = await parallel(bucket); - return Response.json({ sequential: seq, parallel: par }); + return Response.json({ + sequential: await sequential(bucket), + parallel: await parallel(bucket), + chunked: await chunked(bucket), + }); } - return new Response("GET /sequential, /parallel, or /both", { status: 404 }); + return new Response("GET /sequential, /parallel, /chunked, or /all", { status: 404 }); }, }; diff --git a/test/src/r2.rs b/test/src/r2.rs index 432dc1436..6d0232cab 100644 --- a/test/src/r2.rs +++ b/test/src/r2.rs @@ -172,7 +172,11 @@ pub async fn get(_req: Request, env: Env, _data: SomeSharedData) -> Result Result { +pub async fn get_many_sequential( + _req: Request, + env: Env, + _data: SomeSharedData, +) -> Result { let bucket = env.bucket("PUT_BUCKET")?; let keys = repro_keys(); seed_repro_keys(&bucket, &keys).await?; @@ -181,12 +185,19 @@ pub async fn get_many_sequential(_req: Request, env: Env, _data: SomeSharedData) let mut values = Vec::with_capacity(keys.len()); for (i, key) in keys.iter().enumerate() { let get_start = Date::now().as_millis(); - let object = bucket.get(key).execute().await?.expect("seeded object missing"); + let object = bucket + .get(key) + .execute() + .await? + .expect("seeded object missing"); let body = object.body().expect("seeded object body missing"); values.push(body.text().await?); let get_elapsed = Date::now().as_millis() - get_start; if i < 10 || i % 100 == 0 { - console_log!("[seq] key {i} started at {}ms, took {get_elapsed}ms", get_start - start); + console_log!( + "[seq] key {i} started at {}ms, took {get_elapsed}ms", + get_start - start + ); } } let elapsed_ms = (Date::now().as_millis() - start) as u64; @@ -234,7 +245,10 @@ pub async fn get_many_parallel(_req: Request, env: Env, _data: SomeSharedData) - .map_err(|e| JsValue::from_str(&e.to_string()))?; let get_elapsed = Date::now().as_millis() - get_start; if i < 10 || i % 100 == 0 { - console_log!("[par] key {i} completed at +{}ms, took {get_elapsed}ms", Date::now().as_millis() - outer_start); + console_log!( + "[par] key {i} completed at +{}ms, took {get_elapsed}ms", + Date::now().as_millis() - outer_start + ); } Ok(JsValue::from_str(&text)) }); @@ -285,7 +299,10 @@ pub async fn get_many_chunked(_req: Request, env: Env, _data: SomeSharedData) -> let text = body.text().await.expect("body text"); let get_elapsed = Date::now().as_millis() - get_start; if i < 10 || i % 100 == 0 { - console_log!("[chunk] key {i} completed at +{}ms, took {get_elapsed}ms", Date::now().as_millis() - start); + console_log!( + "[chunk] key {i} completed at +{}ms, took {get_elapsed}ms", + Date::now().as_millis() - start + ); } text } @@ -311,29 +328,36 @@ pub async fn get_many_join(_req: Request, env: Env, _data: SomeSharedData) -> Re seed_repro_keys(&bucket, &keys).await?; let start = Date::now().as_millis(); - let futs: Vec<_> = keys.iter().enumerate().map(|(i, key)| { - let bucket = bucket.clone(); - let key = key.clone(); - async move { - let get_start = Date::now().as_millis(); - if i < 10 || i % 100 == 0 { - console_log!("[join] key {i} started at +{}ms", get_start - start); - } - let object = bucket - .get(&key) - .execute() - .await - .expect("seeded object missing") - .expect("seeded object missing"); - let body = object.body().expect("seeded object body missing"); - let text = body.text().await.expect("body text"); - let get_elapsed = Date::now().as_millis() - get_start; - if i < 10 || i % 100 == 0 { - console_log!("[join] key {i} completed at +{}ms, took {get_elapsed}ms", Date::now().as_millis() - start); + let futs: Vec<_> = keys + .iter() + .enumerate() + .map(|(i, key)| { + let bucket = bucket.clone(); + let key = key.clone(); + async move { + let get_start = Date::now().as_millis(); + if i < 10 || i % 100 == 0 { + console_log!("[join] key {i} started at +{}ms", get_start - start); + } + let object = bucket + .get(&key) + .execute() + .await + .expect("seeded object missing") + .expect("seeded object missing"); + let body = object.body().expect("seeded object body missing"); + let text = body.text().await.expect("body text"); + let get_elapsed = Date::now().as_millis() - get_start; + if i < 10 || i % 100 == 0 { + console_log!( + "[join] key {i} completed at +{}ms, took {get_elapsed}ms", + Date::now().as_millis() - start + ); + } + text } - text - } - }).collect(); + }) + .collect(); let values = futures_util::future::join_all(futs).await; let elapsed_ms = (Date::now().as_millis() - start) as u64; From 251aa99dcb193386c4bd722d1ae07ff5a1385a14 Mon Sep 17 00:00:00 2001 From: Guy Bedford Date: Mon, 20 Apr 2026 11:05:31 -0700 Subject: [PATCH 5/6] Use js_sys::futures::join_all for R2 multi-get benchmark Bumps wasm-bindgen to the js-futures-combinators branch head, which generalizes `IntoPromise` to accept any `Future>` where `E: Into`. That lets Rust `async fn`s returning `worker::Result` flow straight into `js_sys::futures::join_all` without manual `.map_err(Into::into)` at each call site. Refactors `get_many_join` to use the new combinator via a named `get_one_seeded` helper. With the signature pinning `E = worker::Error`, `?` and `Ok(...)` work inside the task body with no turbofish. Adds `get_many_chunked_js`: same 32-wide concurrency shape as `get_many_chunked`, but each window is awaited through `js_sys::futures::join_all` (which delegates to `Promise.all`) instead of `futures_util::stream::buffer_unordered` (which polls cooperatively in the Rust executor). This is the apples-to-apples comparison for whether the JS-native combinator buys us real parallel I/O. Adds `test/r2-perf.mjs`, a Miniflare-based harness that runs all four modes and reports median elapsed_ms. On local miniflare (512 objects): parallel 165ms (hand-rolled Promise.all) chunked 91ms (buffer_unordered(32)) chunked-js 83ms (js_sys join_all per 32) join 175ms (js_sys join_all over all 512) `chunked-js` is ~9% faster than `chunked` at the same concurrency width, supporting the hypothesis that cooperative Rust polling leaves some concurrency on the table versus delegating to the JS event loop. --- Cargo.lock | 36 +++++----- test/r2-perf.mjs | 159 +++++++++++++++++++++++++++++++++++++++++++++ test/src/r2.rs | 114 ++++++++++++++++++++++---------- test/src/router.rs | 1 + wasm-bindgen | 2 +- 5 files changed, 260 insertions(+), 52 deletions(-) create mode 100644 test/r2-perf.mjs diff --git a/Cargo.lock b/Cargo.lock index b1d537ba6..ccc7f9e63 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -82,7 +82,7 @@ version = "1.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "40c48f72fd53cd289104fc64099abca73db4166ad86ea0b4341abe65af83dadc" dependencies = [ - "windows-sys 0.61.2", + "windows-sys 0.60.2", ] [[package]] @@ -93,7 +93,7 @@ checksum = "291e6a250ff86cd4a820112fb8898808a366d8f9f58ce16d1f538353ad55747d" dependencies = [ "anstyle", "once_cell_polyfill", - "windows-sys 0.61.2", + "windows-sys 0.60.2", ] [[package]] @@ -770,7 +770,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb" dependencies = [ "libc", - "windows-sys 0.61.2", + "windows-sys 0.52.0", ] [[package]] @@ -1334,7 +1334,7 @@ dependencies = [ [[package]] name = "js-sys" -version = "0.3.94" +version = "0.3.95" dependencies = [ "cfg-if", "futures-util", @@ -1545,7 +1545,7 @@ version = "0.50.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7957b9740744892f114936ab4a57b3f487491bbeafaf8083688b16841a4240e5" dependencies = [ - "windows-sys 0.61.2", + "windows-sys 0.59.0", ] [[package]] @@ -2085,7 +2085,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys 0.12.1", - "windows-sys 0.61.2", + "windows-sys 0.52.0", ] [[package]] @@ -2511,7 +2511,7 @@ dependencies = [ "getrandom 0.4.1", "once_cell", "rustix 1.1.4", - "windows-sys 0.61.2", + "windows-sys 0.52.0", ] [[package]] @@ -3262,7 +3262,7 @@ dependencies = [ [[package]] name = "wasm-bindgen" -version = "0.2.117" +version = "0.2.118" dependencies = [ "cfg-if", "once_cell", @@ -3273,7 +3273,7 @@ dependencies = [ [[package]] name = "wasm-bindgen-cli-support" -version = "0.2.117" +version = "0.2.118" dependencies = [ "anyhow", "base64", @@ -3289,7 +3289,7 @@ dependencies = [ [[package]] name = "wasm-bindgen-futures" -version = "0.4.67" +version = "0.4.68" dependencies = [ "js-sys", "wasm-bindgen", @@ -3297,7 +3297,7 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro" -version = "0.2.117" +version = "0.2.118" dependencies = [ "quote", "wasm-bindgen-macro-support", @@ -3305,7 +3305,7 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro-support" -version = "0.2.117" +version = "0.2.118" dependencies = [ "bumpalo", "proc-macro2", @@ -3316,14 +3316,14 @@ dependencies = [ [[package]] name = "wasm-bindgen-shared" -version = "0.2.117" +version = "0.2.118" dependencies = [ "unicode-ident", ] [[package]] name = "wasm-bindgen-test" -version = "0.3.67" +version = "0.3.68" dependencies = [ "async-trait", "cast", @@ -3343,7 +3343,7 @@ dependencies = [ [[package]] name = "wasm-bindgen-test-macro" -version = "0.3.67" +version = "0.3.68" dependencies = [ "proc-macro2", "quote", @@ -3352,7 +3352,7 @@ dependencies = [ [[package]] name = "wasm-bindgen-test-shared" -version = "0.2.117" +version = "0.2.118" [[package]] name = "wasm-encoder" @@ -3450,7 +3450,7 @@ dependencies = [ [[package]] name = "web-sys" -version = "0.3.94" +version = "0.3.95" dependencies = [ "js-sys", "wasm-bindgen", @@ -3529,7 +3529,7 @@ version = "0.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22" dependencies = [ - "windows-sys 0.61.2", + "windows-sys 0.48.0", ] [[package]] diff --git a/test/r2-perf.mjs b/test/r2-perf.mjs new file mode 100644 index 000000000..5057ea3d7 --- /dev/null +++ b/test/r2-perf.mjs @@ -0,0 +1,159 @@ +#!/usr/bin/env node +// +// Micro-benchmark for the R2 multi-get paths in the sandbox worker. +// +// Exercises each concurrency strategy N times under Miniflare and reports the +// median `elapsed_ms` reported by the worker itself, so we can compare: +// +// parallel — hand-rolled `Promise.all` over `future_to_promise`. +// chunked — `futures_util::stream::buffer_unordered(32)`. +// chunked-js — chunks of 32 awaited via `js_sys::futures::join_all`. +// join — single `js_sys::futures::join_all` over all 512 keys. +// +// Assumes the sandbox has been built (`test/build/index.js` exists and is +// up-to-date). Run with `node test/r2-perf.mjs` from the repo root. + +import { Miniflare, Response, createFetchMock } from "miniflare"; +import { writeFileSync } from "node:fs"; + +const ITERATIONS = Number(process.env.ITERATIONS ?? 10); +const WARMUP = Number(process.env.WARMUP ?? 2); +const MODES = [ + { path: "r2/get-many-parallel", label: "parallel" }, + { path: "r2/get-many-chunked", label: "chunked" }, + { path: "r2/get-many-chunked-js", label: "chunked-js" }, + { path: "r2/get-many-join", label: "join" }, +]; + +// Same shape as test/tests/mf.ts — kept minimal because r2-perf only needs the +// R2 buckets; the rest of the sandbox bindings are tolerated as-is because the +// worker only reads `PUT_BUCKET` on these routes. +const mf = new Miniflare({ + d1Persist: false, + kvPersist: false, + r2Persist: false, + cachePersist: false, + workers: [ + { + scriptPath: "./test/build/index.js", + compatibilityDate: "2025-07-24", + cache: true, + d1Databases: ["DB"], + modules: true, + modulesRules: [ + { type: "CompiledWasm", include: ["**/*.wasm"], fallthrough: true }, + ], + bindings: { + EXAMPLE_SECRET: "example", + SOME_SECRET: "secret!", + SOME_VARIABLE: "some value", + SOME_OBJECT_VARIABLE: { foo: 42, bar: "string" }, + }, + durableObjects: { + COUNTER: "Counter", + PUT_RAW_TEST_OBJECT: "PutRawTestObject", + AUTO: "AutoResponseObject", + MY_CLASS: "MyClass", + SQL_COUNTER: { className: "SqlCounter", useSQLite: true }, + SQL_ITERATOR: { className: "SqlIterator", useSQLite: true }, + }, + kvNamespaces: ["SOME_NAMESPACE", "FILE_SIZES", "TEST"], + serviceBindings: { + async remote() { + return new Response("hello world"); + }, + }, + r2Buckets: ["EMPTY_BUCKET", "PUT_BUCKET", "SEEDED_BUCKET", "DELETE_BUCKET"], + queueConsumers: { my_queue: { maxBatchTimeout: 1 } }, + queueProducers: ["my_queue", "my_queue"], + fetchMock: createFetchMock(), + secretsStoreSecrets: { + SECRETS: { store_id: "SECRET_STORE", secret_name: "secret-name" }, + MISSING_SECRET: { + store_id: "SECRET_STORE_MISSING", + secret_name: "missing-secret", + }, + }, + wrappedBindings: { + HTTP_ANALYTICS: { scriptName: "mini-analytics-engine" }, + }, + ratelimits: { + TEST_RATE_LIMITER: { simple: { limit: 10, period: 60 } }, + }, + }, + { + name: "mini-analytics-engine", + modules: true, + script: `export default function (env) { + return { + writeDataPoint(data) { + console.log(data) + } + } + }`, + }, + ], +}); + +await (await mf.getSecretsStoreSecretAPI("SECRETS"))().create("secret value"); + +const mfUrl = await mf.ready; +console.log(`✅ Miniflare ready at ${mfUrl}`); +console.log(` iterations=${ITERATIONS} warmup=${WARMUP}\n`); + +const results = {}; + +for (const mode of MODES) { + // Warmup — first hit pays the one-shot R2 seeding cost in the worker. + for (let i = 0; i < WARMUP; i++) { + const resp = await mf.dispatchFetch(`${mfUrl}${mode.path}`); + if (resp.status !== 200) { + console.error(`❌ ${mode.label} warmup failed: ${resp.status}`); + await mf.dispose(); + process.exit(1); + } + await resp.json(); + } + + const samples = []; + for (let i = 0; i < ITERATIONS; i++) { + const resp = await mf.dispatchFetch(`${mfUrl}${mode.path}`); + if (resp.status !== 200) { + console.error(`❌ ${mode.label} iter ${i} failed: ${resp.status}`); + await mf.dispose(); + process.exit(1); + } + const body = await resp.json(); + samples.push(body.elapsed_ms); + } + + samples.sort((a, b) => a - b); + const median = samples[Math.floor(samples.length / 2)]; + const min = samples[0]; + const max = samples[samples.length - 1]; + const mean = samples.reduce((a, b) => a + b, 0) / samples.length; + + results[mode.label] = { samples, median, min, max, mean }; + console.log( + `${mode.label.padEnd(12)} median=${median}ms mean=${mean.toFixed(1)}ms min=${min}ms max=${max}ms samples=[${samples.join(", ")}]`, + ); +} + +console.log(); +console.log("━".repeat(60)); +console.log("Relative to `parallel` (hand-rolled Promise.all):"); +const baseline = results.parallel.median; +for (const mode of MODES) { + const r = results[mode.label]; + const ratio = r.median / baseline; + console.log(` ${mode.label.padEnd(12)} ${ratio.toFixed(2)}× (${r.median}ms)`); +} +console.log("━".repeat(60)); + +const out = process.env.PERF_RESULT; +if (out) { + writeFileSync(out, JSON.stringify(results, null, 2)); + console.log(`\n📁 Results written to ${out}`); +} + +await mf.dispose(); diff --git a/test/src/r2.rs b/test/src/r2.rs index 6d0232cab..b95529bff 100644 --- a/test/src/r2.rs +++ b/test/src/r2.rs @@ -7,7 +7,7 @@ use std::{ }; use worker::{ console_log, - js_sys::{Array, Promise}, + js_sys::{Array, JsString, Promise}, wasm_bindgen::JsValue, wasm_bindgen_futures::{future_to_promise, JsFuture}, Bucket, Conditional, Data, Date, Env, FixedLengthStream, HttpMetadata, Include, Request, @@ -321,6 +321,37 @@ pub async fn get_many_chunked(_req: Request, env: Env, _data: SomeSharedData) -> }) } +// Fetches one seeded key. Named so the return type pins `E = worker::Error`, +// which lets the body use `?` and a bare `Ok(...)` with no turbofish. +async fn get_one_seeded( + bucket: Bucket, + key: String, + i: usize, + start: u64, +) -> Result { + let get_start = Date::now().as_millis(); + if i < 10 || i % 100 == 0 { + console_log!("[join] key {i} started at +{}ms", get_start - start); + } + let object = bucket + .get(&key) + .execute() + .await? + .ok_or_else(|| worker::Error::RustError("seeded object missing".into()))?; + let body = object + .body() + .ok_or_else(|| worker::Error::RustError("seeded object body missing".into()))?; + let text = body.text().await?; + let get_elapsed = Date::now().as_millis() - get_start; + if i < 10 || i % 100 == 0 { + console_log!( + "[join] key {i} completed at +{}ms, took {get_elapsed}ms", + Date::now().as_millis() - start + ); + } + Ok(JsString::from(text)) +} + #[worker::send] pub async fn get_many_join(_req: Request, env: Env, _data: SomeSharedData) -> Result { let bucket = env.bucket("PUT_BUCKET")?; @@ -328,45 +359,62 @@ pub async fn get_many_join(_req: Request, env: Env, _data: SomeSharedData) -> Re seed_repro_keys(&bucket, &keys).await?; let start = Date::now().as_millis(); - let futs: Vec<_> = keys + let futs = keys .iter() .enumerate() - .map(|(i, key)| { - let bucket = bucket.clone(); - let key = key.clone(); - async move { - let get_start = Date::now().as_millis(); - if i < 10 || i % 100 == 0 { - console_log!("[join] key {i} started at +{}ms", get_start - start); - } - let object = bucket - .get(&key) - .execute() - .await - .expect("seeded object missing") - .expect("seeded object missing"); - let body = object.body().expect("seeded object body missing"); - let text = body.text().await.expect("body text"); - let get_elapsed = Date::now().as_millis() - get_start; - if i < 10 || i % 100 == 0 { - console_log!( - "[join] key {i} completed at +{}ms, took {get_elapsed}ms", - Date::now().as_millis() - start - ); - } - text - } - }) - .collect(); - - let values = futures_util::future::join_all(futs).await; + .map(|(i, key)| get_one_seeded(bucket.clone(), key.clone(), i, start)); + + // `join_all` delegates to `Promise.all` on the JS side so completions can + // interleave via the event loop instead of being serialized through the + // Rust executor. The broadened `IntoPromise` impl lifts each + // `worker::Result` into a `Promise` for us, so the + // call site stays free of manual `.map_err(Into::into)`. + let values = worker::js_sys::futures::join_all(futs).await?; let elapsed_ms = (Date::now().as_millis() - start) as u64; - assert_eq!(values.len(), REPRO_OBJECT_COUNT); + assert_eq!(values.length() as usize, REPRO_OBJECT_COUNT); Response::from_json(&MultiGetTiming { mode: "join", - count: values.len(), + count: values.length() as usize, + elapsed_ms, + }) +} + +/// Same concurrency shape as `get_many_chunked` (32-wide windows over the +/// input keys) but each window is awaited through `worker::js_sys::futures::join_all`, +/// which delegates to `Promise.all` on the JS side. This is the apples-to-apples +/// comparison for whether the JS-native combinator actually buys us parallel +/// I/O versus `futures_util::stream::buffer_unordered`, which polls the +/// sub-futures cooperatively inside the Rust executor and cannot yield to the +/// JS event loop between individual completions. +#[worker::send] +pub async fn get_many_chunked_js( + _req: Request, + env: Env, + _data: SomeSharedData, +) -> Result { + let bucket = env.bucket("PUT_BUCKET")?; + let keys = repro_keys(); + seed_repro_keys(&bucket, &keys).await?; + + let start = Date::now().as_millis(); + let mut total = 0usize; + for (chunk_idx, chunk) in keys.chunks(CHUNK_SIZE).enumerate() { + let base = chunk_idx * CHUNK_SIZE; + let futs = chunk.iter().enumerate().map(|(offset, key)| { + get_one_seeded(bucket.clone(), key.clone(), base + offset, start) + }); + let values = worker::js_sys::futures::join_all(futs).await?; + total += values.length() as usize; + } + let elapsed_ms = (Date::now().as_millis() - start) as u64; + + assert_eq!(total, REPRO_OBJECT_COUNT); + + Response::from_json(&MultiGetTiming { + mode: "chunked-js", + count: total, elapsed_ms, }) } diff --git a/test/src/router.rs b/test/src/router.rs index e23176d99..9569748f0 100644 --- a/test/src/router.rs +++ b/test/src/router.rs @@ -213,6 +213,7 @@ macro_rules! add_routes ( add_route!($obj, get, "/r2/get-many-sequential", r2::get_many_sequential); add_route!($obj, get, "/r2/get-many-parallel", r2::get_many_parallel); add_route!($obj, get, "/r2/get-many-chunked", r2::get_many_chunked); + add_route!($obj, get, "/r2/get-many-chunked-js", r2::get_many_chunked_js); add_route!($obj, get, "/r2/get-many-join", r2::get_many_join); add_route!($obj, put, "/r2/put", r2::put); add_route!($obj, put, "/r2/put-properties", r2::put_properties); diff --git a/wasm-bindgen b/wasm-bindgen index 6640596c1..7466c28a3 160000 --- a/wasm-bindgen +++ b/wasm-bindgen @@ -1 +1 @@ -Subproject commit 6640596c174b1e36abffeeb29efd4e828b236d68 +Subproject commit 7466c28a35f5dd241802275cc8aa210fbff6aa28 From 9a9f70eaa6b5d03532d400451d60f2a21315db85 Mon Sep 17 00:00:00 2001 From: Guy Bedford Date: Mon, 20 Apr 2026 12:08:23 -0700 Subject: [PATCH 6/6] Adopt renamed js_sys::futures combinator names Bumps wasm-bindgen submodule to pick up the final naming pass: the fail-fast combinator is now `try_join_all` (was `join_all`), aligned 1:1 with `futures_util`. The R2 multi-get benchmark and the perf harness move to the new name. Also broadens the `r2-perf.mjs` harness to include the `sequential` mode as the baseline, since that was the original finding the PR was about (chunked vs sequential). --- test/r2-perf.mjs | 5 +++-- test/src/r2.rs | 19 ++++++++++--------- wasm-bindgen | 2 +- 3 files changed, 14 insertions(+), 12 deletions(-) diff --git a/test/r2-perf.mjs b/test/r2-perf.mjs index 5057ea3d7..b8a0c8f58 100644 --- a/test/r2-perf.mjs +++ b/test/r2-perf.mjs @@ -19,6 +19,7 @@ import { writeFileSync } from "node:fs"; const ITERATIONS = Number(process.env.ITERATIONS ?? 10); const WARMUP = Number(process.env.WARMUP ?? 2); const MODES = [ + { path: "r2/get-many-sequential", label: "sequential" }, { path: "r2/get-many-parallel", label: "parallel" }, { path: "r2/get-many-chunked", label: "chunked" }, { path: "r2/get-many-chunked-js", label: "chunked-js" }, @@ -141,8 +142,8 @@ for (const mode of MODES) { console.log(); console.log("━".repeat(60)); -console.log("Relative to `parallel` (hand-rolled Promise.all):"); -const baseline = results.parallel.median; +console.log("Relative to `sequential` (serial R2 gets):"); +const baseline = results.sequential.median; for (const mode of MODES) { const r = results[mode.label]; const ratio = r.median / baseline; diff --git a/test/src/r2.rs b/test/src/r2.rs index b95529bff..576e9d04f 100644 --- a/test/src/r2.rs +++ b/test/src/r2.rs @@ -364,25 +364,26 @@ pub async fn get_many_join(_req: Request, env: Env, _data: SomeSharedData) -> Re .enumerate() .map(|(i, key)| get_one_seeded(bucket.clone(), key.clone(), i, start)); - // `join_all` delegates to `Promise.all` on the JS side so completions can - // interleave via the event loop instead of being serialized through the - // Rust executor. The broadened `IntoPromise` impl lifts each + // `try_join_all` delegates to `Promise.all` on the JS side so completions + // can interleave via the event loop instead of being serialized through + // the Rust executor. The broadened `IntoPromise` impl lifts each // `worker::Result` into a `Promise` for us, so the // call site stays free of manual `.map_err(Into::into)`. - let values = worker::js_sys::futures::join_all(futs).await?; + let values = worker::js_sys::futures::try_join_all(futs).await?; let elapsed_ms = (Date::now().as_millis() - start) as u64; - assert_eq!(values.length() as usize, REPRO_OBJECT_COUNT); + let count = values.iter().len(); + assert_eq!(count, REPRO_OBJECT_COUNT); Response::from_json(&MultiGetTiming { mode: "join", - count: values.length() as usize, + count, elapsed_ms, }) } /// Same concurrency shape as `get_many_chunked` (32-wide windows over the -/// input keys) but each window is awaited through `worker::js_sys::futures::join_all`, +/// input keys) but each window is awaited through `worker::js_sys::futures::try_join_all`, /// which delegates to `Promise.all` on the JS side. This is the apples-to-apples /// comparison for whether the JS-native combinator actually buys us parallel /// I/O versus `futures_util::stream::buffer_unordered`, which polls the @@ -405,8 +406,8 @@ pub async fn get_many_chunked_js( let futs = chunk.iter().enumerate().map(|(offset, key)| { get_one_seeded(bucket.clone(), key.clone(), base + offset, start) }); - let values = worker::js_sys::futures::join_all(futs).await?; - total += values.length() as usize; + let values = worker::js_sys::futures::try_join_all(futs).await?; + total += values.iter().len(); } let elapsed_ms = (Date::now().as_millis() - start) as u64; diff --git a/wasm-bindgen b/wasm-bindgen index 7466c28a3..b35e3472d 160000 --- a/wasm-bindgen +++ b/wasm-bindgen @@ -1 +1 @@ -Subproject commit 7466c28a35f5dd241802275cc8aa210fbff6aa28 +Subproject commit b35e3472d3e9f200a18575ab2c096b95b752ad6a