fix(python): consumer loop drain, error survival, dead Event.extra#291
Draft
NikolayS wants to merge 3 commits into
Draft
fix(python): consumer loop drain, error survival, dead Event.extra#291NikolayS wants to merge 3 commits into
NikolayS wants to merge 3 commits into
Conversation
After a non-empty batch, the consumer waited up to poll_interval (default 30s) for a NOTIFY before polling again. Notifies fire only on new ticks, so a consumer that was down with N batches accumulated drained them at one batch per poll_interval and could never catch up. _poll_once now reports whether it processed a batch; the loop re-polls immediately after a non-empty batch and only waits for NOTIFY/timeout when the queue came back empty. Matches the Go consumer's behavior. Nack-failure batches still wait before redelivery to avoid a hot loop. Addresses finding B2 of #283. https://claude.ai/code/session_01KAaEGkQZmey1D1xCsVGmqv
A transient SQL error from receive or ack (failover, restart, network blip) propagated out of start() and killed the consumer, contradicting the documented 'blocks until SIGTERM/SIGINT' contract. Nack failures were already handled, making this an oversight. start() now wraps each connection session: on psycopg.Error or PgqueError it logs, waits poll_interval (in short slices so stop() stays prompt), reconnects, re-LISTENs, and resumes -- matching the Go consumer's log-and-retry behavior. KeyboardInterrupt and stop() handling are unchanged (BaseException is not caught). Tested with a flaky receive, a pg_terminate_backend'ed session, and a persistent failure during which stop() must return promptly. Addresses finding B3 of #283. https://claude.ai/code/session_01KAaEGkQZmey1D1xCsVGmqv
Event.extra was declared but never transmitted: send() unpacks only type and payload, and the SQL pgque.send overloads carry no ev_extra1..4 parameters. A dict field also has no natural mapping to the four positional extra columns. Data placed there was silently lost, which is worse than not offering the field -- so remove it and fail loudly (TypeError) on construction. Addresses finding B5 of #283. https://claude.ai/code/session_01KAaEGkQZmey1D1xCsVGmqv
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Bugs
B2 (medium) — backlog drained at one batch per
poll_interval.Consumer.start()unconditionally ran_poll_once(conn)then_wait_for_notify_or_stop(conn). After a non-empty batch the consumer still waited up topoll_interval(default 30s) for a NOTIFY — but notifies fire only on new ticks, and the notifies for accumulated batches were emitted while the consumer was not listening. A consumer that was down with N batches accumulated drained them at ~1 batch perpoll_intervaland could never catch up.B3 (medium) — a transient SQL error killed the consumer.
There was no try/except around the poll iteration, so a transient error from receive or ack (failover, restart, network blip) propagated out of
start()and the consumer died, contradicting the documented "blocks until SIGTERM/SIGINT" contract. Nack failures were already handled, making this an oversight.B5 (low) —
Event.extrawas silently dropped.pgque/types.pydeclaredEvent.extra: dict[str, str], butclient.send()unpacks onlytypeandpayload;extrawas referenced nowhere. Data placed there was silently lost.Fixes
_poll_oncenow returns whether it processed a batch; the loop re-polls immediately after a non-empty batch and only waits for NOTIFY/timeout when the queue came back empty (matches the Go consumer). A batch left unacked due to a nack failure reportsFalse, so redelivery waits a poll cycle instead of hot-looping.start()now wraps each connection session: onpsycopg.Error/PgqueErrorit logs, sleepspoll_intervalin short slices (sostop()stays prompt), reconnects, re-LISTENs, and resumes — paralleling the Go consumer's log-and-retry on receive errors.KeyboardInterrupt(BaseException) andstop()semantics are unchanged; handler exceptions and nack failures remain contained inside_poll_once.pgque.sendoverloads are(queue, payload)and(queue, type, payload)only — noev_extra1..4parameters (those exist only on the PgQ primitivepgque.insert_event(7)), and adict[str, str]has no natural mapping to four positional extra columns anyway. So there is no natural wiring throughpgque.send; the dead field is removed. A never-working field that silently loses data is worse than a removed field — construction withextra=now fails loudly withTypeError.All three fixes were done red/green: each new test was verified failing against the unfixed code, then turned green by the fix (separate commits per finding).
New tests (
clients/python/tests/)test_consumer_drains_backlog_within_one_poll_interval— 3 pre-accumulated batches,poll_interval=30; all must be consumed within 10s. Red on old code:1/3 messages in 10.0s.test_consumer_survives_transient_receive_error— first receive raises, consumer must recover and process the message.test_consumer_survives_killed_backend—pg_terminate_backendon the consumer's session (what a DB restart/failover looks like to the client); consumer must reconnect, re-LISTEN, and consume an event produced after the kill.test_consumer_stop_is_prompt_during_error_retry_wait— persistent receive failure withpoll_interval=30; thread must stay alive andstop()must return in <2s.test_event_rejects_extra_kwarg—Event(..., extra={...})raisesTypeError.Verification
Against a fresh scratch database (PostgreSQL 16,
sql/pgque.sqlinstalled viapsql -f):Red-state evidence before each fix:
FAILED tests/test_consumer_resilience.py::test_consumer_drains_backlog_within_one_poll_interval — AssertionError: backlog not drained: 1/3 messages in 10.0s3 failed, 1 passed(test_consumer_survives_transient_receive_error,test_consumer_survives_killed_backend,test_consumer_stop_is_prompt_during_error_retry_wait)FAILED tests/test_send.py::test_event_rejects_extra_kwarg - Failed: DID NOT RAISEAddresses findings B2, B3, B5 of #283
https://claude.ai/code/session_01KAaEGkQZmey1D1xCsVGmqv
Generated by Claude Code