Pull request overview
Refactors the diskless log initialization flow by extracting the per-partition state machine into InitDisklessLogState and extracting batched request scheduling/sending/retry into InitDisklessLogBatchQueue, with corresponding updates to unit/integration-style tests.
Changes:
- Introduces a sealed state model (WaitingForReplication, SendingToController, AwaitingMetadata) and a protocol for sending SendingToController batches to the controller.
- Adds a generic retriable batch queue to coalesce work (linger) and retry failures with capped exponential delay.
- Updates tests to assert against state types rather than the removed InitState enum.
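As a rough illustration of the sealed state model and the type-based assertions listed above, a minimal sketch might look like the following. Only the three state names come from the summary; the `InitStateSketch` object, the `next` function, and the linear transition order are assumptions made for the example, not the code in this PR.

```scala
// Hypothetical sketch: state names mirror the PR summary; everything else is assumed.
object InitStateSketch {
  sealed trait State
  case object WaitingForReplication extends State
  case object SendingToController extends State
  case object AwaitingMetadata extends State

  // Assumed linear progression. Because State is sealed, the compiler
  // warns if a case is missing from this match.
  def next(state: State): State = state match {
    case WaitingForReplication => SendingToController
    case SendingToController   => AwaitingMetadata
    case AwaitingMetadata      => AwaitingMetadata // assumed to stay put
  }
}
```

A test can then assert on the state type directly, for example `assert(InitStateSketch.next(InitStateSketch.WaitingForReplication) == InitStateSketch.SendingToController)`, instead of comparing against an enum value.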
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| core/src/main/scala/kafka/server/InitDisklessLogManager.scala | Switches manager to track InitDisklessLogState objects and delegates batching/retries to the new queue. |
| core/src/main/scala/kafka/server/InitDisklessLogState.scala | Adds the extracted state machine and the controller batch protocol for SendingToController. |
| core/src/main/scala/kafka/server/InitDisklessLogBatchQueue.scala | Adds a generic batch queue with linger + retry scheduling and per-partition result futures. |
| core/src/test/scala/unit/kafka/server/metadata/InitDisklessLogFlowTest.scala | Updates assertions to validate tracked state by class/type. |
| core/src/test/scala/unit/kafka/server/InitDisklessLogManagerTest.scala | Updates tests for new state model and adds additional retry/backoff-related test coverage. |
…omponents Extract logic related to the states and state machine into InitDisklessLogState, and the logic related to request batching and sending into InitDisklessLogBatchQueue.
jeqo left a comment
Most of the refactoring looks good to me, though I think we can reconsider the generic batching framework and go further on the PartitionListener-related changes.
    def validate(): Unit = {
      if (!partition.isSealed) {
        error(s"Partition is not sealed, which should never happen. Skipping migration.")
        throw new IllegalArgumentException()
Throwing an error in a case class constructor is not a common pattern in the Kafka codebase; maybe it's better to leave this validation in the manager?
Moved the validation into the advance-state logic, so it goes into the Failed state instead of being a separate check.
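For illustration, folding the sealed check into the state transition could look roughly like this. `Partition`, the state fields, and `advance` are hypothetical stand-ins chosen for the sketch, not the actual code in this PR.

```scala
// Hypothetical sketch: an unsealed partition moves to Failed instead of throwing.
object AdvanceSketch {
  // Stand-in for the real partition type; only isSealed matters here.
  final case class Partition(name: String, isSealed: Boolean)

  sealed trait State
  final case class WaitingForReplication(partition: Partition) extends State
  final case class SendingToController(partition: Partition) extends State
  final case class Failed(partition: Partition, reason: String) extends State

  def advance(state: State): State = state match {
    // Validation lives in the transition, so no constructor ever throws.
    case WaitingForReplication(p) if !p.isSealed => Failed(p, "partition is not sealed")
    case WaitingForReplication(p)                => SendingToController(p)
    case other                                   => other // assumed: other states are unchanged here
  }
}
```

With this shape the manager only ever observes a state value, and the error path is covered by the same assertions as any other transition.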
I wonder if we should consider dropping this generic batch queue framework. It introduces some behavioral changes and doesn't get much reuse in the upcoming PR #563, as most features are bypassed (no batching, no parsing, sync calls, etc.).
It also diverges from the AlterPartitionManager patterns, which already cope with similar requirements and are already followed in the current implementation, so it increases cognitive load.
I've refactored the batch queue code to be less complex now. It's just an abstract class that can be extended. I still think there's value in having an abstract class that implements most of the machinery (batching, retries, backoff), because otherwise that code would be duplicated in the implementation of the InitDisklessLog call for the control plane. #563 had not yet been updated to use batching, but now it has. It also needs the same mechanism for retries and backoff calculation. Let me know what you think about this new abstraction.
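To make the shape of that abstraction concrete, here is a minimal, single-threaded sketch of an abstract queue that owns batching, retries, and capped exponential backoff. All names (`BatchQueueSketch`, `RetriableBatchQueue`, `send`, `flush`, `backoffMs`) and the doubling formula are assumptions for illustration; the real InitDisklessLogBatchQueue also handles linger scheduling and per-partition futures, which are left out here.

```scala
// Hypothetical sketch of the abstract-class approach; not the PR's implementation.
object BatchQueueSketch {
  // Capped exponential backoff: baseMs * 2^attempt, never above maxMs.
  def backoffMs(attempt: Int, baseMs: Long, maxMs: Long): Long = {
    val shift = math.min(math.max(attempt, 0), 30) // clamp the exponent to avoid overflow
    math.min(maxMs, baseMs * (1L << shift))
  }

  abstract class RetriableBatchQueue[T](baseMs: Long, maxMs: Long) {
    // LinkedHashSet coalesces duplicate items while keeping insertion order.
    private val pending = scala.collection.mutable.LinkedHashSet.empty[T]
    private var attempt = 0

    // Subclasses send one batch and return the items that failed.
    protected def send(batch: Seq[T]): Seq[T]

    def enqueue(item: T): Unit = pending += item

    // Sends everything pending as one batch. Returns the delay (ms) to wait
    // before the next flush if anything failed, or None otherwise.
    def flush(): Option[Long] = {
      if (pending.isEmpty) None
      else {
        val batch = pending.toList
        pending.clear()
        val failed = send(batch)
        if (failed.isEmpty) {
          attempt = 0
          None
        } else {
          pending ++= failed // failed items are retried in the next batch
          val delay = backoffMs(attempt, baseMs, maxMs)
          attempt += 1
          Some(delay)
        }
      }
    }
  }
}
```

A concrete sender would only implement `send`, so the broker-side and control-plane callers could share the retry and backoff calculation instead of duplicating it.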
Let's go for this approach for now. We can reassess if needed after the next PR lands.
…omponents (#561) Extract logic related to the states and state machine into InitDisklessLogState, and the logic related to request batching and sending into InitDisklessLogBatchQueue.