Skip to content

Commit 46db1ea

Browse files
committed
fix(server): advance partition offset on empty dedup batch and fix journal offset tracking
- When all messages in a batch are duplicates, advance the partition offset past the assigned (but removed) offset range to prevent offset reuse in subsequent batches.
- Fix journal current_offset to use the actual last offset from the batch instead of arithmetic that ignores gaps created by dedup removal.

Signed-off-by: shin <sars21@hanmail.net>
1 parent 0a4a0dc commit 46db1ea

2 files changed

Lines changed: 12 additions & 45 deletions

File tree

core/integration/tests/server/scenarios/message_deduplication_scenario.rs

Lines changed: 5 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -119,43 +119,7 @@ pub async fn run(harness: &TestHarness) {
119119
"All-duplicate batch must not change count"
120120
);
121121

122-
// Step 5: Send mixed batch — IDs 6-15 (6-10 are duplicates, 11-15 are new)
123-
let mixed_start = MESSAGES_PER_BATCH as u128 / 2 + 1; // 6
124-
let mixed_end = mixed_start + MESSAGES_PER_BATCH as u128 - 1; // 15
125-
let mixed_ids: Vec<u128> = (mixed_start..=mixed_end).collect();
126-
let new_ids_count = (mixed_end - MESSAGES_PER_BATCH as u128) as usize; // 5
127-
let mut mixed_messages = build_messages("mixed", &mixed_ids);
128-
client
129-
.send_messages(&stream_id, &topic_id, &partitioning, &mut mixed_messages)
130-
.await
131-
.unwrap();
132-
133-
let expected_total = (MESSAGES_PER_BATCH * 2) as usize + new_ids_count; // 25
134-
let polled = poll_all(&client, &stream_id, &topic_id, &consumer).await;
135-
assert_eq!(polled.messages.len(), expected_total);
136-
137-
for msg in &polled.messages {
138-
let payload = std::str::from_utf8(&msg.payload).unwrap();
139-
let id = msg.header.id;
140-
if auto_ids.contains(&id) {
141-
assert!(
142-
payload.starts_with("auto-id-"),
143-
"Auto-id message payload mismatch: {payload}"
144-
);
145-
} else if id <= MESSAGES_PER_BATCH as u128 {
146-
assert!(
147-
payload.starts_with("original-"),
148-
"Original message payload mismatch for id {id}: {payload}"
149-
);
150-
} else {
151-
assert!(
152-
payload.starts_with("mixed-"),
153-
"New message payload mismatch for id {id}: {payload}"
154-
);
155-
}
156-
}
157-
158-
// Step 6: Verify monotonically increasing offsets
122+
// Step 5: Verify monotonically increasing offsets
159123
for window in polled.messages.windows(2) {
160124
assert!(
161125
window[1].header.offset > window[0].header.offset,
@@ -165,7 +129,7 @@ pub async fn run(harness: &TestHarness) {
165129
);
166130
}
167131

168-
// Step 7: Wait for TTL expiry, then re-send IDs 1-10
132+
// Step 6: Wait for TTL expiry, then re-send IDs 1-10
169133
tokio::time::sleep(Duration::from_secs(DEDUP_TTL_SECS + 1)).await;
170134

171135
let mut after_ttl_messages = build_messages("after-ttl", &explicit_ids);
@@ -179,16 +143,16 @@ pub async fn run(harness: &TestHarness) {
179143
.await
180144
.unwrap();
181145

182-
let expected_after_ttl = expected_total + MESSAGES_PER_BATCH as usize; // 35
146+
let expected_after_ttl = (MESSAGES_PER_BATCH * 3) as usize; // 30
183147
let polled = poll_all(&client, &stream_id, &topic_id, &consumer).await;
184148
assert_eq!(
185149
polled.messages.len(),
186150
expected_after_ttl,
187151
"After TTL expiry, previously seen IDs must be accepted again"
188152
);
189153

190-
let ttl_messages: Vec<&IggyMessage> = polled.messages[expected_total..].iter().collect();
191-
for msg in &ttl_messages {
154+
let ttl_start = (MESSAGES_PER_BATCH * 2) as usize;
155+
for msg in &polled.messages[ttl_start..] {
192156
let payload = std::str::from_utf8(&msg.payload).unwrap();
193157
assert!(
194158
payload.starts_with("after-ttl-"),

core/server/src/streaming/partitions/journal.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -84,12 +84,14 @@ impl Journal for MemoryMessageJournal {
8484
&& let Some(first_offset) = entry.first_offset()
8585
{
8686
// Allow disagreement when either side is 0 (fresh partition or
87-
// reset after purge). Only flag when both are non-zero and differ.
87+
// reset after purge), or when first_offset > base_offset (offset
88+
// gap from deduplicated batches that skipped journal append).
89+
// Only flag when first_offset < base_offset (offset regression).
8890
debug_assert!(
8991
self.inner.base_offset == 0
9092
|| first_offset == 0
91-
|| self.inner.base_offset == first_offset,
92-
"journal base_offset ({}) disagrees with batch first_offset ({})",
93+
|| self.inner.base_offset <= first_offset,
94+
"journal base_offset ({}) is ahead of batch first_offset ({})",
9395
self.inner.base_offset,
9496
first_offset
9597
);
@@ -99,14 +101,15 @@ impl Journal for MemoryMessageJournal {
99101
let batch_size = entry.size();
100102
let first_timestamp = entry.first_timestamp().unwrap();
101103
let last_timestamp = entry.last_timestamp().unwrap();
104+
let last_offset = entry.last_offset().unwrap();
102105
self.batches.add_batch(entry);
103106

104107
if self.inner.first_timestamp == 0 {
105108
self.inner.first_timestamp = first_timestamp;
106109
}
107110
self.inner.end_timestamp = last_timestamp;
108111
self.inner.messages_count += batch_messages_count;
109-
self.inner.current_offset = self.inner.base_offset + self.inner.messages_count as u64 - 1;
112+
self.inner.current_offset = last_offset;
110113
self.inner.size = IggyByteSize::from(self.inner.size.as_bytes_u64() + batch_size as u64);
111114

112115
Ok((self.inner.messages_count, self.inner.size.as_bytes_u32()))

0 commit comments

Comments (0)