Skip to content

Commit cbd582b

Browse files
committed
respond to review
1 parent 10cca11 commit cbd582b

3 files changed

Lines changed: 43 additions & 44 deletions

File tree

src/pubsub/benchmarks/throughput/src/main.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,7 @@ async fn main() -> Result<(), anyhow::Error> {
4343

4444
/// Returns true if the benchmark has exceeded its maximum allowed runtime.
4545
pub(crate) fn done(maximum_runtime: Duration, start: Instant) -> bool {
46-
let now = Instant::now();
47-
now >= start + maximum_runtime
46+
start.elapsed() >= maximum_runtime
4847
}
4948

5049
/// Returns the current Unix timestamp in milliseconds.

src/pubsub/benchmarks/throughput/src/publisher.rs

Lines changed: 34 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,10 @@ struct Stats {
2929
send_count: AtomicI64,
3030
/// Total bytes sent to the client library.
3131
send_bytes: AtomicI64,
32-
/// Total messages successfully acknowledged by the Pub/Sub service.
33-
ack_count: AtomicI64,
34-
/// Total bytes successfully acknowledged by the Pub/Sub service.
35-
ack_bytes: AtomicI64,
32+
/// Total messages successfully received by the Pub/Sub service.
33+
recv_count: AtomicI64,
34+
/// Total bytes successfully received by the Pub/Sub service.
35+
recv_bytes: AtomicI64,
3636
/// Total number of publishing errors encountered.
3737
error_count: AtomicI64,
3838
}
@@ -44,29 +44,24 @@ pub async fn run(config: PublisherArgs) -> Result<(), anyhow::Error> {
4444
config.common.project, config.topic_id
4545
);
4646

47-
run_publisher(Arc::new(config.clone()), topic_name.clone()).await;
48-
49-
Ok(())
50-
}
51-
52-
/// Creates a Publisher client configured with the benchmark settings.
53-
async fn create_publisher(config: Arc<PublisherArgs>, topic_name: String) -> Publisher {
54-
Publisher::builder(topic_name)
47+
let publisher = Publisher::builder(topic_name)
5548
.set_byte_threshold(config.batch_bytes)
5649
.set_message_count_threshold(config.batch_size)
5750
.set_delay_threshold(config.batch_delay)
5851
.with_grpc_subchannel_count(config.common.grpc_channels)
5952
.build()
60-
.await
61-
.unwrap()
53+
.await?;
54+
55+
run_publisher(Arc::new(config.clone()), publisher).await?;
56+
57+
Ok(())
6258
}
6359

