impl(pubsub): subscriber shutdown#5251
Conversation
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #5251 +/- ##
========================================
Coverage 97.98% 97.98%
========================================
Files 215 215
Lines 44755 44904 +149
========================================
+ Hits 43852 43999 +147
- Misses 903 905 +2 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
| let shutdown_clone = shutdown.clone(); | ||
| let _shutdown_guard = shutdown.clone().drop_guard(); | ||
| tokio::spawn(async move { | ||
| // Hold the strong senders for the channels, dropping them when an |
There was a problem hiding this comment.
nit: can you make clear that this is the only place that is allowed to hold the strong version? That way we don't come back in later and try to do a refactor to pass the Strong Sender somewhere else, without realizing there are consequences.
There was a problem hiding this comment.
without realizing there are consequences.
The unit tests will definitely fail.
this is the only place that is allowed to hold the strong version?
Well, this is the only place in the MessageStream. The Handlers all get a strong ack_tx. And the lease loop might be holding a strong message_tx, depending on the configured shutdown behavior. 🫨
As I write this, I agree with your main point that this is not clear, unnecessarily complicated. So, #5255.
I am going to merge this PR first, and get the feature done, then spend an hour or two trying to build the background task into the LeaseLoop. And have the LeaseLoop outputs be weak senders + the cancellation token. Which seems like a clearer/cleaner approach.
| pub fn shutdown_token(&self) -> ShutdownToken { | ||
| ShutdownToken { | ||
| inner: self.shutdown.clone(), | ||
| fut: self.lease_loop.clone(), |
There was a problem hiding this comment.
nit: Returning self.lease_loop was unexpected when I read this. Maybe a comment to explain that "The lease_loop exits once shutdown is complete" or "shutdown is considered complete when the lease loop has ended".
There was a problem hiding this comment.
The member variable says exactly this:
google-cloud-rust/src/pubsub/src/subscriber/message_stream.rs
Lines 67 to 68 in 431dc2a
I guess I can duplicate the comment.
Aside: it was probably surprising because my names are bad. Maybe shutdown should be cancel and lease_loop should be shutdown. WDYT?
Part of the work for #5024
Support a shutdown, signaled by the application (via a
ShutdownToken, or bydropping the stream), and after a permanent error.None of this is public. The next PR will make it public and add the proper documentation for the
shutdown_token()accessor.I couldn't really break this down without regressing existing behavior. And in the middle of the release, that seemed like a bad idea.