fix: cancellation-safe message reading in stream_loop (#5)#6
Merged
Conversation
WorkerState::stream_loop ran read_backend_message_into inside a tokio::select! racing stop_rx.changed() and tokio::time::timeout. read_backend_message_into uses read_exact internally, which is not cancellation-safe — when the losing select arm dropped the read future mid-message, partially-read header/payload bytes were lost. The next iteration then mis-parsed the wire stream, typically producing a bogus payload_len that hung waiting for non-existent bytes (or surfaced as a Protocol error). Add MessageReader, which externalizes partial-read state on the caller and uses one-shot AsyncReadExt::read (cancellation-safe). stream_loop owns a single MessageReader reused across drain and wait phases. read_backend_message_into is retained for non-select! callers (startup, auth, replication-start) and documented as not cancellation-safe. Tests: - 2 framing-level regression tests drive MessageReader through tokio::time::timeout cancellation mid-header and mid-payload, asserting the resumed read returns the original message intact - 86 unit + 9 integration + 16 doctests all pass; clippy clean Benchmarks (in-memory Cursor, worst case — no I/O wait): 64 B: 2.41 -> 2.13 GiB/s (-11%) 256 B: 6.44 -> 5.82 GiB/s (-10%) 1024 B: 17.5 -> 18.3 GiB/s ( +5%) 4096 B: 27.9 -> 26.3 GiB/s ( -5%) On the real BufReader-backed socket path, each read() returns the full requested slice in one poll, so the gap collapses; WAL streams remain bounded by Postgres+TCP, not framing.
878a1d3 to
b4b898e
Compare
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
WorkerState::stream_loopran read_backend_message_into inside atokio::select!racingstop_rx.changed()and `tokio::time::timeout. read_backend_message_into uses read_exact internally, which is not cancellation-safe - when the losing select arm dropped the read future mid-message, partially-read header/payload bytes were lost. The next iteration then mis-parsed the wire stream, typically producing a bogus payload_len that hung waiting for non-existent bytes (or surfaced as a Protocol error).Add
MessageReader, which externalizes partial-read state on the caller and uses one-shotAsyncReadExt::read(cancellation-safe). stream_loop owns a single MessageReader reused across drain and wait phases.read_backend_message_into is retained for non-select! callers (startup, auth, replication-start) and documented as not cancellation-safe.
Tests:
On the real BufReader-backed socket path, each read() returns the full
requested slice in one poll, so the gap collapses; WAL streams remain
bounded by Postgres+TCP, not framing.