6460
/// Orchestrates the publishing loop and reporting logic.
6561
///
6662
/// This function spawns a background task to continuously publish messages
6763
/// and uses the main thread to sleep and report metrics at fixed intervals.
68-
async fn run_publisher(config: Arc<PublisherArgs>, topic_name: String) {
69-
let publisher = create_publisher(config.clone(), topic_name).await;
64+
async fn run_publisher(config: Arc<PublisherArgs>, publisher: Publisher) -> anyhow::Result<()> {
7065
let payload_size = config.payload_size;
7166
let data = bytes::Bytes::from(vec![0u8; payload_size as usize]);
7267
let semaphore = Arc::new(tokio::sync::Semaphore::new(
@@ -79,26 +74,32 @@ async fn run_publisher(config: Arc<PublisherArgs>, topic_name: String) {
7974
let publisher_handle = tokio::task::spawn(async move {
8075
loop {
8176
// Respect the max_outstanding_messages limit.
82-
let permit = semaphore.clone().acquire_owned().await.unwrap();
77+
let permit = match semaphore.clone().acquire_owned().await {
78+
Ok(p) => p,
79+
Err(e) => {
80+
eprintln!("Error acquiring permit: {}", e);
81+
break;
82+
}
83+
};
8384
let p = publisher.publish(Message::new().set_data(data.clone()));
8485
publisher_stats.send_count.fetch_add(1, Ordering::Relaxed);
8586
publisher_stats
8687
.send_bytes
8788
.fetch_add(payload_size, Ordering::Relaxed);
8889

89-
let ack_stats = publisher_stats.clone();
90+
let recv_stats = publisher_stats.clone();
9091
tokio::spawn(async move {
9192
let _permit = permit;
9293
match p.await {
9394
Ok(_) => {
94-
ack_stats.ack_count.fetch_add(1, Ordering::Relaxed);
95-
ack_stats
96-
.ack_bytes
95+
recv_stats.recv_count.fetch_add(1, Ordering::Relaxed);
96+
recv_stats
97+
.recv_bytes
9798
.fetch_add(payload_size, Ordering::Relaxed);
9899
}
99100
Err(e) => {
100101
eprintln!("Error: {}", e);
101-
ack_stats.error_count.fetch_add(1, Ordering::Relaxed);
102+
recv_stats.error_count.fetch_add(1, Ordering::Relaxed);
102103
}
103104
}
104105
});
@@ -113,26 +114,26 @@ async fn run_publisher(config: Arc<PublisherArgs>, topic_name: String) {
113114
let timer = Instant::now();
114115
let start_send_count = stats.send_count.load(Ordering::Relaxed);
115116
let start_send_bytes = stats.send_bytes.load(Ordering::Relaxed);
116-
let start_ack_count = stats.ack_count.load(Ordering::Relaxed);
117-
let start_ack_bytes = stats.ack_bytes.load(Ordering::Relaxed);
117+
let start_recv_count = stats.recv_count.load(Ordering::Relaxed);
118+
let start_recv_bytes = stats.recv_bytes.load(Ordering::Relaxed);
118119
let start_error_count = stats.error_count.load(Ordering::Relaxed);
119120

120121
tokio::time::sleep(config.common.report_interval).await;
121122

122123
// Calculate deltas since the last report interval.
123124
let send_count_last = stats.send_count.load(Ordering::Relaxed) - start_send_count;
124125
let send_bytes_last = stats.send_bytes.load(Ordering::Relaxed) - start_send_bytes;
125-
let ack_count_last = stats.ack_count.load(Ordering::Relaxed) - start_ack_count;
126-
let ack_bytes_last = stats.ack_bytes.load(Ordering::Relaxed) - start_ack_bytes;
126+
let recv_count_last = stats.recv_count.load(Ordering::Relaxed) - start_recv_count;
127+
let recv_bytes_last = stats.recv_bytes.load(Ordering::Relaxed) - start_recv_bytes;
127128
let error_count_last = stats.error_count.load(Ordering::Relaxed) - start_error_count;
128129
let usage = timer.elapsed();
129130

130131
print_result("Pub", i, send_count_last, send_bytes_last, 0, usage);
131132
print_result(
132-
"Ack",
133+
"Recv",
133134
i,
134-
ack_count_last,
135-
ack_bytes_last,
135+
recv_count_last,
136+
recv_bytes_last,
136137
error_count_last,
137138
usage,
138139
);
@@ -141,9 +142,11 @@ async fn run_publisher(config: Arc<PublisherArgs>, topic_name: String) {
141142
publisher_handle.abort();
142143

143144
println!(
144-
"# Publisher: error_count={}, ack_count={}, send_count={}",
145+
"# Publisher: error_count={}, received_count={}, send_count={}",
145146
stats.error_count.load(Ordering::Relaxed),
146-
stats.ack_count.load(Ordering::Relaxed),
147+
stats.recv_count.load(Ordering::Relaxed),
147148
stats.send_count.load(Ordering::Relaxed)
148149
);
150+
151+
Ok(())
149152
}

src/pubsub/benchmarks/throughput/src/subscriber.rs

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,7 @@ pub async fn run(config: SubscriberArgs) -> Result<(), anyhow::Error> {
4242
let subscriber = Subscriber::builder()
4343
.with_grpc_subchannel_count(config.common.grpc_channels)
4444
.build()
45-
.await
46-
.unwrap();
45+
.await?;
4746

4847
run_subscriber(Arc::new(config.clone()), subscriber, &subscription_name).await;
4948

@@ -91,15 +90,13 @@ async fn run_subscriber(
9190
let stats = Arc::new(Stats::default());
9291
let mut tasks = Vec::new();
9392
let max_outstanding_per_task = config.common.max_outstanding_messages / config.streams;
94-
if max_outstanding_per_task > 0 {
95-
for _ in 0..config.streams {
96-
tasks.push(tokio::spawn(subscriber_task(
97-
subscriber.clone(),
98-
subscription_name.to_string(),
99-
max_outstanding_per_task,
100-
stats.clone(),
101-
)));
102-
}
93+
for _ in 0..config.streams {
94+
tasks.push(tokio::spawn(subscriber_task(
95+
subscriber.clone(),
96+
subscription_name.to_string(),
97+
max_outstanding_per_task,
98+
stats.clone(),
99+
)));
103100
}
104101

105102
let start = Instant::now();

0 commit comments

Comments
 (0)