fix(messages): prevent infinite re-read loop when polling range is empty#1685
fix(messages): prevent infinite re-read loop when polling range is empty#1685Mohoki wants to merge 3 commits intokafbat:mainfrom
Conversation
When messagesPerPage (limit) was 0, msgsToPollPerPartition became 0, producing an empty range (from, from). nextPollingRange then returned the same range, so the consumer never advanced and re-read the same data until timeout, causing hundreds of millions of messages consumed and zero results (No messages found). Changes: - ForwardEmitter/BackwardEmitter: ensure msgsToPollPerPartition is at least 1 via Math.max(1, ...) so the range is never empty. - MessagesService: when loading by cursor, pass fixPageSize(cursor.limit()) so a cursor with limit 0 is normalized to defaultPageSize and never reaches the emitter. Tests: - RecordEmitterTest: add forwardAndBackwardEmitterWithZeroMessagesPerPageCompleteWithoutHanging - CursorTest: add forwardEmitterWithZeroLimitCompletesWithoutHanging and backwardEmitterWithZeroLimitCompletesWithoutHanging
There was a problem hiding this comment.
Hi Mohoki! 👋
Welcome, and thank you for opening your first PR in the repo!
Please wait for triaging by our maintainers.
Please take a look at our contributing guide.
please elaborate on how can this happen |
|
Good point.
That’s why we fixed both:
At the same time, from newer logs we see another issue: offsets were moving and polling finished, so that case was not the So this PR fixes a real edge case, but it’s not the full root-cause fix for the heavy/slow reads. Better not merge as “final fix” yet. |
|
can you provide steps to reproduce this issue? let's start with that. |
|
Hi @Haarolean, repro steps for the original issue: Create a topic with many partitions and a large backlog (e.g. ~30 partitions, ~200k+ records total) |
- Added maxMessagesToScanPerPoll property to ClustersProperties and PollingSettings. - Updated BackwardEmitter and ForwardEmitter to use nextChunkSizePerPartition for calculating messages to poll. - Enhanced ConsumerGroupService to create consumers with a configurable poll limit. - Updated MessagesService to pass the limit when creating consumers. - Added tests for PollingSettings to ensure correct behavior with maxMessagesToScanPerPoll. - Updated API documentation and contract specifications to reflect the new configuration. This change improves the flexibility of message polling by allowing configuration of the maximum number of messages to scan per poll.
- Added methods to AbstractEmitter, MessagesProcessing, and ConsumingStats to support sending consuming stats for in-range records. - Updated RangePollingEmitter to utilize new methods for reporting in-range-only stats during polling. - Introduced a test case in RecordEmitterTest to verify that consuming stats reflect only in-range records. This change improves the accuracy of consuming statistics reported by range emitters, ensuring they only account for records within the specified range.
What changes did you make? (Give an overview)
When messagesPerPage (limit) was 0, msgsToPollPerPartition became 0, producing an empty range (from, from). nextPollingRange then returned the same range, so the consumer never advanced and re-read the same data until timeout, causing hundreds of millions of messages consumed and zero results (No messages found).
Changes:
Tests:
Is there anything you'd like reviewers to focus on?
How Has This Been Tested? (put an "x" (case-sensitive!) next to an item)
Checklist (put an "x" (case-sensitive!) next to all the items, otherwise the build will fail)
Check out Contributing and Code of Conduct
A picture of a cute animal (not mandatory but encouraged)
Resolves: #1684