Skip to content

Commit a6e0359

Browse files
authored
impl(pubsub): add shutdown token (#5125)
Part of the work for #5024 Add a type that can signal and await shutdown. I think applications will typically signal and await together, but we separate the functions to allow for graceful shutdown without signaling shutdown (e.g. from an external error): ```rs let shutdown_token = stream.shutdown_token(); while let Some((m, h)) = stream.next().await { ... } // There must have been an external error. Wait for shutdown before returning. shutdown_token.shutdown().await; ```
1 parent b8f8f74 commit a6e0359

2 files changed

Lines changed: 122 additions & 0 deletions

File tree

src/pubsub/src/subscriber.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ mod leaser;
2727
mod message_stream;
2828
mod retry_policy;
2929
mod shutdown_behavior;
30+
#[allow(dead_code)] // TODO(#5024) - implementation in progress...
31+
mod shutdown_token;
3032
mod stream;
3133
mod stub;
3234
mod transport;
Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
// Copyright 2026 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// https://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use futures::future::{BoxFuture, Shared};
16+
use tokio_util::sync::CancellationToken;
17+
18+
/// A token to signal and await shutdown of a stream.
19+
///
20+
/// # Example
21+
/// ```no_rust
22+
/// use google_cloud_pubsub::subscriber::MessageStream;
23+
/// async fn sample(stream: MessageStream) {
24+
/// // Get a shutdown token for the stream.
25+
/// let token = stream.shutdown_token();
26+
///
27+
/// // Signal a shutdown of the stream.
28+
/// token.cancel();
29+
///
30+
/// // Await a shutdown of the stream.
31+
/// token.shutdown().await;
32+
/// }
33+
/// ```
34+
#[derive(Clone, Debug)]
35+
pub struct ShutdownToken {
36+
pub(super) inner: CancellationToken,
37+
pub(super) fut: Shared<BoxFuture<'static, ()>>,
38+
}
39+
40+
impl ShutdownToken {
41+
/// Signal a stream shutdown.
42+
///
43+
/// The stream will stop yielding messages.
44+
pub fn cancel(&self) {
45+
self.inner.cancel();
46+
}
47+
48+
/// Await a stream shutdown.
49+
///
50+
/// Applications should call this to ensure all pending ack/nack RPCs have
51+
/// time to complete before a process exits.
52+
///
53+
/// See [`Subscribe::set_shutdown_behavior`][setter] to configure the exact
54+
/// behavior on shutdown.
55+
///
56+
/// [setter]: crate::builder::subscriber::Subscribe::set_shutdown_behavior
57+
pub async fn shutdown(&self) {
58+
self.fut.clone().await
59+
}
60+
}
61+
62+
#[cfg(test)]
63+
mod tests {
64+
use super::*;
65+
use futures::FutureExt;
66+
use tokio::sync::oneshot::channel;
67+
68+
#[tokio::test(start_paused = true)]
69+
async fn cancel() {
70+
let token = ShutdownToken {
71+
inner: CancellationToken::new(),
72+
fut: std::future::pending().boxed().shared(),
73+
};
74+
assert!(!token.inner.is_cancelled(), "{token:?}");
75+
76+
let token_clone = token.clone();
77+
assert!(!token_clone.inner.is_cancelled(), "{token_clone:?}");
78+
79+
token.cancel();
80+
assert!(token.inner.is_cancelled(), "{token:?}");
81+
assert!(token_clone.inner.is_cancelled(), "{token_clone:?}");
82+
83+
// A second cancel is a no-op.
84+
token.cancel();
85+
assert!(token.inner.is_cancelled(), "{token:?}");
86+
}
87+
88+
#[tokio::test(start_paused = true)]
89+
async fn shutdown() -> anyhow::Result<()> {
90+
let (tx, rx) = channel();
91+
let fut = rx.map(|_| ()).boxed().shared();
92+
93+
let token = ShutdownToken {
94+
inner: CancellationToken::new(),
95+
fut,
96+
};
97+
assert!(token.fut.peek().is_none(), "future should be pending");
98+
99+
let token_clone = token.clone();
100+
assert!(token_clone.fut.peek().is_none(), "future should be pending");
101+
102+
let handle = tokio::spawn(async move {
103+
token_clone.shutdown().await;
104+
});
105+
tokio::task::yield_now().await;
106+
107+
assert!(token.fut.peek().is_none(), "future should be pending");
108+
109+
// Satisfy the future
110+
let _ = tx.send(());
111+
handle.await?;
112+
assert!(token.fut.peek().is_some(), "future should be satisfied");
113+
114+
// A second shutdown is a no-op.
115+
token.shutdown().await;
116+
assert!(token.fut.peek().is_some(), "future should be satisfied");
117+
118+
Ok(())
119+
}
120+
}

0 commit comments

Comments
 (0)