Skip to content

Commit e92b011

Browse files
authored
test(pubsub): add throughput benchmark (#5238)
Add a new throughput benchmark for measuring the performance of publish and subscribe operations. The benchmark is implemented as a single binary with subcommands for the publisher and subscriber, allowing for unified metric formatting and simplified execution. A version of this was used to benchmark our libraries to validate acceptable performance. For #3840
1 parent 27f04b8 commit e92b011

8 files changed

Lines changed: 676 additions & 95 deletions

File tree

Cargo.lock

Lines changed: 108 additions & 95 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -287,6 +287,7 @@ members = [
287287
"src/generated/type",
288288
"src/lro",
289289
"src/pubsub",
290+
"src/pubsub/benchmarks/throughput",
290291
"src/pubsub/examples",
291292
"src/spanner",
292293
"src/spanner/grpc-mock",
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
# Copyright 2026 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# https://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
[package]
16+
name = "pubsub-throughput"
17+
version = "0.0.0"
18+
publish = false
19+
# Inherit other attributes from the workspace.
20+
edition.workspace = true
21+
authors.workspace = true
22+
license.workspace = true
23+
repository.workspace = true
24+
25+
[lints]
26+
workspace = true
27+
28+
[dependencies]
29+
anyhow.workspace = true
30+
bytes.workspace = true
31+
clap = { workspace = true, features = ["derive", "env", "help", "std"] }
32+
google-cloud-pubsub = { workspace = true, features = ["default-rustls-provider"] }
33+
futures.workspace = true
34+
humantime.workspace = true
35+
rand.workspace = true
36+
tokio = { workspace = true, features = ["full"] }
37+
tracing.workspace = true
38+
tracing-subscriber = { workspace = true, features = ["fmt", "std"] }
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
# Pub/Sub Throughput Benchmark
2+
3+
A throughput benchmark for the Cloud Pub/Sub Rust client library.
4+
5+
This tool measures the performance of publishing messages to a Google Cloud
6+
Pub/Sub topic, or receiving messages from a subscription. It reports operation
7+
rates in messages per second and megabytes per second.
8+
9+
## Usage
10+
11+
```bash
12+
cargo run --release -p pubsub-throughput -- [COMMAND] [OPTIONS]
13+
```
14+
15+
To see the commands and options use:
16+
17+
```bash
18+
cargo run -p pubsub-throughput -- --help
19+
```
20+
21+
## Output Format
22+
23+
The benchmark outputs data in CSV format with the following columns:
24+
25+
- `timestamp`: The Unix timestamp in milliseconds.
26+
- `elapsed(s)`: The elapsed time for the operation in seconds.
27+
- `op`: The operation being measured (`Pub`, `Ack`, or `Recv`).
28+
- `iteration`: The current iteration number.
29+
- `count`: The number of messages processed in the operation.
30+
- `msgs/s`: The number of messages per second.
31+
- `bytes`: The total number of bytes processed.
32+
- `MB/s`: The throughput in megabytes per second.
33+
- `errors`: The number of errors encountered (if any).
34+
- `errors/s`: The number of errors per second (if any).
35+
36+
## Examples
37+
38+
### Setup
39+
40+
Create a topic and subscription:
41+
42+
```bash
43+
gcloud pubsub topics create my-topic
44+
gcloud pubsub subscriptions create my-subscription --topic=my-topic
45+
```
46+
47+
### Publisher Run
48+
49+
```bash
50+
cargo run --release -p pubsub-throughput -- publisher \
51+
--project my-gcp-project \
52+
--topic-id my-topic \
53+
--payload-size 2048 \
54+
--report-interval 10s \
55+
--duration 1m
56+
```
57+
58+
### Subscriber Run
59+
60+
```bash
61+
cargo run --release -p pubsub-throughput -- subscriber \
62+
--project my-gcp-project \
63+
--subscription-id my-subscription \
64+
--report-interval 10s \
65+
--duration 1m
66+
```
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
// Copyright 2026 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// https://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use clap::{Args, Parser, Subcommand};
16+
use humantime::parse_duration;
17+
18+
#[derive(Parser, Debug)]
19+
#[command(author, version, about = "Cloud Pub/Sub Throughput Benchmark", long_about = None)]
20+
pub struct Config {
21+
#[command(subcommand)]
22+
pub command: Commands,
23+
}
24+
25+
#[derive(Subcommand, Debug)]
26+
pub enum Commands {
27+
/// Run the publisher benchmark
28+
Publisher(PublisherArgs),
29+
/// Run the subscriber benchmark
30+
Subscriber(SubscriberArgs),
31+
}
32+
33+
#[derive(Args, Debug, Clone)]
34+
pub struct CommonArgs {
35+
#[arg(long, default_value = "", env = "GOOGLE_CLOUD_PROJECT")]
36+
pub project: String,
37+
38+
#[arg(long, value_parser = parse_duration, default_value = "5s")]
39+
pub report_interval: std::time::Duration,
40+
41+
#[arg(long, value_parser = parse_duration, default_value = "5m")]
42+
pub duration: std::time::Duration,
43+
44+
#[arg(long, default_value_t = 1)]
45+
pub grpc_channels: usize,
46+
47+
#[arg(long, default_value_t = 100000)]
48+
pub max_outstanding_messages: usize,
49+
}
50+
51+
#[derive(Args, Debug, Clone)]
52+
pub struct PublisherArgs {
53+
#[command(flatten)]
54+
pub common: CommonArgs,
55+
56+
#[arg(long)]
57+
pub topic_id: String,
58+
59+
#[arg(long, default_value_t = 1024)]
60+
pub payload_size: i64,
61+
62+
#[arg(long, default_value_t = 1000)]
63+
pub batch_size: u32,
64+
65+
#[arg(long, default_value_t = 10 * 1024 * 1024)] // 10 MB
66+
pub batch_bytes: u32,
67+
68+
#[arg(long, value_parser = parse_duration, default_value = "100ms")]
69+
pub batch_delay: std::time::Duration,
70+
}
71+
72+
#[derive(Args, Debug, Clone)]
73+
pub struct SubscriberArgs {
74+
#[command(flatten)]
75+
pub common: CommonArgs,
76+
77+
#[arg(long)]
78+
pub subscription_id: String,
79+
80+
#[arg(long, default_value_t = 1)]
81+
pub streams: usize,
82+
}
83+
84+
pub fn parse_args() -> Config {
85+
Config::parse()
86+
}
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
// Copyright 2026 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// https://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
16+
17+
mod args;
18+
mod publisher;
19+
mod subscriber;
20+
21+
#[tokio::main]
22+
async fn main() -> Result<(), anyhow::Error> {
23+
let config = crate::args::parse_args();
24+
/// The CSV header used for all throughput benchmark results.
25+
const CSV_HEADER: &str =
26+
"timestamp,elapsed(s),op,iteration,count,msgs/s,bytes,MB/s,errors,errors/s";
27+
28+
match config.command {
29+
crate::args::Commands::Publisher(args) => {
30+
println!("# Running publish benchmark with config: {:?}", args);
31+
println!("{}", CSV_HEADER);
32+
publisher::run(args).await?;
33+
}
34+
crate::args::Commands::Subscriber(args) => {
35+
println!("# Running subscribe benchmark with config: {:?}", args);
36+
println!("{}", CSV_HEADER);
37+
subscriber::run(args).await?;
38+
}
39+
}
40+
41+
Ok(())
42+
}
43+
44+
/// Returns true if the benchmark has exceeded its maximum allowed runtime.
45+
pub(crate) fn done(maximum_runtime: Duration, start: Instant) -> bool {
46+
start.elapsed() >= maximum_runtime
47+
}
48+
49+
/// Returns the current Unix timestamp in milliseconds.
50+
pub(crate) fn timestamp() -> u128 {
51+
SystemTime::now()
52+
.duration_since(UNIX_EPOCH)
53+
.unwrap()
54+
.as_millis()
55+
}
56+
57+
/// Formats and prints a single measurement row to stdout in CSV format.
58+
///
59+
/// The output includes throughput metrics (msgs/s and MB/s) calculated based
60+
/// on the elapsed time and the amount of data processed.
61+
pub(crate) fn print_result(
62+
operation: &str,
63+
iteration: i64,
64+
count: i64,
65+
bytes: i64,
66+
errors: i64,
67+
elapsed: Duration,
68+
) {
69+
let elapsed_s = elapsed.as_secs_f64();
70+
let mbs = (bytes as f64) / elapsed_s / 1_000_000.0;
71+
let msgs = (count as f64) / elapsed_s;
72+
let errs = (errors as f64) / elapsed_s;
73+
println!(
74+
"{},{},{},{},{},{:.2},{},{:.2},{},{:.2}",
75+
timestamp(),
76+
elapsed_s,
77+
operation,
78+
iteration,
79+
count,
80+
msgs,
81+
bytes,
82+
mbs,
83+
errors,
84+
errs
85+
);
86+
}

0 commit comments

Comments
 (0)