Skip to content

Commit 6fdbeb7

Browse files
committed
fix(server): advance partition offset on empty dedup batch and fix journal offset tracking
- When all messages in a batch are duplicates, advance partition offset past the assigned (but removed) offset range to prevent offset reuse in subsequent batches - Fix journal current_offset to use actual last offset from batch instead of arithmetic that ignores gaps created by dedup removal Signed-off-by: shin <sars21@hanmail.net>
1 parent 0a4a0dc commit 6fdbeb7

1 file changed

Lines changed: 7 additions & 4 deletions

File tree

core/server/src/streaming/partitions/journal.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -84,12 +84,14 @@ impl Journal for MemoryMessageJournal {
8484
&& let Some(first_offset) = entry.first_offset()
8585
{
8686
// Allow disagreement when either side is 0 (fresh partition or
87-
// reset after purge). Only flag when both are non-zero and differ.
87+
// reset after purge), or when first_offset > base_offset (offset
88+
// gap from deduplicated batches that skipped journal append).
89+
// Only flag when first_offset < base_offset (offset regression).
8890
debug_assert!(
8991
self.inner.base_offset == 0
9092
|| first_offset == 0
91-
|| self.inner.base_offset == first_offset,
92-
"journal base_offset ({}) disagrees with batch first_offset ({})",
93+
|| self.inner.base_offset <= first_offset,
94+
"journal base_offset ({}) is ahead of batch first_offset ({})",
9395
self.inner.base_offset,
9496
first_offset
9597
);
@@ -99,14 +101,15 @@ impl Journal for MemoryMessageJournal {
99101
let batch_size = entry.size();
100102
let first_timestamp = entry.first_timestamp().unwrap();
101103
let last_timestamp = entry.last_timestamp().unwrap();
104+
let last_offset = entry.last_offset().unwrap();
102105
self.batches.add_batch(entry);
103106

104107
if self.inner.first_timestamp == 0 {
105108
self.inner.first_timestamp = first_timestamp;
106109
}
107110
self.inner.end_timestamp = last_timestamp;
108111
self.inner.messages_count += batch_messages_count;
109-
self.inner.current_offset = self.inner.base_offset + self.inner.messages_count as u64 - 1;
112+
self.inner.current_offset = last_offset;
110113
self.inner.size = IggyByteSize::from(self.inner.size.as_bytes_u64() + batch_size as u64);
111114

112115
Ok((self.inner.messages_count, self.inner.size.as_bytes_u32()))

0 commit comments

Comments
 (0)