fix delivery tag collision in pubsub transport #2487
jgogstad wants to merge 15 commits into celery:main from
Conversation
Codecov Report
❌ Patch coverage is

Additional details and impacted files

```
@@            Coverage Diff             @@
##             main    #2487      +/-   ##
==========================================
+ Coverage   82.44%   82.51%   +0.06%
==========================================
  Files          79       79
  Lines       10142    10157      +15
  Branches     1165     1166       +1
==========================================
+ Hits         8362     8381      +19
+ Misses       1579     1578       -1
+ Partials      201      198       -3
```
Pull request overview
This PR addresses reliability issues in the GCP Pub/Sub transport by ensuring (1) ack-deadline extensions respect Pub/Sub request size constraints and (2) Pub/Sub redeliveries don’t collide in Kombu’s QoS bookkeeping by reassigning delivery_tag per delivery.
Changes:
- Assign a fresh, unique `delivery_tag` on every pull (`_get`/`_get_bulk`) to avoid QoS `_delivered` key collisions on redelivery.
- Batch `modify_ack_deadline` calls when extending ack deadlines to avoid oversized requests.
- Add unit tests covering unique delivery tags and ack-deadline extension batching behavior.
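A minimal sketch of the unique-tag idea (hypothetical names and structure; the real transport assigns the tag inside `_get`/`_get_bulk`, and may use a different tag scheme than UUIDs):

```python
import uuid


def assign_fresh_delivery_tag(payload: dict) -> dict:
    """Give a pulled message a delivery_tag no prior delivery used.

    Hypothetical sketch: kombu's virtual QoS keys its _delivered map by
    delivery_tag, so redeliveries must not reuse the publish-time tag.
    """
    payload.setdefault("properties", {})
    payload["properties"]["delivery_tag"] = str(uuid.uuid4())
    return payload


# Two deliveries of the "same" published message get distinct tags.
first = assign_fresh_delivery_tag(
    {"body": "x", "properties": {"delivery_tag": "published"}})
second = assign_fresh_delivery_tag(
    {"body": "x", "properties": {"delivery_tag": "published"}})
assert first["properties"]["delivery_tag"] != second["properties"]["delivery_tag"]
```

Because each pull rewrites the tag, two entries now land under distinct keys in `_delivered` instead of overwriting each other.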
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
| kombu/transport/gcpubsub.py | Generates unique delivery tags per delivery and batches ack-deadline extension requests. |
| t/unit/transport/test_gcpubsub.py | Adds tests for delivery-tag uniqueness across pulls/redeliveries and batching of deadline extensions. |
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
Pull request overview
This PR addresses reliability issues in the GCP Pub/Sub transport by preventing delivery tag collisions on redelivery and by batching ModifyAckDeadline calls to stay within Pub/Sub request size limits.
Changes:
- Assign a fresh, unique `delivery_tag` per Pub/Sub delivery in `_get` and `_get_bulk` to avoid QoS `_delivered` key collisions across redeliveries.
- Batch `modify_ack_deadline` requests using a fixed batch size to avoid exceeding Pub/Sub's 512 KiB request limit.
- Add unit tests covering unique delivery tags across redeliveries/bulk pulls and verifying deadline-extension batching behavior.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
| kombu/transport/gcpubsub.py | Regenerates delivery tags on pull and batches modify_ack_deadline calls. |
| t/unit/transport/test_gcpubsub.py | Adds tests for unique delivery tags and batched ack-deadline extensions. |
Pull request overview
Fixes GCP Pub/Sub transport reliability issues caused by (1) Pub/Sub ModifyAckDeadline request size limits and (2) delivery tag collisions on redelivery that can corrupt Kombu virtual QoS tracking.
Changes:
- Overwrite `properties.delivery_tag` on pull (`_get`/`_get_bulk`) to ensure a unique per-delivery tag and prevent QoS `_delivered` collisions on redelivery.
- Batch `modify_ack_deadline` calls when extending ack deadlines to reduce the chance of exceeding Pub/Sub's 512 KiB request-size limit.
- Add unit tests covering unique per-delivery tags, correct ack-id usage across redeliveries, and ack-deadline batching behavior.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| kombu/transport/gcpubsub.py | Implements unique per-delivery delivery_tag assignment and batches modify_ack_deadline requests. |
| t/unit/transport/test_gcpubsub.py | Adds tests for unique tags on pull/bulk pull, correct ack behavior on redelivery, and batching of ack-deadline extensions. |
```python
for i in range(0, len(ack_ids), _ACK_MODIFY_BATCH_SIZE):
    batch = ack_ids[i:i + _ACK_MODIFY_BATCH_SIZE]
    try:
        self.subscriber.modify_ack_deadline(
            request={
                "subscription": qdesc.subscription_path,
                "ack_ids": batch,
                "ack_deadline_seconds": self.ack_deadline_seconds,
```
The batching loop is count-based, but the Pub/Sub limit is on total request size (512 KiB). To avoid still hitting the limit in edge cases (very long ack_ids), it would be more robust to split batches by estimated or serialized byte size rather than by a fixed `_ACK_MODIFY_BATCH_SIZE` count.
Please cross-check this suggestion.
@auvipy sorry, I somehow missed this.

This is what we're doing, estimating that is. An ack-id is around 200 characters, so 1000 ids is ~200 KiB. If we wanted to be really thorough, we could count the bytes accumulated in memory and then partition dynamically, but that would only guard against Pub/Sub changing the size of an ack-id.

A middle ground is to make the limit configurable; I'll add a commit for that.
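The estimate above works out as follows (back-of-the-envelope arithmetic using the ~200-byte ack-id figure and a hypothetical batch size of 1000; not values taken from the code):

```python
ACK_ID_BYTES = 200   # approximate ack-id length observed in practice (assumption)
BATCH_SIZE = 1000    # hypothetical count-based batch size
LIMIT = 512 * 1024   # Pub/Sub request-size limit in bytes

batch_bytes = ACK_ID_BYTES * BATCH_SIZE  # 200,000 bytes, roughly 195 KiB
assert batch_bytes < LIMIT               # ~2.6x headroom under the 512 KiB limit
print(f"{batch_bytes / 1024:.0f} KiB of {LIMIT / 1024:.0f} KiB")
```

So a count-based batch only breaches the limit if ack-ids grow past roughly 2.5x their current length, which is what the configurable-limit compromise hedges against.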
Pull request overview
Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.
This PR fixes two bugs in the GCP Pub/Sub transport:
The first one is straightforward: the `ModifyAckDeadline` request body is limited to 512 KiB, so apply batching when extending ack deadlines; see https://docs.cloud.google.com/pubsub/quotas#resource_limits

For the second one: Pub/Sub can deliver the same message multiple times, via ack deadline expiry, explicit nack, or its at-least-once delivery guarantee under concurrent consumers. Each delivery carries the same `delivery_tag`, embedded in the payload at publish time (`Channel._inplace_augment_message`), but a different `ack_id` (assigned by Pub/Sub per delivery). Kombu's virtual transport uses `delivery_tag` as the key in `QoS._delivered`, so when the same message is consumed more than once, the new entry overwrites the previous one, losing its `ack_id`. This causes `basic_ack` to either acknowledge the wrong `ack_id` (which is fine, it's the same message) or raise `KeyError`. The latter happens when `_flush` removes the entry and `basic_ack` then tries to read it.

I propose fixing the second one by generating a unique `delivery_tag` on pull; this aligns with how the RabbitMQ transport does it (unique delivery tags for each delivery). Maintainers, please verify and have an opinion though, I don't know this code base well.

We hit the second issue ourselves: we nacked a couple of messages, and we had `autoretry_for = (Exception,)`. When `basic_ack` threw exceptions because the internal state was corrupted, it triggered the retry path, which published more messages, raised exceptions on ack, and so on (this is how we ended up hitting the `ModifyAckDeadline` limit).

I'm pretty sure there's also a bug where a nack doesn't remove the ack-id from the list of ack ids being extended. It's not a problem per se, it'll just be ignored by Pub/Sub. Not handled in this PR.
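The collision described above can be reproduced with a plain dict standing in for `QoS._delivered` (a toy model, not kombu's actual classes):

```python
# Toy model of kombu's virtual QoS bookkeeping: _delivered maps
# delivery_tag -> message state (which carries the Pub/Sub ack_id).
delivered = {}


def on_delivery(delivery_tag, ack_id):
    delivered[delivery_tag] = {"ack_id": ack_id}


# The same message redelivered: same publish-time tag, new ack_id.
on_delivery("tag-1", "ack-A")
on_delivery("tag-1", "ack-B")  # overwrites the first entry

assert len(delivered) == 1
assert delivered["tag-1"]["ack_id"] == "ack-B"  # "ack-A" is lost

# If a flush removes the entry first, acking raises KeyError.
delivered.pop("tag-1")
try:
    delivered["tag-1"]["ack_id"]
except KeyError:
    print("KeyError on basic_ack, as described")
```

With a fresh tag per pull, the two deliveries would occupy separate keys and each `ack_id` would remain reachable until its own ack.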