Skip to content

Commit 02f0939

Browse files
committed
Add --parallel option for latency test
1 parent cf67743 commit 02f0939

4 files changed

Lines changed: 97 additions & 40 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "openworkers-cli"
3-
version = "0.3.5"
3+
version = "0.3.6"
44
edition = "2024"
55
license = "MIT"
66
description = "CLI for OpenWorkers - Self-hosted Cloudflare Workers runtime"

src/commands/latency.rs

Lines changed: 87 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
use crate::config::{AliasConfig, Config, ConfigError};
22
use colored::Colorize;
3+
use futures::stream::{self, StreamExt};
34
use sqlx::postgres::PgPoolOptions;
5+
use std::sync::Arc;
46
use std::time::{Duration, Instant};
57
use tokio::net::TcpStream;
68
use url::Url;
@@ -82,23 +84,24 @@ pub async fn run(
8284
alias: Option<String>,
8385
connect: bool,
8486
count: usize,
87+
parallel: usize,
8588
timeout: u64,
8689
) -> Result<(), LatencyError> {
8790
let (alias_name, alias_config) = resolve_alias(&alias)?;
8891

8992
match alias_config {
9093
AliasConfig::Db { database_url, .. } => {
9194
if connect {
92-
run_db_connect(&alias_name, &database_url, count, timeout).await
95+
run_db_connect(&alias_name, &database_url, count, parallel, timeout).await
9396
} else {
94-
run_db_query(&alias_name, &database_url, count, timeout).await
97+
run_db_query(&alias_name, &database_url, count, parallel, timeout).await
9598
}
9699
}
97100
AliasConfig::Api { url, insecure, .. } => {
98101
if connect {
99-
run_http_connect(&alias_name, &url, count, timeout).await
102+
run_http_connect(&alias_name, &url, count, parallel, timeout).await
100103
} else {
101-
run_http_reuse(&alias_name, &url, insecure, count, timeout).await
104+
run_http_reuse(&alias_name, &url, insecure, count, parallel, timeout).await
102105
}
103106
}
104107
}
@@ -110,6 +113,7 @@ async fn run_db_query(
110113
alias_name: &str,
111114
database_url: &str,
112115
count: usize,
116+
parallel: usize,
113117
timeout: u64,
114118
) -> Result<(), LatencyError> {
115119
let (host, port) = parse_host_port(database_url)?;
@@ -123,7 +127,7 @@ async fn run_db_query(
123127
);
124128

125129
let pool = PgPoolOptions::new()
126-
.max_connections(1)
130+
.max_connections(parallel as u32)
127131
.acquire_timeout(Duration::from_secs(timeout))
128132
.connect(database_url)
129133
.await?;
@@ -136,13 +140,25 @@ async fn run_db_query(
136140

137141
let mut latencies = Vec::with_capacity(count);
138142

139-
for i in 1..=count {
140-
let start = Instant::now();
141-
let result: Result<i32, _> = sqlx::query_scalar("SELECT 1").fetch_one(&pool).await;
142-
let elapsed = start.elapsed();
143+
let mut results: Vec<_> = stream::iter(1..=count)
144+
.map(|i| {
145+
let pool = pool.clone();
146+
async move {
147+
let start = Instant::now();
148+
let result: Result<i32, _> = sqlx::query_scalar("SELECT 1").fetch_one(&pool).await;
149+
let elapsed = start.elapsed();
150+
(i, result.map(|_| elapsed))
151+
}
152+
})
153+
.buffer_unordered(parallel)
154+
.collect()
155+
.await;
156+
157+
results.sort_by_key(|(i, _)| *i);
143158

159+
for (i, result) in &results {
144160
match result {
145-
Ok(_) => {
161+
Ok(elapsed) => {
146162
let ms = elapsed.as_secs_f64() * 1000.0;
147163
latencies.push(ms);
148164
println!(" {} {}/{}: {:.2} ms", "✓".green(), i, count, ms);
@@ -175,10 +191,11 @@ async fn run_db_connect(
175191
alias_name: &str,
176192
database_url: &str,
177193
count: usize,
194+
parallel: usize,
178195
timeout: u64,
179196
) -> Result<(), LatencyError> {
180197
let (host, port) = parse_host_port(database_url)?;
181-
let addr = format!("{}:{}", host, port);
198+
let addr: Arc<str> = format!("{}:{}", host, port).into();
182199
let timeout_dur = Duration::from_secs(timeout);
183200

184201
println!(
@@ -192,11 +209,24 @@ async fn run_db_connect(
192209

193210
let mut latencies = Vec::with_capacity(count);
194211

195-
for i in 1..=count {
196-
let start = Instant::now();
197-
let result = tokio::time::timeout(timeout_dur, TcpStream::connect(&addr)).await;
198-
let elapsed = start.elapsed();
212+
let mut results: Vec<_> = stream::iter(1..=count)
213+
.map(|i| {
214+
let addr = addr.clone();
215+
async move {
216+
let start = Instant::now();
217+
let result =
218+
tokio::time::timeout(timeout_dur, TcpStream::connect(addr.as_ref())).await;
219+
let elapsed = start.elapsed();
220+
(i, result, elapsed)
221+
}
222+
})
223+
.buffer_unordered(parallel)
224+
.collect()
225+
.await;
226+
227+
results.sort_by_key(|(i, _, _)| *i);
199228

229+
for (i, result, elapsed) in &results {
200230
match result {
201231
Ok(Ok(_)) => {
202232
let ms = elapsed.as_secs_f64() * 1000.0;
@@ -216,10 +246,6 @@ async fn run_db_connect(
216246
println!(" {} {}/{}: {}", "✗".red(), i, count, "timeout".dimmed());
217247
}
218248
}
219-
220-
if i < count {
221-
tokio::time::sleep(Duration::from_millis(100)).await;
222-
}
223249
}
224250

225251
println!();
@@ -270,6 +296,7 @@ async fn run_http_reuse(
270296
url: &str,
271297
insecure: bool,
272298
count: usize,
299+
parallel: usize,
273300
timeout: u64,
274301
) -> Result<(), LatencyError> {
275302
let (host, _) = parse_host_port(url)?;
@@ -312,31 +339,45 @@ async fn run_http_reuse(
312339
let mut any_success = false;
313340

314341
for layer in LAYERS {
315-
let endpoint = latency_url(url, layer.path);
342+
let endpoint: Arc<str> = latency_url(url, layer.path).into();
316343

317344
println!();
318345
println!("{}:", layer.label.bold());
319346

320347
let mut latencies = Vec::with_capacity(count);
321348

322-
for i in 1..=count {
323-
let start = Instant::now();
324-
let result = client.get(&endpoint).send().await;
325-
let elapsed = start.elapsed();
349+
let mut results: Vec<_> = stream::iter(1..=count)
350+
.map(|i| {
351+
let client = client.clone();
352+
let endpoint = endpoint.clone();
353+
async move {
354+
let start = Instant::now();
355+
let result = client.get(endpoint.as_ref()).send().await;
356+
let elapsed = start.elapsed();
357+
(i, result, elapsed)
358+
}
359+
})
360+
.buffer_unordered(parallel)
361+
.collect()
362+
.await;
363+
364+
results.sort_by_key(|(i, _, _)| *i);
365+
366+
let mut not_configured = false;
326367

368+
for (i, result, elapsed) in results {
327369
match result {
328370
Ok(resp) if resp.status().as_u16() == 418 => {
329371
let _ = resp.bytes().await;
330372

331-
if i == 1 {
373+
if !not_configured {
374+
not_configured = true;
332375
println!(
333376
" {} {} (not configured)",
334377
"─".dimmed(),
335378
layer.name.dimmed()
336379
);
337380
}
338-
339-
break;
340381
}
341382
Ok(resp) if resp.status().is_success() => {
342383
let _ = resp.bytes().await;
@@ -400,10 +441,11 @@ async fn run_http_connect(
400441
alias_name: &str,
401442
url: &str,
402443
count: usize,
444+
parallel: usize,
403445
timeout: u64,
404446
) -> Result<(), LatencyError> {
405447
let (host, port) = parse_host_port(url)?;
406-
let addr = format!("{}:{}", host, port);
448+
let addr: Arc<str> = format!("{}:{}", host, port).into();
407449
let timeout_dur = Duration::from_secs(timeout);
408450

409451
println!(
@@ -417,11 +459,24 @@ async fn run_http_connect(
417459

418460
let mut latencies = Vec::with_capacity(count);
419461

420-
for i in 1..=count {
421-
let start = Instant::now();
422-
let result = tokio::time::timeout(timeout_dur, TcpStream::connect(&addr)).await;
423-
let elapsed = start.elapsed();
462+
let mut results: Vec<_> = stream::iter(1..=count)
463+
.map(|i| {
464+
let addr = addr.clone();
465+
async move {
466+
let start = Instant::now();
467+
let result =
468+
tokio::time::timeout(timeout_dur, TcpStream::connect(addr.as_ref())).await;
469+
let elapsed = start.elapsed();
470+
(i, result, elapsed)
471+
}
472+
})
473+
.buffer_unordered(parallel)
474+
.collect()
475+
.await;
424476

477+
results.sort_by_key(|(i, _, _)| *i);
478+
479+
for (i, result, elapsed) in &results {
425480
match result {
426481
Ok(Ok(_)) => {
427482
let ms = elapsed.as_secs_f64() * 1000.0;
@@ -441,10 +496,6 @@ async fn run_http_connect(
441496
println!(" {} {}/{}: {}", "✗".red(), i, count, "timeout".dimmed());
442497
}
443498
}
444-
445-
if i < count {
446-
tokio::time::sleep(Duration::from_millis(100)).await;
447-
}
448499
}
449500

450501
println!();

src/main.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -237,7 +237,8 @@ enum Commands {
237237
#[command(after_help = "Examples:\n \
238238
ow test-latency Test request latency (reuses connection)\n \
239239
ow test-latency --connect Test connection latency (new connection each time)\n \
240-
ow local test-latency -n 20 Test with 20 iterations")]
240+
ow local test-latency -n 20 Test with 20 iterations\n \
241+
ow test-latency -p 5 Test with 5 parallel requests")]
241242
TestLatency {
242243
/// Test connection latency instead of request latency (new connection each time)
243244
#[arg(short, long)]
@@ -247,6 +248,10 @@ enum Commands {
247248
#[arg(short = 'n', long, default_value = "10")]
248249
count: usize,
249250

251+
/// Number of parallel requests (default: 1)
252+
#[arg(short, long, default_value = "1")]
253+
parallel: usize,
254+
250255
/// Timeout in seconds (default: 5)
251256
#[arg(short, long, default_value = "5")]
252257
timeout: u64,
@@ -668,8 +673,9 @@ async fn main() {
668673
Commands::TestLatency {
669674
connect,
670675
count,
676+
parallel,
671677
timeout,
672-
} => commands::latency::run(alias, connect, count, timeout)
678+
} => commands::latency::run(alias, connect, count, parallel, timeout)
673679
.await
674680
.map_err(|e| e.to_string()),
675681
Commands::SetupStorage {

0 commit comments

Comments
 (0)