From 77daa36b49de5030572507ae8ac4eb78363dfac7 Mon Sep 17 00:00:00 2001 From: Krishna Vishal Date: Wed, 22 Apr 2026 04:45:22 +0530 Subject: [PATCH 1/4] feat(shard): add v2 consumer-offset commands with AckLevel --- core/binary_protocol/src/codes.rs | 4 + .../src/consensus/operation.rs | 11 +- core/binary_protocol/src/dispatch.rs | 88 +++++--- core/binary_protocol/src/lib.rs | 1 + .../src/primitives/ack_level.rs | 108 +++++++++ core/binary_protocol/src/primitives/mod.rs | 1 + .../delete_consumer_offset_2.rs | 200 +++++++++++++++++ .../src/requests/consumer_offsets/mod.rs | 4 + .../store_consumer_offset_2.rs | 212 ++++++++++++++++++ .../delete_consumer_offset_2.rs | 19 ++ .../src/responses/consumer_offsets/mod.rs | 4 + .../store_consumer_offset_2.rs | 19 ++ core/common/src/lib.rs | 1 + .../traits/binary_impls/consumer_offsets.rs | 63 +++++- .../src/traits/consumer_offset_client.rs | 31 +++ core/consensus/src/observability.rs | 2 + .../binary_consumer_offset_client.rs | 112 ++++++++- .../sdk/src/clients/binary_consumer_offset.rs | 33 ++- core/sdk/src/http/consumer_offsets.rs | 30 ++- core/server/src/binary/dispatch.rs | 14 ++ .../delete_consumer_offset_2_handler.rs | 62 +++++ .../delete_consumer_offset_handler.rs | 9 +- .../binary/handlers/consumer_offsets/mod.rs | 2 + .../store_consumer_offset_2_handler.rs | 65 ++++++ .../store_consumer_offset_handler.rs | 2 + core/server/src/http/http_shard_wrapper.rs | 3 + core/server/src/shard/handlers.rs | 10 +- .../src/shard/system/consumer_offsets.rs | 58 +++-- core/server/src/shard/system/messages.rs | 81 +------ 29 files changed, 1109 insertions(+), 140 deletions(-) create mode 100644 core/binary_protocol/src/primitives/ack_level.rs create mode 100644 core/binary_protocol/src/requests/consumer_offsets/delete_consumer_offset_2.rs create mode 100644 core/binary_protocol/src/requests/consumer_offsets/store_consumer_offset_2.rs create mode 100644 core/binary_protocol/src/responses/consumer_offsets/delete_consumer_offset_2.rs create mode 100644 core/binary_protocol/src/responses/consumer_offsets/store_consumer_offset_2.rs create mode 100644 core/server/src/binary/handlers/consumer_offsets/delete_consumer_offset_2_handler.rs create mode 100644 core/server/src/binary/handlers/consumer_offsets/store_consumer_offset_2_handler.rs diff --git a/core/binary_protocol/src/codes.rs b/core/binary_protocol/src/codes.rs index f663c85d0d..ae33a98142 100644 --- a/core/binary_protocol/src/codes.rs +++ b/core/binary_protocol/src/codes.rs @@ -57,6 +57,8 @@ pub const FLUSH_UNSAVED_BUFFER_CODE: u32 = 102; pub const GET_CONSUMER_OFFSET_CODE: u32 = 120; pub const STORE_CONSUMER_OFFSET_CODE: u32 = 121; pub const DELETE_CONSUMER_OFFSET_CODE: u32 = 122; +pub const STORE_CONSUMER_OFFSET_2_CODE: u32 = 123; +pub const DELETE_CONSUMER_OFFSET_2_CODE: u32 = 124; // -- Streams -- pub const GET_STREAM_CODE: u32 = 200; @@ -133,6 +135,8 @@ mod tests { GET_CONSUMER_OFFSET_CODE, STORE_CONSUMER_OFFSET_CODE, DELETE_CONSUMER_OFFSET_CODE, + STORE_CONSUMER_OFFSET_2_CODE, + DELETE_CONSUMER_OFFSET_2_CODE, GET_STREAM_CODE, GET_STREAMS_CODE, CREATE_STREAM_CODE, diff --git a/core/binary_protocol/src/consensus/operation.rs b/core/binary_protocol/src/consensus/operation.rs index f0a08a29d5..64a43a3cad 100644 --- a/core/binary_protocol/src/consensus/operation.rs +++ b/core/binary_protocol/src/consensus/operation.rs @@ -65,6 +65,9 @@ pub enum Operation { SendMessages = 160, StoreConsumerOffset = 161, DeleteConsumerOffset = 162, + // 163 is reserved for the planned DeleteSegments move (see TODO above). + StoreConsumerOffset2 = 164, + DeleteConsumerOffset2 = 165, } impl Operation { @@ -165,7 +168,9 @@ impl Operation { | Self::DeletePersonalAccessToken | Self::SendMessages | Self::StoreConsumerOffset - | Self::DeleteConsumerOffset => match crate::dispatch::lookup_by_operation(*self) { + | Self::DeleteConsumerOffset + | Self::StoreConsumerOffset2 + | Self::DeleteConsumerOffset2 => match crate::dispatch::lookup_by_operation(*self) { Some(meta) => Some(meta.code), None => None, }, @@ -214,6 +219,8 @@ mod tests { Operation::SendMessages, Operation::StoreConsumerOffset, Operation::DeleteConsumerOffset, + Operation::StoreConsumerOffset2, + Operation::DeleteConsumerOffset2, ]; for op in ops { let code = op @@ -270,5 +277,7 @@ mod tests { assert!(!Operation::SendMessages.is_metadata()); assert!(Operation::DeleteSegments.is_partition()); assert!(Operation::DeleteConsumerOffset.is_partition()); + assert!(Operation::StoreConsumerOffset2.is_partition()); + assert!(Operation::DeleteConsumerOffset2.is_partition()); } } diff --git a/core/binary_protocol/src/dispatch.rs b/core/binary_protocol/src/dispatch.rs index fb5e33683b..bb83280bed 100644 --- a/core/binary_protocol/src/dispatch.rs +++ b/core/binary_protocol/src/dispatch.rs @@ -126,6 +126,16 @@ pub const COMMAND_TABLE: &[CommandMeta] = &[ "consumer_offset.delete", Operation::DeleteConsumerOffset, ), + CommandMeta::replicated( + STORE_CONSUMER_OFFSET_2_CODE, + "consumer_offset.store.v2", + Operation::StoreConsumerOffset2, + ), + CommandMeta::replicated( + DELETE_CONSUMER_OFFSET_2_CODE, + "consumer_offset.delete.v2", + Operation::DeleteConsumerOffset2, + ), // Streams CommandMeta::non_replicated(GET_STREAM_CODE, "stream.get"), CommandMeta::non_replicated(GET_STREAMS_CODE, "stream.list"), @@ -212,28 +222,30 @@ pub const fn lookup_command(code: u32) -> Option<&'static CommandMeta> { GET_CONSUMER_OFFSET_CODE => 24, STORE_CONSUMER_OFFSET_CODE => 25, DELETE_CONSUMER_OFFSET_CODE => 26, - GET_STREAM_CODE => 27, - GET_STREAMS_CODE => 28, - CREATE_STREAM_CODE => 29, - DELETE_STREAM_CODE => 30, - UPDATE_STREAM_CODE => 31, - PURGE_STREAM_CODE => 32, - GET_TOPIC_CODE => 33, - GET_TOPICS_CODE => 34, - CREATE_TOPIC_CODE => 35, - DELETE_TOPIC_CODE => 36, - UPDATE_TOPIC_CODE => 37, - PURGE_TOPIC_CODE => 38, - CREATE_PARTITIONS_CODE => 39, - DELETE_PARTITIONS_CODE => 40, - DELETE_SEGMENTS_CODE => 41, - GET_CONSUMER_GROUP_CODE => 42, - GET_CONSUMER_GROUPS_CODE => 43, - CREATE_CONSUMER_GROUP_CODE => 44, - DELETE_CONSUMER_GROUP_CODE => 45, - JOIN_CONSUMER_GROUP_CODE => 46, - LEAVE_CONSUMER_GROUP_CODE => 47, - LOGIN_REGISTER_WITH_PAT_CODE => 48, + STORE_CONSUMER_OFFSET_2_CODE => 27, + DELETE_CONSUMER_OFFSET_2_CODE => 28, + GET_STREAM_CODE => 29, + GET_STREAMS_CODE => 30, + CREATE_STREAM_CODE => 31, + DELETE_STREAM_CODE => 32, + UPDATE_STREAM_CODE => 33, + PURGE_STREAM_CODE => 34, + GET_TOPIC_CODE => 35, + GET_TOPICS_CODE => 36, + CREATE_TOPIC_CODE => 37, + DELETE_TOPIC_CODE => 38, + UPDATE_TOPIC_CODE => 39, + PURGE_TOPIC_CODE => 40, + CREATE_PARTITIONS_CODE => 41, + DELETE_PARTITIONS_CODE => 42, + DELETE_SEGMENTS_CODE => 43, + GET_CONSUMER_GROUP_CODE => 44, + GET_CONSUMER_GROUPS_CODE => 45, + CREATE_CONSUMER_GROUP_CODE => 46, + DELETE_CONSUMER_GROUP_CODE => 47, + JOIN_CONSUMER_GROUP_CODE => 48, + LEAVE_CONSUMER_GROUP_CODE => 49, + LOGIN_REGISTER_WITH_PAT_CODE => 50, _ => return None, }; Some(&COMMAND_TABLE[idx]) @@ -247,19 +259,19 @@ pub const fn lookup_command(code: u32) -> Option<&'static CommandMeta> { pub const fn lookup_by_operation(op: Operation) -> Option<&'static CommandMeta> { // Indices must match the order of entries in COMMAND_TABLE above. let idx = match op { - Operation::CreateStream => 29, - Operation::UpdateStream => 31, - Operation::DeleteStream => 30, - Operation::PurgeStream => 32, - Operation::CreateTopic => 35, - Operation::UpdateTopic => 37, - Operation::DeleteTopic => 36, - Operation::PurgeTopic => 38, - Operation::CreatePartitions => 39, - Operation::DeletePartitions => 40, - Operation::DeleteSegments => 41, - Operation::CreateConsumerGroup => 44, - Operation::DeleteConsumerGroup => 45, + Operation::CreateStream => 31, + Operation::UpdateStream => 33, + Operation::DeleteStream => 32, + Operation::PurgeStream => 34, + Operation::CreateTopic => 37, + Operation::UpdateTopic => 39, + Operation::DeleteTopic => 38, + Operation::PurgeTopic => 40, + Operation::CreatePartitions => 41, + Operation::DeletePartitions => 42, + Operation::DeleteSegments => 43, + Operation::CreateConsumerGroup => 46, + Operation::DeleteConsumerGroup => 47, Operation::CreateUser => 9, Operation::UpdateUser => 11, Operation::DeleteUser => 10, @@ -270,6 +282,8 @@ pub const fn lookup_by_operation(op: Operation) -> Option<&'static CommandMeta> Operation::SendMessages => 22, Operation::StoreConsumerOffset => 25, Operation::DeleteConsumerOffset => 26, + Operation::StoreConsumerOffset2 => 27, + Operation::DeleteConsumerOffset2 => 28, Operation::CreateTopicWithAssignments | Operation::CreatePartitionsWithAssignments | Operation::Reserved @@ -313,6 +327,8 @@ mod tests { GET_CONSUMER_OFFSET_CODE, STORE_CONSUMER_OFFSET_CODE, DELETE_CONSUMER_OFFSET_CODE, + STORE_CONSUMER_OFFSET_2_CODE, + DELETE_CONSUMER_OFFSET_2_CODE, GET_STREAM_CODE, GET_STREAMS_CODE, CREATE_STREAM_CODE, @@ -394,6 +410,8 @@ mod tests { Operation::SendMessages, Operation::StoreConsumerOffset, Operation::DeleteConsumerOffset, + Operation::StoreConsumerOffset2, + Operation::DeleteConsumerOffset2, ]; for op in replicated_ops { let meta = lookup_by_operation(op) diff --git a/core/binary_protocol/src/lib.rs b/core/binary_protocol/src/lib.rs index 8d4544c2bb..3ef0a85130 100644 --- a/core/binary_protocol/src/lib.rs +++ b/core/binary_protocol/src/lib.rs @@ -80,6 +80,7 @@ pub use framing::{RequestFrame, RequestFrame2, ResponseFrame, ResponseFrame2, ST pub use message_view::{ WireMessageIterator, WireMessageIteratorMut, WireMessageView, WireMessageViewMut, }; +pub use primitives::ack_level::AckLevel; pub use primitives::consumer::WireConsumer; pub use primitives::identifier::{MAX_WIRE_NAME_LENGTH, WireIdentifier, WireName}; pub use primitives::partition_assignment::CreatedPartitionAssignment; diff --git a/core/binary_protocol/src/primitives/ack_level.rs b/core/binary_protocol/src/primitives/ack_level.rs new file mode 100644 index 0000000000..a2ddb2389c --- /dev/null +++ b/core/binary_protocol/src/primitives/ack_level.rs @@ -0,0 +1,108 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::WireError; + +/// Acknowledgement policy for consumer-offset write commands. +/// +/// Wire format: single `u8` discriminant. +/// - `NoAck(0)`: leader-local write only; respond as soon as the in-memory +/// and on-disk state have been updated. Matches the fast path used by +/// `PollMessages` auto-commit. +/// - `Quorum(1)`: submit through the partition VSR consensus pipeline and +/// respond only after the write has been committed by a quorum of replicas. +/// This is the default for explicit client writes. +#[repr(u8)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)] +pub enum AckLevel { + NoAck = 0, + #[default] + Quorum = 1, +} + +impl AckLevel { + /// Decode an `AckLevel` from its wire discriminant. + /// + /// # Errors + /// Returns `WireError::UnknownDiscriminant` for unrecognised values. + pub const fn from_code(code: u8) -> Result { + match code { + 0 => Ok(Self::NoAck), + 1 => Ok(Self::Quorum), + other => Err(WireError::UnknownDiscriminant { + type_name: "AckLevel", + value: other, + offset: 0, + }), + } + } + + /// Encode this `AckLevel` as its wire discriminant. + #[must_use] + pub const fn as_u8(self) -> u8 { + self as u8 + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn roundtrip_no_ack() { + let level = AckLevel::NoAck; + let decoded = AckLevel::from_code(level.as_u8()).unwrap(); + assert_eq!(decoded, level); + } + + #[test] + fn roundtrip_quorum() { + let level = AckLevel::Quorum; + let decoded = AckLevel::from_code(level.as_u8()).unwrap(); + assert_eq!(decoded, level); + } + + #[test] + fn default_is_quorum() { + assert_eq!(AckLevel::default(), AckLevel::Quorum); + } + + #[test] + fn discriminant_values() { + assert_eq!(AckLevel::NoAck.as_u8(), 0); + assert_eq!(AckLevel::Quorum.as_u8(), 1); + } + + #[test] + fn unknown_discriminant_rejected() { + for code in 2u8..=u8::MAX { + let err = AckLevel::from_code(code).unwrap_err(); + match err { + WireError::UnknownDiscriminant { + type_name, + value, + offset, + } => { + assert_eq!(type_name, "AckLevel"); + assert_eq!(value, code); + assert_eq!(offset, 0); + } + other => panic!("unexpected error variant: {other:?}"), + } + } + } +} diff --git a/core/binary_protocol/src/primitives/mod.rs b/core/binary_protocol/src/primitives/mod.rs index 064f0fcfbb..f3c7ee4e32 100644 --- a/core/binary_protocol/src/primitives/mod.rs +++ b/core/binary_protocol/src/primitives/mod.rs @@ -17,6 +17,7 @@ //! Shared wire primitives reused across request and response types. +pub mod ack_level; pub mod consumer; pub mod identifier; pub mod partition_assignment; diff --git a/core/binary_protocol/src/requests/consumer_offsets/delete_consumer_offset_2.rs b/core/binary_protocol/src/requests/consumer_offsets/delete_consumer_offset_2.rs new file mode 100644 index 0000000000..315da10533 --- /dev/null +++ b/core/binary_protocol/src/requests/consumer_offsets/delete_consumer_offset_2.rs @@ -0,0 +1,200 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::WireError; +use crate::WireIdentifier; +use crate::codec::{WireDecode, WireEncode, read_u8, read_u32_le}; +use crate::primitives::ack_level::AckLevel; +use crate::primitives::consumer::WireConsumer; +use bytes::{BufMut, BytesMut}; + +/// `DeleteConsumerOffset` v2 request. +/// +/// Extends v1 with an `ack` field that lets the client pick the commit +/// guarantee: `NoAck` for leader-local (fast) writes, `Quorum` for +/// consensus-committed writes. +/// +/// Wire format: +/// ```text +/// [consumer][stream_id][topic_id][partition_flag:1][partition_id:4 LE][ack:1] +/// ``` +/// +/// `partition_id` encoding: a u8 flag (1=Some, 0=None) followed by 4 bytes +/// for the u32 value (0 when None). +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct DeleteConsumerOffset2Request { + pub consumer: WireConsumer, + pub stream_id: WireIdentifier, + pub topic_id: WireIdentifier, + pub partition_id: Option, + pub ack: AckLevel, +} + +impl WireEncode for DeleteConsumerOffset2Request { + fn encoded_size(&self) -> usize { + self.consumer.encoded_size() + + self.stream_id.encoded_size() + + self.topic_id.encoded_size() + + 1 + + 4 + + 1 + } + + fn encode(&self, buf: &mut BytesMut) { + self.consumer.encode(buf); + self.stream_id.encode(buf); + self.topic_id.encode(buf); + if let Some(pid) = self.partition_id { + buf.put_u8(1); + buf.put_u32_le(pid); + } else { + buf.put_u8(0); + buf.put_u32_le(0); + } + buf.put_u8(self.ack.as_u8()); + } +} + +impl WireDecode for DeleteConsumerOffset2Request { + fn decode(buf: &[u8]) -> Result<(Self, usize), WireError> { + let mut pos = 0; + let (consumer, n) = WireConsumer::decode(&buf[pos..])?; + pos += n; + let (stream_id, n) = WireIdentifier::decode(&buf[pos..])?; + pos += n; + let (topic_id, n) = WireIdentifier::decode(&buf[pos..])?; + pos += n; + let partition_flag = read_u8(buf, pos)?; + pos += 1; + let partition_raw = read_u32_le(buf, pos)?; + pos += 4; + let partition_id = if partition_flag == 1 { + Some(partition_raw) + } else { + None + }; + let ack_code = read_u8(buf, pos)?; + pos += 1; + let ack = AckLevel::from_code(ack_code)?; + Ok(( + Self { + consumer, + stream_id, + topic_id, + partition_id, + ack, + }, + pos, + )) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn roundtrip_with_partition_quorum() { + let req = DeleteConsumerOffset2Request { + consumer: WireConsumer::consumer(WireIdentifier::numeric(1)), + stream_id: WireIdentifier::numeric(10), + topic_id: WireIdentifier::numeric(20), + partition_id: Some(5), + ack: AckLevel::Quorum, + }; + let bytes = req.to_bytes(); + let (decoded, consumed) = DeleteConsumerOffset2Request::decode(&bytes).unwrap(); + assert_eq!(consumed, bytes.len()); + assert_eq!(decoded, req); + } + + #[test] + fn roundtrip_without_partition_no_ack() { + let req = DeleteConsumerOffset2Request { + consumer: WireConsumer::consumer_group(WireIdentifier::numeric(3)), + stream_id: WireIdentifier::numeric(1), + topic_id: WireIdentifier::numeric(1), + partition_id: None, + ack: AckLevel::NoAck, + }; + let bytes = req.to_bytes(); + let (decoded, consumed) = DeleteConsumerOffset2Request::decode(&bytes).unwrap(); + assert_eq!(consumed, bytes.len()); + assert_eq!(decoded, req); + } + + #[test] + fn roundtrip_named_identifiers() { + let req = DeleteConsumerOffset2Request { + consumer: WireConsumer::consumer(WireIdentifier::named("my-consumer").unwrap()), + stream_id: WireIdentifier::named("stream-1").unwrap(), + topic_id: WireIdentifier::named("topic-1").unwrap(), + partition_id: Some(0), + ack: AckLevel::Quorum, + }; + let bytes = req.to_bytes(); + let (decoded, consumed) = DeleteConsumerOffset2Request::decode(&bytes).unwrap(); + assert_eq!(consumed, bytes.len()); + assert_eq!(decoded, req); + } + + #[test] + fn ack_byte_is_last() { + let req = DeleteConsumerOffset2Request { + consumer: WireConsumer::consumer(WireIdentifier::numeric(1)), + stream_id: WireIdentifier::numeric(1), + topic_id: WireIdentifier::numeric(1), + partition_id: Some(0), + ack: AckLevel::NoAck, + }; + let bytes = req.to_bytes(); + assert_eq!(*bytes.last().unwrap(), AckLevel::NoAck.as_u8()); + } + + #[test] + fn unknown_ack_rejected() { + let req = DeleteConsumerOffset2Request { + consumer: WireConsumer::consumer(WireIdentifier::numeric(1)), + stream_id: WireIdentifier::numeric(1), + topic_id: WireIdentifier::numeric(1), + partition_id: Some(0), + ack: AckLevel::Quorum, + }; + let mut bytes = req.to_bytes().to_vec(); + let last = bytes.len() - 1; + bytes[last] = 0xFF; + assert!(DeleteConsumerOffset2Request::decode(&bytes).is_err()); + } + + #[test] + fn truncated_returns_error() { + let req = DeleteConsumerOffset2Request { + consumer: WireConsumer::consumer(WireIdentifier::numeric(1)), + stream_id: WireIdentifier::numeric(1), + topic_id: WireIdentifier::numeric(1), + partition_id: Some(1), + ack: AckLevel::Quorum, + }; + let bytes = req.to_bytes(); + for i in 0..bytes.len() { + assert!( + DeleteConsumerOffset2Request::decode(&bytes[..i]).is_err(), + "expected error for truncation at byte {i}" + ); + } + } +} diff --git a/core/binary_protocol/src/requests/consumer_offsets/mod.rs b/core/binary_protocol/src/requests/consumer_offsets/mod.rs index bde4d02b58..f8d3cee598 100644 --- a/core/binary_protocol/src/requests/consumer_offsets/mod.rs +++ b/core/binary_protocol/src/requests/consumer_offsets/mod.rs @@ -16,9 +16,13 @@ // under the License. pub mod delete_consumer_offset; +pub mod delete_consumer_offset_2; pub mod get_consumer_offset; pub mod store_consumer_offset; +pub mod store_consumer_offset_2; pub use delete_consumer_offset::DeleteConsumerOffsetRequest; +pub use delete_consumer_offset_2::DeleteConsumerOffset2Request; pub use get_consumer_offset::GetConsumerOffsetRequest; pub use store_consumer_offset::StoreConsumerOffsetRequest; +pub use store_consumer_offset_2::StoreConsumerOffset2Request; diff --git a/core/binary_protocol/src/requests/consumer_offsets/store_consumer_offset_2.rs b/core/binary_protocol/src/requests/consumer_offsets/store_consumer_offset_2.rs new file mode 100644 index 0000000000..39c60ec52b --- /dev/null +++ b/core/binary_protocol/src/requests/consumer_offsets/store_consumer_offset_2.rs @@ -0,0 +1,212 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::WireError; +use crate::WireIdentifier; +use crate::codec::{WireDecode, WireEncode, read_u8, read_u32_le, read_u64_le}; +use crate::primitives::ack_level::AckLevel; +use crate::primitives::consumer::WireConsumer; +use bytes::{BufMut, BytesMut}; + +/// `StoreConsumerOffset` v2 request. +/// +/// Extends v1 with an `ack` field that lets the client pick the commit +/// guarantee: `NoAck` for leader-local (fast) writes, `Quorum` for +/// consensus-committed writes. +/// +/// Wire format: +/// ```text +/// [consumer][stream_id][topic_id][partition_flag:1][partition_id:4 LE][offset:8 LE][ack:1] +/// ``` +/// +/// `partition_id` encoding: a u8 flag (1=Some, 0=None) followed by 4 bytes +/// for the u32 value (0 when None). +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct StoreConsumerOffset2Request { + pub consumer: WireConsumer, + pub stream_id: WireIdentifier, + pub topic_id: WireIdentifier, + pub partition_id: Option, + pub offset: u64, + pub ack: AckLevel, +} + +impl WireEncode for StoreConsumerOffset2Request { + fn encoded_size(&self) -> usize { + self.consumer.encoded_size() + + self.stream_id.encoded_size() + + self.topic_id.encoded_size() + + 1 + + 4 + + 8 + + 1 + } + + fn encode(&self, buf: &mut BytesMut) { + self.consumer.encode(buf); + self.stream_id.encode(buf); + self.topic_id.encode(buf); + if let Some(pid) = self.partition_id { + buf.put_u8(1); + buf.put_u32_le(pid); + } else { + buf.put_u8(0); + buf.put_u32_le(0); + } + buf.put_u64_le(self.offset); + buf.put_u8(self.ack.as_u8()); + } +} + +impl WireDecode for StoreConsumerOffset2Request { + fn decode(buf: &[u8]) -> Result<(Self, usize), WireError> { + let mut pos = 0; + let (consumer, n) = WireConsumer::decode(&buf[pos..])?; + pos += n; + let (stream_id, n) = WireIdentifier::decode(&buf[pos..])?; + pos += n; + let (topic_id, n) = WireIdentifier::decode(&buf[pos..])?; + pos += n; + let partition_flag = read_u8(buf, pos)?; + pos += 1; + let partition_raw = read_u32_le(buf, pos)?; + pos += 4; + let partition_id = if partition_flag == 1 { + Some(partition_raw) + } else { + None + }; + let offset = read_u64_le(buf, pos)?; + pos += 8; + let ack_code = read_u8(buf, pos)?; + pos += 1; + let ack = AckLevel::from_code(ack_code)?; + Ok(( + Self { + consumer, + stream_id, + topic_id, + partition_id, + offset, + ack, + }, + pos, + )) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn roundtrip_with_partition_quorum() { + let req = StoreConsumerOffset2Request { + consumer: WireConsumer::consumer(WireIdentifier::numeric(1)), + stream_id: WireIdentifier::numeric(10), + topic_id: WireIdentifier::numeric(20), + partition_id: Some(5), + offset: 12345, + ack: AckLevel::Quorum, + }; + let bytes = req.to_bytes(); + let (decoded, consumed) = StoreConsumerOffset2Request::decode(&bytes).unwrap(); + assert_eq!(consumed, bytes.len()); + assert_eq!(decoded, req); + } + + #[test] + fn roundtrip_without_partition_no_ack() { + let req = StoreConsumerOffset2Request { + consumer: WireConsumer::consumer_group(WireIdentifier::numeric(3)), + stream_id: WireIdentifier::numeric(1), + topic_id: WireIdentifier::numeric(1), + partition_id: None, + offset: u64::MAX, + ack: AckLevel::NoAck, + }; + let bytes = req.to_bytes(); + let (decoded, consumed) = StoreConsumerOffset2Request::decode(&bytes).unwrap(); + assert_eq!(consumed, bytes.len()); + assert_eq!(decoded, req); + } + + #[test] + fn roundtrip_named_identifiers() { + let req = StoreConsumerOffset2Request { + consumer: WireConsumer::consumer(WireIdentifier::named("my-consumer").unwrap()), + stream_id: WireIdentifier::named("stream-1").unwrap(), + topic_id: WireIdentifier::named("topic-1").unwrap(), + partition_id: Some(0), + offset: 0, + ack: AckLevel::Quorum, + }; + let bytes = req.to_bytes(); + let (decoded, consumed) = StoreConsumerOffset2Request::decode(&bytes).unwrap(); + assert_eq!(consumed, bytes.len()); + assert_eq!(decoded, req); + } + + #[test] + fn ack_byte_is_last() { + let req = StoreConsumerOffset2Request { + consumer: WireConsumer::consumer(WireIdentifier::numeric(1)), + stream_id: WireIdentifier::numeric(1), + topic_id: WireIdentifier::numeric(1), + partition_id: Some(0), + offset: 0, + ack: AckLevel::NoAck, + }; + let bytes = req.to_bytes(); + assert_eq!(*bytes.last().unwrap(), AckLevel::NoAck.as_u8()); + } + + #[test] + fn unknown_ack_rejected() { + let req = StoreConsumerOffset2Request { + consumer: WireConsumer::consumer(WireIdentifier::numeric(1)), + stream_id: WireIdentifier::numeric(1), + topic_id: WireIdentifier::numeric(1), + partition_id: Some(0), + offset: 0, + ack: AckLevel::Quorum, + }; + let mut bytes = req.to_bytes().to_vec(); + let last = bytes.len() - 1; + bytes[last] = 0xFF; + assert!(StoreConsumerOffset2Request::decode(&bytes).is_err()); + } + + #[test] + fn truncated_returns_error() { + let req = StoreConsumerOffset2Request { + consumer: WireConsumer::consumer(WireIdentifier::numeric(1)), + stream_id: WireIdentifier::numeric(1), + topic_id: WireIdentifier::numeric(1), + partition_id: Some(1), + offset: 100, + ack: AckLevel::Quorum, + }; + let bytes = req.to_bytes(); + for i in 0..bytes.len() { + assert!( + StoreConsumerOffset2Request::decode(&bytes[..i]).is_err(), + "expected error for truncation at byte {i}" + ); + } + } +} diff --git a/core/binary_protocol/src/responses/consumer_offsets/delete_consumer_offset_2.rs b/core/binary_protocol/src/responses/consumer_offsets/delete_consumer_offset_2.rs new file mode 100644 index 0000000000..a145aab189 --- /dev/null +++ b/core/binary_protocol/src/responses/consumer_offsets/delete_consumer_offset_2.rs @@ -0,0 +1,19 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +/// `DeleteConsumerOffset2` response is empty. +pub type DeleteConsumerOffset2Response = super::EmptyResponse; diff --git a/core/binary_protocol/src/responses/consumer_offsets/mod.rs b/core/binary_protocol/src/responses/consumer_offsets/mod.rs index 915648c18c..6dbaf454de 100644 --- a/core/binary_protocol/src/responses/consumer_offsets/mod.rs +++ b/core/binary_protocol/src/responses/consumer_offsets/mod.rs @@ -16,10 +16,14 @@ // under the License. mod delete_consumer_offset; +mod delete_consumer_offset_2; pub mod get_consumer_offset; mod store_consumer_offset; +mod store_consumer_offset_2; pub use super::EmptyResponse; pub use delete_consumer_offset::DeleteConsumerOffsetResponse; +pub use delete_consumer_offset_2::DeleteConsumerOffset2Response; pub use get_consumer_offset::ConsumerOffsetResponse; pub use store_consumer_offset::StoreConsumerOffsetResponse; +pub use store_consumer_offset_2::StoreConsumerOffset2Response; diff --git a/core/binary_protocol/src/responses/consumer_offsets/store_consumer_offset_2.rs b/core/binary_protocol/src/responses/consumer_offsets/store_consumer_offset_2.rs new file mode 100644 index 0000000000..0f76f603b8 --- /dev/null +++ b/core/binary_protocol/src/responses/consumer_offsets/store_consumer_offset_2.rs @@ -0,0 +1,19 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +/// `StoreConsumerOffset2` response is empty. +pub type StoreConsumerOffset2Response = super::EmptyResponse; diff --git a/core/common/src/lib.rs b/core/common/src/lib.rs index e25a997333..5c937f2c0e 100644 --- a/core/common/src/lib.rs +++ b/core/common/src/lib.rs @@ -32,6 +32,7 @@ pub mod wire_conversions; pub use error::client_error::ClientError; pub use error::iggy_error::{IggyError, IggyErrorDiscriminants}; +pub use iggy_binary_protocol::AckLevel; // Locking is feature gated, thus only mod level re-export. pub mod locking; pub use alloc::buffer::PooledBuffer; diff --git a/core/common/src/traits/binary_impls/consumer_offsets.rs b/core/common/src/traits/binary_impls/consumer_offsets.rs index af665a5133..0deeafa7cb 100644 --- a/core/common/src/traits/binary_impls/consumer_offsets.rs +++ b/core/common/src/traits/binary_impls/consumer_offsets.rs @@ -21,12 +21,15 @@ use crate::wire_conversions::{consumer_to_wire, identifier_to_wire}; use crate::{ BinaryClient, Consumer, ConsumerOffsetClient, ConsumerOffsetInfo, Identifier, IggyError, }; +use iggy_binary_protocol::AckLevel; use iggy_binary_protocol::codec::WireEncode; use iggy_binary_protocol::codes::{ - DELETE_CONSUMER_OFFSET_CODE, GET_CONSUMER_OFFSET_CODE, STORE_CONSUMER_OFFSET_CODE, + DELETE_CONSUMER_OFFSET_2_CODE, DELETE_CONSUMER_OFFSET_CODE, GET_CONSUMER_OFFSET_CODE, + STORE_CONSUMER_OFFSET_2_CODE, STORE_CONSUMER_OFFSET_CODE, }; use iggy_binary_protocol::requests::consumer_offsets::{ - DeleteConsumerOffsetRequest, GetConsumerOffsetRequest, StoreConsumerOffsetRequest, + DeleteConsumerOffset2Request, DeleteConsumerOffsetRequest, GetConsumerOffsetRequest, + StoreConsumerOffset2Request, StoreConsumerOffsetRequest, }; use iggy_binary_protocol::responses::consumer_offsets::get_consumer_offset::ConsumerOffsetResponse; @@ -113,4 +116,60 @@ impl ConsumerOffsetClient for B { .await?; Ok(()) } + + async fn store_consumer_offset_v2( + &self, + consumer: &Consumer, + stream_id: &Identifier, + topic_id: &Identifier, + partition_id: Option, + offset: u64, + ack: AckLevel, + ) -> Result<(), IggyError> { + fail_if_not_authenticated(self).await?; + let wire_consumer = consumer_to_wire(consumer)?; + let wire_stream_id = identifier_to_wire(stream_id)?; + let wire_topic_id = identifier_to_wire(topic_id)?; + self.send_raw_with_response( + STORE_CONSUMER_OFFSET_2_CODE, + StoreConsumerOffset2Request { + consumer: wire_consumer, + stream_id: wire_stream_id, + topic_id: wire_topic_id, + partition_id, + offset, + ack, + } + .to_bytes(), + ) + .await?; + Ok(()) + } + + async fn delete_consumer_offset_v2( + &self, + consumer: &Consumer, + stream_id: &Identifier, + topic_id: &Identifier, + partition_id: Option, + ack: AckLevel, + ) -> Result<(), IggyError> { + fail_if_not_authenticated(self).await?; + let wire_consumer = consumer_to_wire(consumer)?; + let wire_stream_id = identifier_to_wire(stream_id)?; + let wire_topic_id = identifier_to_wire(topic_id)?; + self.send_raw_with_response( + DELETE_CONSUMER_OFFSET_2_CODE, + DeleteConsumerOffset2Request { + consumer: wire_consumer, + stream_id: wire_stream_id, + topic_id: wire_topic_id, + partition_id, + ack, + } + .to_bytes(), + ) + .await?; + Ok(()) + } } diff --git a/core/common/src/traits/consumer_offset_client.rs b/core/common/src/traits/consumer_offset_client.rs index a3673ff3ad..06a615adfa 100644 --- a/core/common/src/traits/consumer_offset_client.rs +++ b/core/common/src/traits/consumer_offset_client.rs @@ -18,6 +18,7 @@ use crate::{Consumer, ConsumerOffsetInfo, Identifier, IggyError}; use async_trait::async_trait; +use iggy_binary_protocol::AckLevel; /// This trait defines the methods to interact with the consumer offset module. #[async_trait] @@ -53,4 +54,34 @@ pub trait ConsumerOffsetClient { topic_id: &Identifier, partition_id: Option, ) -> Result<(), IggyError>; + + /// Store the consumer offset with an explicit acknowledgement policy + /// (`AckLevel::NoAck` for leader-local, `AckLevel::Quorum` for + /// consensus-committed). The v1 `store_consumer_offset` is equivalent + /// to calling this with `AckLevel::Quorum`. + /// + /// Authentication is required, and the permission to poll the messages. + async fn store_consumer_offset_v2( + &self, + consumer: &Consumer, + stream_id: &Identifier, + topic_id: &Identifier, + partition_id: Option, + offset: u64, + ack: AckLevel, + ) -> Result<(), IggyError>; + + /// Delete the consumer offset with an explicit acknowledgement policy. + /// The v1 `delete_consumer_offset` is equivalent to calling this with + /// `AckLevel::Quorum`. + /// + /// Authentication is required, and the permission to poll the messages. + async fn delete_consumer_offset_v2( + &self, + consumer: &Consumer, + stream_id: &Identifier, + topic_id: &Identifier, + partition_id: Option, + ack: AckLevel, + ) -> Result<(), IggyError>; } diff --git a/core/consensus/src/observability.rs b/core/consensus/src/observability.rs index bb042af108..2ac1d00447 100644 --- a/core/consensus/src/observability.rs +++ b/core/consensus/src/observability.rs @@ -644,6 +644,8 @@ pub const fn operation_as_str(operation: Operation) -> &'static str { Operation::SendMessages => "send_messages", Operation::StoreConsumerOffset => "store_consumer_offset", Operation::DeleteConsumerOffset => "delete_consumer_offset", + Operation::StoreConsumerOffset2 => "store_consumer_offset_2", + Operation::DeleteConsumerOffset2 => "delete_consumer_offset_2", } } diff --git a/core/sdk/src/client_wrappers/binary_consumer_offset_client.rs b/core/sdk/src/client_wrappers/binary_consumer_offset_client.rs index 40b644c276..1a94451a0e 100644 --- a/core/sdk/src/client_wrappers/binary_consumer_offset_client.rs +++ b/core/sdk/src/client_wrappers/binary_consumer_offset_client.rs @@ -19,7 +19,7 @@ use crate::client_wrappers::client_wrapper::ClientWrapper; use async_trait::async_trait; use iggy_common::ConsumerOffsetClient; -use iggy_common::{Consumer, ConsumerOffsetInfo, Identifier, IggyError}; +use iggy_common::{AckLevel, Consumer, ConsumerOffsetInfo, Identifier, IggyError}; #[async_trait] impl ConsumerOffsetClient for ClientWrapper { @@ -131,4 +131,114 @@ impl ConsumerOffsetClient for ClientWrapper { } } } + + async fn store_consumer_offset_v2( + &self, + consumer: &Consumer, + stream_id: &Identifier, + topic_id: &Identifier, + partition_id: Option, + offset: u64, + ack: AckLevel, + ) -> Result<(), IggyError> { + match self { + ClientWrapper::Iggy(client) => { + client + .store_consumer_offset_v2( + consumer, + stream_id, + topic_id, + partition_id, + offset, + ack, + ) + .await + } + ClientWrapper::Http(client) => { + client + .store_consumer_offset_v2( + consumer, + stream_id, + topic_id, + partition_id, + offset, + ack, + ) + .await + } + ClientWrapper::Tcp(client) => { + client + .store_consumer_offset_v2( + consumer, + stream_id, + topic_id, + partition_id, + offset, + ack, + ) + .await + } + ClientWrapper::Quic(client) => { + client + .store_consumer_offset_v2( + consumer, + stream_id, + topic_id, + partition_id, + offset, + ack, + ) + .await + } + ClientWrapper::WebSocket(client) => { + client + .store_consumer_offset_v2( + consumer, + stream_id, + topic_id, + partition_id, + offset, + ack, + ) + .await + } + } + } + + async fn delete_consumer_offset_v2( + &self, + consumer: &Consumer, + stream_id: &Identifier, + topic_id: &Identifier, + partition_id: Option, + ack: AckLevel, + ) -> Result<(), IggyError> { + match self { + ClientWrapper::Iggy(client) => { + client + .delete_consumer_offset_v2(consumer, stream_id, topic_id, partition_id, ack) + .await + } + ClientWrapper::Http(client) => { + client + .delete_consumer_offset_v2(consumer, stream_id, topic_id, partition_id, ack) + .await + } + ClientWrapper::Tcp(client) => { + client + .delete_consumer_offset_v2(consumer, stream_id, topic_id, partition_id, ack) + .await + } + ClientWrapper::Quic(client) => { + client + .delete_consumer_offset_v2(consumer, stream_id, topic_id, partition_id, ack) + .await + } + ClientWrapper::WebSocket(client) => { + client + .delete_consumer_offset_v2(consumer, stream_id, topic_id, partition_id, ack) + .await + } + } + } } diff --git a/core/sdk/src/clients/binary_consumer_offset.rs b/core/sdk/src/clients/binary_consumer_offset.rs index 1f28f9292a..88a2371567 100644 --- a/core/sdk/src/clients/binary_consumer_offset.rs +++ b/core/sdk/src/clients/binary_consumer_offset.rs @@ -20,7 +20,7 @@ use crate::prelude::IggyClient; use async_trait::async_trait; use iggy_common::ConsumerOffsetClient; use iggy_common::locking::IggyRwLockFn; -use iggy_common::{Consumer, ConsumerOffsetInfo, Identifier, IggyError}; +use iggy_common::{AckLevel, Consumer, ConsumerOffsetInfo, Identifier, IggyError}; #[async_trait] impl ConsumerOffsetClient for IggyClient { @@ -66,4 +66,35 @@ impl ConsumerOffsetClient for IggyClient { .delete_consumer_offset(consumer, stream_id, topic_id, partition_id) .await } + + async fn store_consumer_offset_v2( + &self, + consumer: &Consumer, + stream_id: &Identifier, + topic_id: &Identifier, + partition_id: Option, + offset: u64, + ack: AckLevel, + ) -> Result<(), IggyError> { + self.client + .read() + .await + .store_consumer_offset_v2(consumer, stream_id, topic_id, partition_id, offset, ack) + .await + } + + async fn delete_consumer_offset_v2( + &self, + consumer: &Consumer, + stream_id: &Identifier, + topic_id: &Identifier, + partition_id: Option, + ack: AckLevel, + ) -> Result<(), IggyError> { + self.client + .read() + .await + .delete_consumer_offset_v2(consumer, stream_id, topic_id, partition_id, ack) + .await + } } diff --git a/core/sdk/src/http/consumer_offsets.rs b/core/sdk/src/http/consumer_offsets.rs index fe74355069..68fcd380a8 100644 --- a/core/sdk/src/http/consumer_offsets.rs +++ b/core/sdk/src/http/consumer_offsets.rs @@ -24,7 +24,7 @@ use async_trait::async_trait; use iggy_common::ConsumerOffsetClient; use iggy_common::get_consumer_offset::GetConsumerOffset; use iggy_common::store_consumer_offset::StoreConsumerOffset; -use iggy_common::{Consumer, ConsumerOffsetInfo}; +use iggy_common::{AckLevel, Consumer, ConsumerOffsetInfo}; #[async_trait] impl ConsumerOffsetClient for HttpClient { @@ -97,6 +97,34 @@ impl ConsumerOffsetClient for HttpClient { self.delete(&path).await?; Ok(()) } + + // HTTP has no wire-level ack parameter; v2 methods delegate to v1 regardless + // of the requested `ack`. The binary transport is the path that actually + // honours `AckLevel::NoAck`. + async fn store_consumer_offset_v2( + &self, + consumer: &Consumer, + stream_id: &Identifier, + topic_id: &Identifier, + partition_id: Option, + offset: u64, + _ack: AckLevel, + ) -> Result<(), IggyError> { + self.store_consumer_offset(consumer, stream_id, topic_id, partition_id, offset) + .await + } + + async fn delete_consumer_offset_v2( + &self, + consumer: &Consumer, + stream_id: &Identifier, + topic_id: &Identifier, + partition_id: Option, + _ack: AckLevel, + ) -> Result<(), IggyError> { + self.delete_consumer_offset(consumer, stream_id, topic_id, partition_id) + .await + } } fn get_path(stream_id: &str, topic_id: &str) -> String { diff --git a/core/server/src/binary/dispatch.rs b/core/server/src/binary/dispatch.rs index 9138483e18..eed77cca2f 100644 --- a/core/server/src/binary/dispatch.rs +++ b/core/server/src/binary/dispatch.rs @@ -330,6 +330,20 @@ pub async fn dispatch( ) .await } + STORE_CONSUMER_OFFSET_2_CODE => { + let req: StoreConsumerOffset2Request = decode(frame.payload)?; + handlers::consumer_offsets::store_consumer_offset_2_handler::handle_store_consumer_offset_2( + req, sender, session, shard, + ) + .await + } + DELETE_CONSUMER_OFFSET_2_CODE => { + let req: DeleteConsumerOffset2Request = decode(frame.payload)?; + handlers::consumer_offsets::delete_consumer_offset_2_handler::handle_delete_consumer_offset_2( + req, sender, session, shard, + ) + .await + } // Consumer Groups GET_CONSUMER_GROUP_CODE => { diff --git a/core/server/src/binary/handlers/consumer_offsets/delete_consumer_offset_2_handler.rs b/core/server/src/binary/handlers/consumer_offsets/delete_consumer_offset_2_handler.rs new file mode 100644 index 0000000000..9e8b8e6c5b --- /dev/null +++ b/core/server/src/binary/handlers/consumer_offsets/delete_consumer_offset_2_handler.rs @@ -0,0 +1,62 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use crate::binary::dispatch::{HandlerResult, wire_consumer_to_consumer, wire_id_to_identifier}; +use crate::binary::handlers::consumer_offsets::COMPONENT; +use crate::shard::IggyShard; +use crate::streaming::session::Session; +use err_trail::ErrContext; +use iggy_binary_protocol::requests::consumer_offsets::DeleteConsumerOffset2Request; +use iggy_common::{IggyError, SenderKind}; +use std::rc::Rc; +use tracing::debug; + +pub async fn handle_delete_consumer_offset_2( + req: DeleteConsumerOffset2Request, + sender: &mut SenderKind, + session: &Session, + shard: &Rc, +) -> Result { + let consumer = wire_consumer_to_consumer(&req.consumer)?; + let stream_id = wire_id_to_identifier(&req.stream_id)?; + let topic_id = wire_id_to_identifier(&req.topic_id)?; + debug!( + "session: {session}, command: delete_consumer_offset_2, stream_id: {stream_id}, topic_id: {topic_id}, partition_id: {:?}, ack: {:?}", + req.partition_id, req.ack + ); + shard.ensure_authenticated(session)?; + let topic = shard.resolve_topic_for_delete_consumer_offset( + session.get_user_id(), + &stream_id, + &topic_id, + )?; + shard + .delete_consumer_offset( + session.client_id, + consumer, + topic, + req.partition_id, + req.ack, + ) + .await + .error(|e: &IggyError| format!("{COMPONENT} (error: {e}) - failed to delete consumer offset (v2) for topic with ID: {} in stream with ID: {} partition ID: {:#?}, ack: {:?}, session: {}", + topic_id, stream_id, req.partition_id, req.ack, session + ))?; + sender.send_empty_ok_response().await?; + Ok(HandlerResult::Finished) +} diff --git a/core/server/src/binary/handlers/consumer_offsets/delete_consumer_offset_handler.rs b/core/server/src/binary/handlers/consumer_offsets/delete_consumer_offset_handler.rs index 459198a06e..d0175cf3e9 100644 --- a/core/server/src/binary/handlers/consumer_offsets/delete_consumer_offset_handler.rs +++ b/core/server/src/binary/handlers/consumer_offsets/delete_consumer_offset_handler.rs @@ -21,6 +21,7 @@ use crate::binary::handlers::consumer_offsets::COMPONENT; use crate::shard::IggyShard; use crate::streaming::session::Session; use err_trail::ErrContext; +use iggy_binary_protocol::AckLevel; use iggy_binary_protocol::requests::consumer_offsets::DeleteConsumerOffsetRequest; use iggy_common::{IggyError, SenderKind}; use std::rc::Rc; @@ -46,7 +47,13 @@ pub async fn handle_delete_consumer_offset( &topic_id, )?; shard - .delete_consumer_offset(session.client_id, consumer, topic, req.partition_id) + .delete_consumer_offset( + session.client_id, + consumer, + topic, + req.partition_id, + AckLevel::Quorum, + ) .await .error(|e: &IggyError| format!("{COMPONENT} (error: {e}) - failed to delete consumer offset for topic with ID: {} in stream with ID: {} partition ID: {:#?}, session: {}", topic_id, stream_id, req.partition_id, session diff --git a/core/server/src/binary/handlers/consumer_offsets/mod.rs b/core/server/src/binary/handlers/consumer_offsets/mod.rs index a7dcf63c9f..ccd182e523 100644 --- a/core/server/src/binary/handlers/consumer_offsets/mod.rs +++ b/core/server/src/binary/handlers/consumer_offsets/mod.rs @@ -16,8 +16,10 @@ * under the License. */ +pub mod delete_consumer_offset_2_handler; pub mod delete_consumer_offset_handler; pub mod get_consumer_offset_handler; +pub mod store_consumer_offset_2_handler; pub mod store_consumer_offset_handler; pub const COMPONENT: &str = "CONSUMER_OFFSET_HANDLER"; diff --git a/core/server/src/binary/handlers/consumer_offsets/store_consumer_offset_2_handler.rs b/core/server/src/binary/handlers/consumer_offsets/store_consumer_offset_2_handler.rs new file mode 100644 index 0000000000..e895149bcb --- /dev/null +++ b/core/server/src/binary/handlers/consumer_offsets/store_consumer_offset_2_handler.rs @@ -0,0 +1,65 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use std::rc::Rc; + +use crate::binary::dispatch::{HandlerResult, wire_consumer_to_consumer, wire_id_to_identifier}; +use crate::binary::handlers::consumer_offsets::COMPONENT; +use crate::shard::IggyShard; +use crate::streaming::session::Session; +use err_trail::ErrContext; +use iggy_binary_protocol::requests::consumer_offsets::StoreConsumerOffset2Request; +use iggy_common::IggyError; +use iggy_common::SenderKind; +use tracing::debug; + +pub async fn handle_store_consumer_offset_2( + req: StoreConsumerOffset2Request, + sender: &mut SenderKind, + session: &Session, + shard: &Rc, +) -> Result { + let consumer = wire_consumer_to_consumer(&req.consumer)?; + let stream_id = wire_id_to_identifier(&req.stream_id)?; + let topic_id = wire_id_to_identifier(&req.topic_id)?; + debug!( + "session: {session}, command: store_consumer_offset_2, stream_id: {stream_id}, topic_id: {topic_id}, partition_id: {:?}, offset: {}, ack: {:?}", + req.partition_id, req.offset, req.ack + ); + shard.ensure_authenticated(session)?; + let topic = shard.resolve_topic_for_store_consumer_offset( + session.get_user_id(), + &stream_id, + &topic_id, + )?; + shard + .store_consumer_offset( + session.client_id, + consumer, + topic, + req.partition_id, + req.offset, + req.ack, + ) + .await + .error(|e: &IggyError| format!("{COMPONENT} (error: {e}) - failed to store consumer offset (v2) for stream_id: {}, topic_id: {}, partition_id: {:?}, offset: {}, ack: {:?}, session: {}", + stream_id, topic_id, req.partition_id, req.offset, req.ack, session + ))?; + sender.send_empty_ok_response().await?; + Ok(HandlerResult::Finished) +} diff --git a/core/server/src/binary/handlers/consumer_offsets/store_consumer_offset_handler.rs b/core/server/src/binary/handlers/consumer_offsets/store_consumer_offset_handler.rs index 87eb4c5e5b..33fc877e03 100644 --- a/core/server/src/binary/handlers/consumer_offsets/store_consumer_offset_handler.rs +++ b/core/server/src/binary/handlers/consumer_offsets/store_consumer_offset_handler.rs @@ -23,6 +23,7 @@ use crate::binary::handlers::consumer_offsets::COMPONENT; use crate::shard::IggyShard; use crate::streaming::session::Session; use err_trail::ErrContext; +use iggy_binary_protocol::AckLevel; use iggy_binary_protocol::requests::consumer_offsets::StoreConsumerOffsetRequest; use iggy_common::IggyError; use iggy_common::SenderKind; @@ -54,6 +55,7 @@ pub async fn handle_store_consumer_offset( topic, req.partition_id, req.offset, + AckLevel::Quorum, ) .await .error(|e: &IggyError| format!("{COMPONENT} (error: {e}) - failed to store consumer offset for stream_id: {}, topic_id: {}, partition_id: {:?}, offset: {}, session: {}", diff --git a/core/server/src/http/http_shard_wrapper.rs b/core/server/src/http/http_shard_wrapper.rs index 9952e4de78..f3ab203c0e 100644 --- a/core/server/src/http/http_shard_wrapper.rs +++ b/core/server/src/http/http_shard_wrapper.rs @@ -17,6 +17,7 @@ use std::rc::Rc; +use iggy_binary_protocol::AckLevel; use iggy_common::{ Consumer, ConsumerOffsetInfo, Identifier, IggyError, Partitioning, PartitioningKind, }; @@ -110,6 +111,7 @@ impl HttpSafeShard { topic, partition_id, offset, + AckLevel::Quorum, )); let _result = future.await?; Ok(()) @@ -129,6 +131,7 @@ impl HttpSafeShard { consumer, topic, partition_id, + AckLevel::Quorum, )); let _result = future.await?; Ok(()) diff --git a/core/server/src/shard/handlers.rs b/core/server/src/shard/handlers.rs index 5d4de789c6..94a9ad2118 100644 --- a/core/server/src/shard/handlers.rs +++ b/core/server/src/shard/handlers.rs @@ -31,6 +31,7 @@ use crate::{ }, }; use compio::net::TcpStream; +use iggy_binary_protocol::AckLevel; use iggy_common::{IggyError, SenderKind, TransportProtocol, sharding::IggyNamespace}; use nix::sys::stat::SFlag; use std::os::fd::{FromRawFd, IntoRawFd}; @@ -106,7 +107,14 @@ async fn handle_request( .last_offset() .expect("Batch set should have at least one batch"); shard - .auto_commit_consumer_offset_from_local_partition(&namespace, consumer, offset) + .store_consumer_offset_internal( + namespace.stream_id(), + namespace.topic_id(), + namespace.partition_id(), + &consumer, + offset, + AckLevel::NoAck, + ) .await?; } Ok(ShardResponse::PollMessages((poll_metadata, batches))) diff --git a/core/server/src/shard/system/consumer_offsets.rs b/core/server/src/shard/system/consumer_offsets.rs index ab287a8344..30e8d39bbe 100644 --- a/core/server/src/shard/system/consumer_offsets.rs +++ b/core/server/src/shard/system/consumer_offsets.rs @@ -26,6 +26,7 @@ use crate::{ }, }; use err_trail::ErrContext; +use iggy_binary_protocol::AckLevel; use iggy_common::{ Consumer, ConsumerKind, ConsumerOffsetInfo, Identifier, IggyError, sharding::IggyNamespace, }; @@ -39,6 +40,7 @@ impl IggyShard { topic: ResolvedTopic, partition_id: Option, offset: u64, + ack: AckLevel, ) -> Result<(PollingConsumer, usize), IggyError> { let Some((polling_consumer, partition_id)) = self.resolve_consumer_with_partition_id( topic, @@ -51,32 +53,53 @@ impl IggyShard { return Err(IggyError::NotResolvedConsumer(consumer.id)); }; - self.validate_partition_offset(topic.stream_id, topic.topic_id, partition_id, offset)?; - - self.store_consumer_offset_base( + self.store_consumer_offset_internal( topic.stream_id, topic.topic_id, - &polling_consumer, partition_id, - offset, - ); - self.persist_consumer_offset_to_disk( - topic.stream_id, - topic.topic_id, &polling_consumer, - partition_id, + offset, + ack, ) .await?; - self.maybe_complete_pending_revocation( - &polling_consumer, - topic.stream_id, - topic.topic_id, + Ok((polling_consumer, partition_id)) + } + + /// Shared offset-write path used by `store_consumer_offset` (explicit + /// client writes) and `PollMessages` auto-commit. Resolution and + /// permission checks are the caller's responsibility. + /// + /// `Quorum` performs offset validation; `NoAck` skips it to match the + /// trust model of auto-commit (the offset was just polled from the + /// partition, so it is known-good by construction). + pub(crate) async fn store_consumer_offset_internal( + &self, + stream_id: usize, + topic_id: usize, + partition_id: usize, + polling_consumer: &PollingConsumer, + offset: u64, + ack: AckLevel, + ) -> Result<(), IggyError> { + if matches!(ack, AckLevel::Quorum) { + self.validate_partition_offset(stream_id, topic_id, partition_id, offset)?; + } + + self.store_consumer_offset_base( + stream_id, + topic_id, + polling_consumer, partition_id, - ) - .await; + offset, + ); + self.persist_consumer_offset_to_disk(stream_id, topic_id, polling_consumer, partition_id) + .await?; - Ok((polling_consumer, partition_id)) + self.maybe_complete_pending_revocation(polling_consumer, stream_id, topic_id, partition_id) + .await; + + Ok(()) } pub async fn get_consumer_offset( @@ -180,6 +203,7 @@ impl IggyShard { consumer: Consumer, topic: ResolvedTopic, partition_id: Option, + _ack: AckLevel, ) -> Result<(PollingConsumer, usize), IggyError> { let Some((polling_consumer, partition_id)) = self.resolve_consumer_with_partition_id( topic, diff --git a/core/server/src/shard/system/messages.rs b/core/server/src/shard/system/messages.rs index 82a5d25b06..647279a981 100644 --- a/core/server/src/shard/system/messages.rs +++ b/core/server/src/shard/system/messages.rs @@ -29,9 +29,7 @@ use err_trail::ErrContext; use iggy_common::IggyPollMetadata; use iggy_common::PooledBuffer; use iggy_common::sharding::IggyNamespace; -use iggy_common::{ - Consumer, EncryptorKind, IGGY_MESSAGE_HEADER_SIZE, Identifier, IggyError, PollingStrategy, -}; +use iggy_common::{Consumer, EncryptorKind, IGGY_MESSAGE_HEADER_SIZE, IggyError, PollingStrategy}; use std::sync::atomic::Ordering; use tracing::error; @@ -228,83 +226,6 @@ impl IggyShard { Ok(()) } - pub(crate) async fn auto_commit_consumer_offset_from_local_partition( - &self, - namespace: &IggyNamespace, - consumer: PollingConsumer, - offset: u64, - ) -> Result<(), IggyError> { - let (offset_value, path) = { - let partitions = self.local_partitions.borrow(); - let partition = partitions.get(namespace).ok_or_else(|| { - IggyError::PartitionNotFound( - namespace.partition_id(), - Identifier::numeric(namespace.topic_id() as u32).unwrap(), - Identifier::numeric(namespace.stream_id() as u32).unwrap(), - ) - })?; - - match consumer { - PollingConsumer::Consumer(consumer_id, _) => { - tracing::trace!( - "Auto-committing offset {} for consumer {} on partition {:?}", - offset, - consumer_id, - namespace - ); - let hdl = partition.consumer_offsets.pin(); - let item = hdl.get_or_insert( - consumer_id, - crate::streaming::partitions::consumer_offset::ConsumerOffset::default_for_consumer( - consumer_id as u32, - &self.config.system.get_consumer_offsets_path( - namespace.stream_id(), - namespace.topic_id(), - namespace.partition_id(), - ), - ), - ); - item.offset.store(offset, Ordering::Release); - (item.offset.load(Ordering::Relaxed), item.path.clone()) - } - PollingConsumer::ConsumerGroup(consumer_group_id, _) => { - tracing::trace!( - "Auto-committing offset {} for consumer group {} on partition {:?}", - offset, - consumer_group_id.0, - namespace - ); - let hdl = partition.consumer_group_offsets.pin(); - let item = hdl.get_or_insert( - consumer_group_id, - crate::streaming::partitions::consumer_offset::ConsumerOffset::default_for_consumer_group( - consumer_group_id, - &self.config.system.get_consumer_group_offsets_path( - namespace.stream_id(), - namespace.topic_id(), - namespace.partition_id(), - ), - ), - ); - item.offset.store(offset, Ordering::Release); - (item.offset.load(Ordering::Relaxed), item.path.clone()) - } - } - }; - - crate::streaming::partitions::storage::persist_offset(&path, offset_value).await?; - - self.maybe_complete_pending_revocation( - &consumer, - namespace.stream_id(), - namespace.topic_id(), - namespace.partition_id(), - ) - .await; - - Ok(()) - } - /// Appends a batch to the active segment, flushing to disk and rotating if needed. /// /// Safety: called exclusively from the message pump — segment indices captured before From b0ff05a7102456994a807383fcc40efe6651e7a9 Mon Sep 17 00:00:00 2001 From: Krishna Vishal Date: Thu, 23 Apr 2026 12:50:40 +0530 Subject: [PATCH 2/4] fix: address review comments --- core/common/src/lib.rs | 1 - .../traits/binary_impls/consumer_offsets.rs | 63 +--------- .../src/traits/consumer_offset_client.rs | 31 ----- core/partitions/src/iggy_partition.rs | 27 +++-- .../binary_consumer_offset_client.rs | 112 +----------------- .../sdk/src/clients/binary_consumer_offset.rs | 33 +----- core/sdk/src/http/consumer_offsets.rs | 30 +---- core/server/src/binary/dispatch.rs | 14 --- .../delete_consumer_offset_2_handler.rs | 62 ---------- .../delete_consumer_offset_handler.rs | 9 +- .../binary/handlers/consumer_offsets/mod.rs | 2 - .../store_consumer_offset_2_handler.rs | 65 ---------- .../store_consumer_offset_handler.rs | 2 - core/server/src/http/http_shard_wrapper.rs | 3 - core/server/src/shard/handlers.rs | 10 +- .../src/shard/system/consumer_offsets.rs | 58 +++------ core/server/src/shard/system/messages.rs | 81 ++++++++++++- core/simulator/src/client.rs | 40 ++++++- 18 files changed, 162 insertions(+), 481 deletions(-) delete mode 100644 core/server/src/binary/handlers/consumer_offsets/delete_consumer_offset_2_handler.rs delete mode 100644 core/server/src/binary/handlers/consumer_offsets/store_consumer_offset_2_handler.rs diff --git a/core/common/src/lib.rs b/core/common/src/lib.rs index 5c937f2c0e..e25a997333 100644 --- a/core/common/src/lib.rs +++ b/core/common/src/lib.rs @@ -32,7 +32,6 @@ pub mod wire_conversions; pub use error::client_error::ClientError; pub use error::iggy_error::{IggyError, IggyErrorDiscriminants}; -pub use iggy_binary_protocol::AckLevel; // Locking is feature gated, thus only mod level re-export. pub mod locking; pub use alloc::buffer::PooledBuffer; diff --git a/core/common/src/traits/binary_impls/consumer_offsets.rs b/core/common/src/traits/binary_impls/consumer_offsets.rs index 0deeafa7cb..af665a5133 100644 --- a/core/common/src/traits/binary_impls/consumer_offsets.rs +++ b/core/common/src/traits/binary_impls/consumer_offsets.rs @@ -21,15 +21,12 @@ use crate::wire_conversions::{consumer_to_wire, identifier_to_wire}; use crate::{ BinaryClient, Consumer, ConsumerOffsetClient, ConsumerOffsetInfo, Identifier, IggyError, }; -use iggy_binary_protocol::AckLevel; use iggy_binary_protocol::codec::WireEncode; use iggy_binary_protocol::codes::{ - DELETE_CONSUMER_OFFSET_2_CODE, DELETE_CONSUMER_OFFSET_CODE, GET_CONSUMER_OFFSET_CODE, - STORE_CONSUMER_OFFSET_2_CODE, STORE_CONSUMER_OFFSET_CODE, + DELETE_CONSUMER_OFFSET_CODE, GET_CONSUMER_OFFSET_CODE, STORE_CONSUMER_OFFSET_CODE, }; use iggy_binary_protocol::requests::consumer_offsets::{ - DeleteConsumerOffset2Request, DeleteConsumerOffsetRequest, GetConsumerOffsetRequest, - StoreConsumerOffset2Request, StoreConsumerOffsetRequest, + DeleteConsumerOffsetRequest, GetConsumerOffsetRequest, StoreConsumerOffsetRequest, }; use iggy_binary_protocol::responses::consumer_offsets::get_consumer_offset::ConsumerOffsetResponse; @@ -116,60 +113,4 @@ impl ConsumerOffsetClient for B { .await?; Ok(()) } - - async fn store_consumer_offset_v2( - &self, - consumer: &Consumer, - stream_id: &Identifier, - topic_id: &Identifier, - partition_id: Option, - offset: u64, - ack: AckLevel, - ) -> Result<(), IggyError> { - fail_if_not_authenticated(self).await?; - let wire_consumer = consumer_to_wire(consumer)?; - let wire_stream_id = identifier_to_wire(stream_id)?; - let wire_topic_id = identifier_to_wire(topic_id)?; - self.send_raw_with_response( - STORE_CONSUMER_OFFSET_2_CODE, - StoreConsumerOffset2Request { - consumer: wire_consumer, - stream_id: wire_stream_id, - topic_id: wire_topic_id, - partition_id, - offset, - ack, - } - .to_bytes(), - ) - .await?; - Ok(()) - } - - async fn delete_consumer_offset_v2( - &self, - consumer: &Consumer, - stream_id: &Identifier, - topic_id: &Identifier, - partition_id: Option, - ack: AckLevel, - ) -> Result<(), IggyError> { - fail_if_not_authenticated(self).await?; - let wire_consumer = consumer_to_wire(consumer)?; - let wire_stream_id = identifier_to_wire(stream_id)?; - let wire_topic_id = identifier_to_wire(topic_id)?; - self.send_raw_with_response( - DELETE_CONSUMER_OFFSET_2_CODE, - DeleteConsumerOffset2Request { - consumer: wire_consumer, - stream_id: wire_stream_id, - topic_id: wire_topic_id, - partition_id, - ack, - } - .to_bytes(), - ) - .await?; - Ok(()) - } } diff --git a/core/common/src/traits/consumer_offset_client.rs b/core/common/src/traits/consumer_offset_client.rs index 06a615adfa..a3673ff3ad 100644 --- a/core/common/src/traits/consumer_offset_client.rs +++ b/core/common/src/traits/consumer_offset_client.rs @@ -18,7 +18,6 @@ use crate::{Consumer, ConsumerOffsetInfo, Identifier, IggyError}; use async_trait::async_trait; -use iggy_binary_protocol::AckLevel; /// This trait defines the methods to interact with the consumer offset module. #[async_trait] @@ -54,34 +53,4 @@ pub trait ConsumerOffsetClient { topic_id: &Identifier, partition_id: Option, ) -> Result<(), IggyError>; - - /// Store the consumer offset with an explicit acknowledgement policy - /// (`AckLevel::NoAck` for leader-local, `AckLevel::Quorum` for - /// consensus-committed). The v1 `store_consumer_offset` is equivalent - /// to calling this with `AckLevel::Quorum`. - /// - /// Authentication is required, and the permission to poll the messages. - async fn store_consumer_offset_v2( - &self, - consumer: &Consumer, - stream_id: &Identifier, - topic_id: &Identifier, - partition_id: Option, - offset: u64, - ack: AckLevel, - ) -> Result<(), IggyError>; - - /// Delete the consumer offset with an explicit acknowledgement policy. - /// The v1 `delete_consumer_offset` is equivalent to calling this with - /// `AckLevel::Quorum`. - /// - /// Authentication is required, and the permission to poll the messages. - async fn delete_consumer_offset_v2( - &self, - consumer: &Consumer, - stream_id: &Identifier, - topic_id: &Identifier, - partition_id: Option, - ack: AckLevel, - ) -> Result<(), IggyError>; } diff --git a/core/partitions/src/iggy_partition.rs b/core/partitions/src/iggy_partition.rs index 37e99679e7..ac0312724b 100644 --- a/core/partitions/src/iggy_partition.rs +++ b/core/partitions/src/iggy_partition.rs @@ -667,7 +667,10 @@ where message }; - if message.header().operation == Operation::DeleteConsumerOffset { + if matches!( + message.header().operation, + Operation::DeleteConsumerOffset | Operation::DeleteConsumerOffset2 + ) { match Self::parse_consumer_offset_request(message.header().operation, &message) .and_then(|(kind, consumer_id, _)| { self.ensure_consumer_offset_exists(kind, consumer_id) @@ -680,7 +683,7 @@ where ReplicaLogContext::from_consensus(consensus, PlaneKind::Partitions), "rejecting delete_consumer_offset for missing offset", ) - .with_operation(Operation::DeleteConsumerOffset) + .with_operation(message.header().operation) .with_error(error.to_string()), ); return; @@ -972,7 +975,10 @@ where ); Ok(()) } - Operation::StoreConsumerOffset | Operation::DeleteConsumerOffset => { + Operation::StoreConsumerOffset + | Operation::DeleteConsumerOffset + | Operation::StoreConsumerOffset2 + | Operation::DeleteConsumerOffset2 => { let (kind, consumer_id, offset) = Self::parse_staged_consumer_offset_commit(header.operation, &message)?; let write_lock = self.write_lock.clone(); @@ -995,7 +1001,7 @@ where .map_err(|_| IggyError::CannotAppendMessage)?; match header.operation { - Operation::StoreConsumerOffset => { + Operation::StoreConsumerOffset | Operation::StoreConsumerOffset2 => { self.stage_consumer_offset_upsert( header.op, kind, @@ -1003,7 +1009,7 @@ where offset.expect("store_consumer_offset must include offset"), ); } - Operation::DeleteConsumerOffset => { + Operation::DeleteConsumerOffset | Operation::DeleteConsumerOffset2 => { self.stage_consumer_offset_delete(header.op, kind, consumer_id)?; } _ => unreachable!(), @@ -1331,7 +1337,10 @@ where } !*failed_commit } - Operation::StoreConsumerOffset | Operation::DeleteConsumerOffset => { + Operation::StoreConsumerOffset + | Operation::DeleteConsumerOffset + | Operation::StoreConsumerOffset2 + | Operation::DeleteConsumerOffset2 => { self.commit_consumer_offset_entry(prepare_header, failed_commit) .await } @@ -1410,7 +1419,7 @@ where })?; let kind = ConsumerKind::from_code(consumer_kind)?; match operation { - Operation::StoreConsumerOffset => { + Operation::StoreConsumerOffset | Operation::StoreConsumerOffset2 => { let offset = body.get(5..13) .ok_or(IggyError::InvalidCommand) @@ -1421,7 +1430,9 @@ where })?; Ok((kind, consumer_id, Some(offset))) } - Operation::DeleteConsumerOffset => Ok((kind, consumer_id, None)), + Operation::DeleteConsumerOffset | Operation::DeleteConsumerOffset2 => { + Ok((kind, consumer_id, None)) + } _ => Err(IggyError::InvalidCommand), } } diff --git a/core/sdk/src/client_wrappers/binary_consumer_offset_client.rs b/core/sdk/src/client_wrappers/binary_consumer_offset_client.rs index 1a94451a0e..40b644c276 100644 --- a/core/sdk/src/client_wrappers/binary_consumer_offset_client.rs +++ b/core/sdk/src/client_wrappers/binary_consumer_offset_client.rs @@ -19,7 +19,7 @@ use crate::client_wrappers::client_wrapper::ClientWrapper; use async_trait::async_trait; use iggy_common::ConsumerOffsetClient; -use iggy_common::{AckLevel, Consumer, ConsumerOffsetInfo, Identifier, IggyError}; +use iggy_common::{Consumer, ConsumerOffsetInfo, Identifier, IggyError}; #[async_trait] impl ConsumerOffsetClient for ClientWrapper { @@ -131,114 +131,4 @@ impl ConsumerOffsetClient for ClientWrapper { } } } - - async fn store_consumer_offset_v2( - &self, - consumer: &Consumer, - stream_id: &Identifier, - topic_id: &Identifier, - partition_id: Option, - offset: u64, - ack: AckLevel, - ) -> Result<(), IggyError> { - match self { - ClientWrapper::Iggy(client) => { - client - .store_consumer_offset_v2( - consumer, - stream_id, - topic_id, - partition_id, - offset, - ack, - ) - .await - } - ClientWrapper::Http(client) => { - client - .store_consumer_offset_v2( - consumer, - stream_id, - topic_id, - partition_id, - offset, - ack, - ) - .await - } - ClientWrapper::Tcp(client) => { - client - .store_consumer_offset_v2( - consumer, - stream_id, - topic_id, - partition_id, - offset, - ack, - ) - .await - } - ClientWrapper::Quic(client) => { - client - .store_consumer_offset_v2( - consumer, - stream_id, - topic_id, - partition_id, - offset, - ack, - ) - .await - } - ClientWrapper::WebSocket(client) => { - client - .store_consumer_offset_v2( - consumer, - stream_id, - topic_id, - partition_id, - offset, - ack, - ) - .await - } - } - } - - async fn delete_consumer_offset_v2( - &self, - consumer: &Consumer, - stream_id: &Identifier, - topic_id: &Identifier, - partition_id: Option, - ack: AckLevel, - ) -> Result<(), IggyError> { - match self { - ClientWrapper::Iggy(client) => { - client - .delete_consumer_offset_v2(consumer, stream_id, topic_id, partition_id, ack) - .await - } - ClientWrapper::Http(client) => { - client - .delete_consumer_offset_v2(consumer, stream_id, topic_id, partition_id, ack) - .await - } - ClientWrapper::Tcp(client) => { - client - .delete_consumer_offset_v2(consumer, stream_id, topic_id, partition_id, ack) - .await - } - ClientWrapper::Quic(client) => { - client - .delete_consumer_offset_v2(consumer, stream_id, topic_id, partition_id, ack) - .await - } - ClientWrapper::WebSocket(client) => { - client - .delete_consumer_offset_v2(consumer, stream_id, topic_id, partition_id, ack) - .await - } - } - } } diff --git a/core/sdk/src/clients/binary_consumer_offset.rs b/core/sdk/src/clients/binary_consumer_offset.rs index 88a2371567..1f28f9292a 100644 --- a/core/sdk/src/clients/binary_consumer_offset.rs +++ b/core/sdk/src/clients/binary_consumer_offset.rs @@ -20,7 +20,7 @@ use crate::prelude::IggyClient; use async_trait::async_trait; use iggy_common::ConsumerOffsetClient; use iggy_common::locking::IggyRwLockFn; -use iggy_common::{AckLevel, Consumer, ConsumerOffsetInfo, Identifier, IggyError}; +use iggy_common::{Consumer, ConsumerOffsetInfo, Identifier, IggyError}; #[async_trait] impl ConsumerOffsetClient for IggyClient { @@ -66,35 +66,4 @@ impl ConsumerOffsetClient for IggyClient { .delete_consumer_offset(consumer, stream_id, topic_id, partition_id) .await } - - async fn store_consumer_offset_v2( - &self, - consumer: &Consumer, - stream_id: &Identifier, - topic_id: &Identifier, - partition_id: Option, - offset: u64, - ack: AckLevel, - ) -> Result<(), IggyError> { - self.client - .read() - .await - .store_consumer_offset_v2(consumer, stream_id, topic_id, partition_id, offset, ack) - .await - } - - async fn delete_consumer_offset_v2( - &self, - consumer: &Consumer, - stream_id: &Identifier, - topic_id: &Identifier, - partition_id: Option, - ack: AckLevel, - ) -> Result<(), IggyError> { - self.client - .read() - .await - .delete_consumer_offset_v2(consumer, stream_id, topic_id, partition_id, ack) - .await - } } diff --git a/core/sdk/src/http/consumer_offsets.rs b/core/sdk/src/http/consumer_offsets.rs index 68fcd380a8..fe74355069 100644 --- a/core/sdk/src/http/consumer_offsets.rs +++ b/core/sdk/src/http/consumer_offsets.rs @@ -24,7 +24,7 @@ use async_trait::async_trait; use iggy_common::ConsumerOffsetClient; use iggy_common::get_consumer_offset::GetConsumerOffset; use iggy_common::store_consumer_offset::StoreConsumerOffset; -use iggy_common::{AckLevel, Consumer, ConsumerOffsetInfo}; +use iggy_common::{Consumer, ConsumerOffsetInfo}; #[async_trait] impl ConsumerOffsetClient for HttpClient { @@ -97,34 +97,6 @@ impl ConsumerOffsetClient for HttpClient { self.delete(&path).await?; Ok(()) } - - // HTTP has no wire-level ack parameter; v2 methods delegate to v1 regardless - // of the requested `ack`. The binary transport is the path that actually - // honours `AckLevel::NoAck`. - async fn store_consumer_offset_v2( - &self, - consumer: &Consumer, - stream_id: &Identifier, - topic_id: &Identifier, - partition_id: Option, - offset: u64, - _ack: AckLevel, - ) -> Result<(), IggyError> { - self.store_consumer_offset(consumer, stream_id, topic_id, partition_id, offset) - .await - } - - async fn delete_consumer_offset_v2( - &self, - consumer: &Consumer, - stream_id: &Identifier, - topic_id: &Identifier, - partition_id: Option, - _ack: AckLevel, - ) -> Result<(), IggyError> { - self.delete_consumer_offset(consumer, stream_id, topic_id, partition_id) - .await - } } fn get_path(stream_id: &str, topic_id: &str) -> String { diff --git a/core/server/src/binary/dispatch.rs b/core/server/src/binary/dispatch.rs index eed77cca2f..9138483e18 100644 --- a/core/server/src/binary/dispatch.rs +++ b/core/server/src/binary/dispatch.rs @@ -330,20 +330,6 @@ pub async fn dispatch( ) .await } - STORE_CONSUMER_OFFSET_2_CODE => { - let req: StoreConsumerOffset2Request = decode(frame.payload)?; - handlers::consumer_offsets::store_consumer_offset_2_handler::handle_store_consumer_offset_2( - req, sender, session, shard, - ) - .await - } - DELETE_CONSUMER_OFFSET_2_CODE => { - let req: DeleteConsumerOffset2Request = decode(frame.payload)?; - handlers::consumer_offsets::delete_consumer_offset_2_handler::handle_delete_consumer_offset_2( - req, sender, session, shard, - ) - .await - } // Consumer Groups GET_CONSUMER_GROUP_CODE => { diff --git a/core/server/src/binary/handlers/consumer_offsets/delete_consumer_offset_2_handler.rs b/core/server/src/binary/handlers/consumer_offsets/delete_consumer_offset_2_handler.rs deleted file mode 100644 index 9e8b8e6c5b..0000000000 --- a/core/server/src/binary/handlers/consumer_offsets/delete_consumer_offset_2_handler.rs +++ /dev/null @@ -1,62 +0,0 @@ -/* Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -use crate::binary::dispatch::{HandlerResult, wire_consumer_to_consumer, wire_id_to_identifier}; -use crate::binary::handlers::consumer_offsets::COMPONENT; -use crate::shard::IggyShard; -use crate::streaming::session::Session; -use err_trail::ErrContext; -use iggy_binary_protocol::requests::consumer_offsets::DeleteConsumerOffset2Request; -use iggy_common::{IggyError, SenderKind}; -use std::rc::Rc; -use tracing::debug; - -pub async fn handle_delete_consumer_offset_2( - req: DeleteConsumerOffset2Request, - sender: &mut SenderKind, - session: &Session, - shard: &Rc, -) -> Result { - let consumer = wire_consumer_to_consumer(&req.consumer)?; - let stream_id = wire_id_to_identifier(&req.stream_id)?; - let topic_id = wire_id_to_identifier(&req.topic_id)?; - debug!( - "session: {session}, command: delete_consumer_offset_2, stream_id: {stream_id}, topic_id: {topic_id}, partition_id: {:?}, ack: {:?}", - req.partition_id, req.ack - ); - shard.ensure_authenticated(session)?; - let topic = shard.resolve_topic_for_delete_consumer_offset( - session.get_user_id(), - &stream_id, - &topic_id, - )?; - shard - .delete_consumer_offset( - session.client_id, - consumer, - topic, - req.partition_id, - req.ack, - ) - .await - .error(|e: &IggyError| format!("{COMPONENT} (error: {e}) - failed to delete consumer offset (v2) for topic with ID: {} in stream with ID: {} partition ID: {:#?}, ack: {:?}, session: {}", - topic_id, stream_id, req.partition_id, req.ack, session - ))?; - sender.send_empty_ok_response().await?; - Ok(HandlerResult::Finished) -} diff --git a/core/server/src/binary/handlers/consumer_offsets/delete_consumer_offset_handler.rs b/core/server/src/binary/handlers/consumer_offsets/delete_consumer_offset_handler.rs index d0175cf3e9..459198a06e 100644 --- a/core/server/src/binary/handlers/consumer_offsets/delete_consumer_offset_handler.rs +++ b/core/server/src/binary/handlers/consumer_offsets/delete_consumer_offset_handler.rs @@ -21,7 +21,6 @@ use crate::binary::handlers::consumer_offsets::COMPONENT; use crate::shard::IggyShard; use crate::streaming::session::Session; use err_trail::ErrContext; -use iggy_binary_protocol::AckLevel; use iggy_binary_protocol::requests::consumer_offsets::DeleteConsumerOffsetRequest; use iggy_common::{IggyError, SenderKind}; use std::rc::Rc; @@ -47,13 +46,7 @@ pub async fn handle_delete_consumer_offset( &topic_id, )?; shard - .delete_consumer_offset( - session.client_id, - consumer, - topic, - req.partition_id, - AckLevel::Quorum, - ) + .delete_consumer_offset(session.client_id, consumer, topic, req.partition_id) .await .error(|e: &IggyError| format!("{COMPONENT} (error: {e}) - failed to delete consumer offset for topic with ID: {} in stream with ID: {} partition ID: {:#?}, session: {}", topic_id, stream_id, req.partition_id, session diff --git a/core/server/src/binary/handlers/consumer_offsets/mod.rs b/core/server/src/binary/handlers/consumer_offsets/mod.rs index ccd182e523..a7dcf63c9f 100644 --- a/core/server/src/binary/handlers/consumer_offsets/mod.rs +++ b/core/server/src/binary/handlers/consumer_offsets/mod.rs @@ -16,10 +16,8 @@ * under the License. */ -pub mod delete_consumer_offset_2_handler; pub mod delete_consumer_offset_handler; pub mod get_consumer_offset_handler; -pub mod store_consumer_offset_2_handler; pub mod store_consumer_offset_handler; pub const COMPONENT: &str = "CONSUMER_OFFSET_HANDLER"; diff --git a/core/server/src/binary/handlers/consumer_offsets/store_consumer_offset_2_handler.rs b/core/server/src/binary/handlers/consumer_offsets/store_consumer_offset_2_handler.rs deleted file mode 100644 index e895149bcb..0000000000 --- a/core/server/src/binary/handlers/consumer_offsets/store_consumer_offset_2_handler.rs +++ /dev/null @@ -1,65 +0,0 @@ -/* Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -use std::rc::Rc; - -use crate::binary::dispatch::{HandlerResult, wire_consumer_to_consumer, wire_id_to_identifier}; -use crate::binary::handlers::consumer_offsets::COMPONENT; -use crate::shard::IggyShard; -use crate::streaming::session::Session; -use err_trail::ErrContext; -use iggy_binary_protocol::requests::consumer_offsets::StoreConsumerOffset2Request; -use iggy_common::IggyError; -use iggy_common::SenderKind; -use tracing::debug; - -pub async fn handle_store_consumer_offset_2( - req: StoreConsumerOffset2Request, - sender: &mut SenderKind, - session: &Session, - shard: &Rc, -) -> Result { - let consumer = wire_consumer_to_consumer(&req.consumer)?; - let stream_id = wire_id_to_identifier(&req.stream_id)?; - let topic_id = wire_id_to_identifier(&req.topic_id)?; - debug!( - "session: {session}, command: store_consumer_offset_2, stream_id: {stream_id}, topic_id: {topic_id}, partition_id: {:?}, offset: {}, ack: {:?}", - req.partition_id, req.offset, req.ack - ); - shard.ensure_authenticated(session)?; - let topic = shard.resolve_topic_for_store_consumer_offset( - session.get_user_id(), - &stream_id, - &topic_id, - )?; - shard - .store_consumer_offset( - session.client_id, - consumer, - topic, - req.partition_id, - req.offset, - req.ack, - ) - .await - .error(|e: &IggyError| format!("{COMPONENT} (error: {e}) - failed to store consumer offset (v2) for stream_id: {}, topic_id: {}, partition_id: {:?}, offset: {}, ack: {:?}, session: {}", - stream_id, topic_id, req.partition_id, req.offset, req.ack, session - ))?; - sender.send_empty_ok_response().await?; - Ok(HandlerResult::Finished) -} diff --git a/core/server/src/binary/handlers/consumer_offsets/store_consumer_offset_handler.rs b/core/server/src/binary/handlers/consumer_offsets/store_consumer_offset_handler.rs index 33fc877e03..87eb4c5e5b 100644 --- a/core/server/src/binary/handlers/consumer_offsets/store_consumer_offset_handler.rs +++ b/core/server/src/binary/handlers/consumer_offsets/store_consumer_offset_handler.rs @@ -23,7 +23,6 @@ use crate::binary::handlers::consumer_offsets::COMPONENT; use crate::shard::IggyShard; use crate::streaming::session::Session; use err_trail::ErrContext; -use iggy_binary_protocol::AckLevel; use iggy_binary_protocol::requests::consumer_offsets::StoreConsumerOffsetRequest; use iggy_common::IggyError; use iggy_common::SenderKind; @@ -55,7 +54,6 @@ pub async fn handle_store_consumer_offset( topic, req.partition_id, req.offset, - AckLevel::Quorum, ) .await .error(|e: &IggyError| format!("{COMPONENT} (error: {e}) - failed to store consumer offset for stream_id: {}, topic_id: {}, partition_id: {:?}, offset: {}, session: {}", diff --git a/core/server/src/http/http_shard_wrapper.rs b/core/server/src/http/http_shard_wrapper.rs index f3ab203c0e..9952e4de78 100644 --- a/core/server/src/http/http_shard_wrapper.rs +++ b/core/server/src/http/http_shard_wrapper.rs @@ -17,7 +17,6 @@ use std::rc::Rc; -use iggy_binary_protocol::AckLevel; use iggy_common::{ Consumer, ConsumerOffsetInfo, Identifier, IggyError, Partitioning, PartitioningKind, }; @@ -111,7 +110,6 @@ impl HttpSafeShard { topic, partition_id, offset, - AckLevel::Quorum, )); let _result = future.await?; Ok(()) @@ -131,7 +129,6 @@ impl HttpSafeShard { consumer, topic, partition_id, - AckLevel::Quorum, )); let _result = future.await?; Ok(()) diff --git a/core/server/src/shard/handlers.rs b/core/server/src/shard/handlers.rs index 94a9ad2118..5d4de789c6 100644 --- a/core/server/src/shard/handlers.rs +++ b/core/server/src/shard/handlers.rs @@ -31,7 +31,6 @@ use crate::{ }, }; use compio::net::TcpStream; -use iggy_binary_protocol::AckLevel; use iggy_common::{IggyError, SenderKind, TransportProtocol, sharding::IggyNamespace}; use nix::sys::stat::SFlag; use std::os::fd::{FromRawFd, IntoRawFd}; @@ -107,14 +106,7 @@ async fn handle_request( .last_offset() .expect("Batch set should have at least one batch"); shard - .store_consumer_offset_internal( - namespace.stream_id(), - namespace.topic_id(), - namespace.partition_id(), - &consumer, - offset, - AckLevel::NoAck, - ) + .auto_commit_consumer_offset_from_local_partition(&namespace, consumer, offset) .await?; } Ok(ShardResponse::PollMessages((poll_metadata, batches))) diff --git a/core/server/src/shard/system/consumer_offsets.rs b/core/server/src/shard/system/consumer_offsets.rs index 30e8d39bbe..ab287a8344 100644 --- a/core/server/src/shard/system/consumer_offsets.rs +++ b/core/server/src/shard/system/consumer_offsets.rs @@ -26,7 +26,6 @@ use crate::{ }, }; use err_trail::ErrContext; -use iggy_binary_protocol::AckLevel; use iggy_common::{ Consumer, ConsumerKind, ConsumerOffsetInfo, Identifier, IggyError, sharding::IggyNamespace, }; @@ -40,7 +39,6 @@ impl IggyShard { topic: ResolvedTopic, partition_id: Option, offset: u64, - ack: AckLevel, ) -> Result<(PollingConsumer, usize), IggyError> { let Some((polling_consumer, partition_id)) = self.resolve_consumer_with_partition_id( topic, @@ -53,53 +51,32 @@ impl IggyShard { return Err(IggyError::NotResolvedConsumer(consumer.id)); }; - self.store_consumer_offset_internal( + self.validate_partition_offset(topic.stream_id, topic.topic_id, partition_id, offset)?; + + self.store_consumer_offset_base( topic.stream_id, topic.topic_id, - partition_id, &polling_consumer, + partition_id, offset, - ack, + ); + self.persist_consumer_offset_to_disk( + topic.stream_id, + topic.topic_id, + &polling_consumer, + partition_id, ) .await?; - Ok((polling_consumer, partition_id)) - } - - /// Shared offset-write path used by `store_consumer_offset` (explicit - /// client writes) and `PollMessages` auto-commit. Resolution and - /// permission checks are the caller's responsibility. - /// - /// `Quorum` performs offset validation; `NoAck` skips it to match the - /// trust model of auto-commit (the offset was just polled from the - /// partition, so it is known-good by construction). - pub(crate) async fn store_consumer_offset_internal( - &self, - stream_id: usize, - topic_id: usize, - partition_id: usize, - polling_consumer: &PollingConsumer, - offset: u64, - ack: AckLevel, - ) -> Result<(), IggyError> { - if matches!(ack, AckLevel::Quorum) { - self.validate_partition_offset(stream_id, topic_id, partition_id, offset)?; - } - - self.store_consumer_offset_base( - stream_id, - topic_id, - polling_consumer, + self.maybe_complete_pending_revocation( + &polling_consumer, + topic.stream_id, + topic.topic_id, partition_id, - offset, - ); - self.persist_consumer_offset_to_disk(stream_id, topic_id, polling_consumer, partition_id) - .await?; - - self.maybe_complete_pending_revocation(polling_consumer, stream_id, topic_id, partition_id) - .await; + ) + .await; - Ok(()) + Ok((polling_consumer, partition_id)) } pub async fn get_consumer_offset( @@ -203,7 +180,6 @@ impl IggyShard { consumer: Consumer, topic: ResolvedTopic, partition_id: Option, - _ack: AckLevel, ) -> Result<(PollingConsumer, usize), IggyError> { let Some((polling_consumer, partition_id)) = self.resolve_consumer_with_partition_id( topic, diff --git a/core/server/src/shard/system/messages.rs b/core/server/src/shard/system/messages.rs index 647279a981..82a5d25b06 100644 --- a/core/server/src/shard/system/messages.rs +++ b/core/server/src/shard/system/messages.rs @@ -29,7 +29,9 @@ use err_trail::ErrContext; use iggy_common::IggyPollMetadata; use iggy_common::PooledBuffer; use iggy_common::sharding::IggyNamespace; -use iggy_common::{Consumer, EncryptorKind, IGGY_MESSAGE_HEADER_SIZE, IggyError, PollingStrategy}; +use iggy_common::{ + Consumer, EncryptorKind, IGGY_MESSAGE_HEADER_SIZE, Identifier, IggyError, PollingStrategy, +}; use std::sync::atomic::Ordering; use tracing::error; @@ -226,6 +228,83 @@ impl IggyShard { Ok(()) } + pub(crate) async fn auto_commit_consumer_offset_from_local_partition( + &self, + namespace: &IggyNamespace, + consumer: PollingConsumer, + offset: u64, + ) -> Result<(), IggyError> { + let (offset_value, path) = { + let partitions = self.local_partitions.borrow(); + let partition = partitions.get(namespace).ok_or_else(|| { + IggyError::PartitionNotFound( + namespace.partition_id(), + Identifier::numeric(namespace.topic_id() as u32).unwrap(), + Identifier::numeric(namespace.stream_id() as u32).unwrap(), + ) + })?; + + match consumer { + PollingConsumer::Consumer(consumer_id, _) => { + tracing::trace!( + "Auto-committing offset {} for consumer {} on partition {:?}", + offset, + consumer_id, + namespace + ); + let hdl = partition.consumer_offsets.pin(); + let item = hdl.get_or_insert( + consumer_id, + crate::streaming::partitions::consumer_offset::ConsumerOffset::default_for_consumer( + consumer_id as u32, + &self.config.system.get_consumer_offsets_path( + namespace.stream_id(), + namespace.topic_id(), + namespace.partition_id(), + ), + ), + ); + item.offset.store(offset, Ordering::Release); + (item.offset.load(Ordering::Relaxed), item.path.clone()) + } + PollingConsumer::ConsumerGroup(consumer_group_id, _) => { + tracing::trace!( + "Auto-committing offset {} for consumer group {} on partition {:?}", + offset, + consumer_group_id.0, + namespace + ); + let hdl = partition.consumer_group_offsets.pin(); + let item = hdl.get_or_insert( + consumer_group_id, + crate::streaming::partitions::consumer_offset::ConsumerOffset::default_for_consumer_group( + consumer_group_id, + &self.config.system.get_consumer_group_offsets_path( + namespace.stream_id(), + namespace.topic_id(), + namespace.partition_id(), + ), + ), + ); + item.offset.store(offset, Ordering::Release); + (item.offset.load(Ordering::Relaxed), item.path.clone()) + } + } + }; + + crate::streaming::partitions::storage::persist_offset(&path, offset_value).await?; + + self.maybe_complete_pending_revocation( + &consumer, + namespace.stream_id(), + namespace.topic_id(), + namespace.partition_id(), + ) + .await; + + Ok(()) + } + /// Appends a batch to the active segment, flushing to disk and rotating if needed. /// /// Safety: called exclusively from the message pump — segment indices captured before diff --git a/core/simulator/src/client.rs b/core/simulator/src/client.rs index 96a20fa928..88940fbfb6 100644 --- a/core/simulator/src/client.rs +++ b/core/simulator/src/client.rs @@ -19,7 +19,7 @@ use bytes::Bytes; use iggy_binary_protocol::consensus::iobuf::Owned; use iggy_binary_protocol::requests::streams::{CreateStreamRequest, DeleteStreamRequest}; use iggy_binary_protocol::{ - Message, Operation, RequestHeader, WireEncode, WireIdentifier, WireName, + AckLevel, Message, Operation, RequestHeader, WireEncode, WireIdentifier, WireName, }; use iggy_common::send_messages2::{ IggyMessage2, IggyMessage2Header, IggyMessages2, SendMessages2Owned, @@ -181,6 +181,44 @@ impl SimClient { self.build_request_with_namespace(Operation::DeleteConsumerOffset, &payload, namespace) } + /// v2 of `store_consumer_offset` carrying an explicit `AckLevel` byte. + /// + /// Only the simulator emits this opcode today; the partitions plane + /// accepts it alongside v1. The ack byte is reserved for future + /// cluster-side commit-timing semantics. + pub fn store_consumer_offset_v2( + &self, + namespace: IggyNamespace, + consumer_kind: u8, + consumer_id: u32, + offset: u64, + ack: AckLevel, + ) -> Message { + let mut payload = Vec::with_capacity(14); + payload.push(consumer_kind); + payload.extend_from_slice(&consumer_id.to_le_bytes()); + payload.extend_from_slice(&offset.to_le_bytes()); + payload.push(ack.as_u8()); + + self.build_request_with_namespace(Operation::StoreConsumerOffset2, &payload, namespace) + } + + /// v2 of `delete_consumer_offset` carrying an explicit `AckLevel` byte. + pub fn delete_consumer_offset_v2( + &self, + namespace: IggyNamespace, + consumer_kind: u8, + consumer_id: u32, + ack: AckLevel, + ) -> Message { + let mut payload = Vec::with_capacity(6); + payload.push(consumer_kind); + payload.extend_from_slice(&consumer_id.to_le_bytes()); + payload.push(ack.as_u8()); + + self.build_request_with_namespace(Operation::DeleteConsumerOffset2, &payload, namespace) + } + #[allow(clippy::cast_possible_truncation)] fn build_request_with_namespace( &self, From 7f2f66871923e078aae815a6510d46dfea491377 Mon Sep 17 00:00:00 2001 From: Krishna Vishal Date: Fri, 24 Apr 2026 02:08:08 +0530 Subject: [PATCH 3/4] fix: test failures --- core/consensus/src/client_table.rs | 2 +- .../tests/server/scenarios/authentication_scenario.rs | 10 ++++++++++ 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/core/consensus/src/client_table.rs b/core/consensus/src/client_table.rs index 54a888034c..7ea5db56cb 100644 --- a/core/consensus/src/client_table.rs +++ b/core/consensus/src/client_table.rs @@ -430,8 +430,8 @@ impl ClientTable { if let Some(entry) = slot { let commit = entry.reply.header().commit; let should_evict = match evictee { - None => true, Some((_, min_commit)) => commit < min_commit, + None => true, }; if should_evict { evictee = Some((idx, commit)); diff --git a/core/integration/tests/server/scenarios/authentication_scenario.rs b/core/integration/tests/server/scenarios/authentication_scenario.rs index ab5245e3c1..24a0f2c449 100644 --- a/core/integration/tests/server/scenarios/authentication_scenario.rs +++ b/core/integration/tests/server/scenarios/authentication_scenario.rs @@ -139,6 +139,16 @@ async fn test_all_commands_require_auth(client: &IggyClient) { ) { continue; } + // v2 consumer-offset ops are registered in the dispatch table for the + // consensus/simulator pathway but are not wired into the legacy binary + // server's dispatch. They'll move into server-ng alongside the rest of + // the v2 surface; re-enable these codes here once that lands. + if matches!( + code, + STORE_CONSUMER_OFFSET_2_CODE | DELETE_CONSUMER_OFFSET_2_CODE + ) { + continue; + } // ================================================================ // REQUIRES AUTH From b15a2ca1bd6b2b3682d5cfa8c34ad82a723e7037 Mon Sep 17 00:00:00 2001 From: Krishna Vishal Date: Tue, 5 May 2026 00:42:58 +0530 Subject: [PATCH 4/4] fix(shard): wire AckLevel into v2 consumer-offset dispatch --- .../delete_consumer_offset_2.rs | 5 +- .../store_consumer_offset_2.rs | 5 +- core/consensus/src/plane_helpers.rs | 58 ++++- core/partitions/src/iggy_partition.rs | 214 +++++++++++++++--- core/simulator/src/client.rs | 7 +- 5 files changed, 243 insertions(+), 46 deletions(-) diff --git a/core/binary_protocol/src/requests/consumer_offsets/delete_consumer_offset_2.rs b/core/binary_protocol/src/requests/consumer_offsets/delete_consumer_offset_2.rs index 315da10533..ccfa104ada 100644 --- a/core/binary_protocol/src/requests/consumer_offsets/delete_consumer_offset_2.rs +++ b/core/binary_protocol/src/requests/consumer_offsets/delete_consumer_offset_2.rs @@ -24,9 +24,8 @@ use bytes::{BufMut, BytesMut}; /// `DeleteConsumerOffset` v2 request. /// -/// Extends v1 with an `ack` field that lets the client pick the commit -/// guarantee: `NoAck` for leader-local (fast) writes, `Quorum` for -/// consensus-committed writes. +/// Adds an `ack` byte: `NoAck` = leader-local fast path, `Quorum` = VSR +/// pipeline. /// /// Wire format: /// ```text diff --git a/core/binary_protocol/src/requests/consumer_offsets/store_consumer_offset_2.rs b/core/binary_protocol/src/requests/consumer_offsets/store_consumer_offset_2.rs index 39c60ec52b..37e89fb1c5 100644 --- a/core/binary_protocol/src/requests/consumer_offsets/store_consumer_offset_2.rs +++ b/core/binary_protocol/src/requests/consumer_offsets/store_consumer_offset_2.rs @@ -24,9 +24,8 @@ use bytes::{BufMut, BytesMut}; /// `StoreConsumerOffset` v2 request. /// -/// Extends v1 with an `ack` field that lets the client pick the commit -/// guarantee: `NoAck` for leader-local (fast) writes, `Quorum` for -/// consensus-committed writes. +/// Adds an `ack` byte: `NoAck` = leader-local fast path, `Quorum` = VSR +/// pipeline. /// /// Wire format: /// ```text diff --git a/core/consensus/src/plane_helpers.rs b/core/consensus/src/plane_helpers.rs index 4078d9eba9..e4da880847 100644 --- a/core/consensus/src/plane_helpers.rs +++ b/core/consensus/src/plane_helpers.rs @@ -21,7 +21,9 @@ use crate::{ Status, VsrConsensus, }; use iggy_binary_protocol::consensus::iobuf::Owned; -use iggy_binary_protocol::{Command2, Message, PrepareHeader, PrepareOkHeader, ReplyHeader}; +use iggy_binary_protocol::{ + Command2, Message, PrepareHeader, PrepareOkHeader, ReplyHeader, RequestHeader, +}; use message_bus::{MessageBus, SendError}; use std::ops::AsyncFnOnce; @@ -392,6 +394,60 @@ where .expect("reply buffer must contain a valid reply message") } +/// Reply for fast paths that skip the VSR pipeline (e.g. `AckLevel::NoAck`). +/// +/// Stamps `op` and `commit` with `commit_max` — monotonic, so +/// `ClientTable::commit_reply` regression checks always pass. +/// +/// # Panics +/// If the constructed message buffer is not valid. +#[allow(clippy::needless_pass_by_value, clippy::cast_possible_truncation)] +pub fn build_reply_from_request( + consensus: &VsrConsensus, + request_header: &RequestHeader, + body: bytes::Bytes, +) -> Message +where + B: MessageBus, + P: Pipeline, +{ + let header_size = std::mem::size_of::(); + let total_size = header_size + body.len(); + let mut buffer = bytes::BytesMut::zeroed(total_size); + + let commit = consensus.commit_max(); + let header = bytemuck::checked::try_from_bytes_mut::(&mut buffer[..header_size]) + .expect("zeroed bytes are valid"); + *header = ReplyHeader { + checksum: 0, + checksum_body: 0, + cluster: consensus.cluster(), + size: total_size as u32, + view: consensus.view(), + release: 0, + command: Command2::Reply, + replica: consensus.replica(), + reserved_frame: [0; 66], + request_checksum: request_header.request_checksum, + context: 0, + client: request_header.client, + op: commit, + commit, + timestamp: request_header.timestamp, + request: request_header.request, + operation: request_header.operation, + namespace: request_header.namespace, + ..Default::default() + }; + + if !body.is_empty() { + buffer[header_size..].copy_from_slice(&body); + } + + Message::try_from(Owned::<4096>::copy_from_slice(buffer.as_ref())) + .expect("reply buffer must contain a valid reply message") +} + /// Verify hash chain would not break if we add this header. /// /// # Panics diff --git a/core/partitions/src/iggy_partition.rs b/core/partitions/src/iggy_partition.rs index ac0312724b..e85717cfd8 100644 --- a/core/partitions/src/iggy_partition.rs +++ b/core/partitions/src/iggy_partition.rs @@ -31,13 +31,13 @@ use crate::{ use consensus::{ CommitLogEvent, Consensus, PartitionDiagEvent, Pipeline, PipelineEntry, PlaneKind, Project, ReplicaLogContext, RequestLogEvent, Sequencer, SimEventKind, VsrConsensus, ack_preflight, - ack_quorum_reached, build_reply_message, drain_committable_prefix, + ack_quorum_reached, build_reply_from_request, build_reply_message, drain_committable_prefix, emit_namespace_progress_event, emit_partition_diag, emit_sim_event, fence_old_prepare_by_commit, replicate_preflight, replicate_to_next_in_chain, request_preflight, send_prepare_ok as send_prepare_ok_common, }; use iggy_binary_protocol::consensus::iobuf::Frozen; -use iggy_binary_protocol::{Message, Operation, PrepareHeader}; +use iggy_binary_protocol::{AckLevel, Message, Operation, PrepareHeader}; use iggy_binary_protocol::{PrepareOkHeader, RequestHeader}; use iggy_common::{ ConsumerGroupId, ConsumerGroupOffsets, ConsumerKind, ConsumerOffset, ConsumerOffsets, @@ -83,6 +83,19 @@ where observed_view: u32, } +/// Post-preflight dispatch in `on_request`: replicate via VSR or take the +/// `NoAck` leader-local fast path. `RequestHeader` is boxed to avoid the +/// 277-byte inline variant tripping clippy's `large_enum_variant`. +enum Disposition { + Replicate(Message), + NoAck { + request_header: Box, + kind: ConsumerKind, + consumer_id: u32, + offset: Option, + }, +} + #[derive(Debug, Clone, Copy, PartialEq)] struct PendingConsumerOffsetCommit { kind: ConsumerKind, @@ -362,6 +375,74 @@ where Ok(()) } + /// `AckLevel::NoAck` fast path: persist, apply, cache + send reply, no + /// replication. Single-replica durability. + #[allow(clippy::future_not_send)] + async fn apply_consumer_offset_no_ack( + &self, + request_header: Box, + kind: ConsumerKind, + consumer_id: u32, + offset: Option, + ) { + let pending = offset.map_or_else( + || PendingConsumerOffsetCommit::delete(kind, consumer_id), + |value| PendingConsumerOffsetCommit::upsert(kind, consumer_id, value), + ); + + if let Err(error) = self.persist_consumer_offset_commit(pending).await { + emit_partition_diag( + tracing::Level::WARN, + &PartitionDiagEvent::new(self.diag_ctx(), "no_ack offset persist failed") + .with_operation(request_header.operation) + .with_error(error.to_string()), + ); + return; + } + if let Err(error) = self.apply_consumer_offset_commit(pending) { + emit_partition_diag( + tracing::Level::WARN, + &PartitionDiagEvent::new(self.diag_ctx(), "no_ack offset apply failed") + .with_operation(request_header.operation) + .with_error(error.to_string()), + ); + return; + } + + let reply = build_reply_from_request(&self.consensus, &request_header, bytes::Bytes::new()); + let session = self + .consensus + .client_table() + .borrow() + .get_session(request_header.client) + .unwrap_or_else(|| { + panic!( + "apply_consumer_offset_no_ack: client {} not registered", + request_header.client + ) + }); + self.consensus.client_table().borrow_mut().commit_reply( + request_header.client, + session, + reply.clone(), + ); + + let reply_buffers = reply.into_generic().into_frozen(); + if let Err(error) = self + .consensus + .message_bus() + .send_to_client(request_header.client, reply_buffers) + .await + { + emit_partition_diag( + tracing::Level::WARN, + &PartitionDiagEvent::new(self.diag_ctx(), "no_ack reply send failed") + .with_operation(request_header.operation) + .with_error(error.to_string()), + ); + } + } + fn persisted_offset_path(&self, kind: ConsumerKind, consumer_id: u32) -> Option { match kind { ConsumerKind::Consumer => self @@ -607,7 +688,7 @@ where /// # Panics /// Panics if called when this partition's consensus instance is not the /// primary, is not in normal status, or is currently syncing. - #[allow(clippy::future_not_send)] + #[allow(clippy::future_not_send, clippy::too_many_lines)] pub async fn on_request(&mut self, message: Message) { self.clear_pending_consumer_offset_commits_if_view_changed(); let namespace = IggyNamespace::from_raw(message.header().namespace); @@ -635,7 +716,7 @@ where } } - let prepare = { + let disposition = { let consensus = self.consensus(); emit_sim_event( SimEventKind::ClientRequestReceived, @@ -667,28 +748,51 @@ where message }; + // Parse once for both the delete-existence check and AckLevel dispatch. + let consumer_offset = match message.header().operation { + Operation::StoreConsumerOffset + | Operation::StoreConsumerOffset2 + | Operation::DeleteConsumerOffset + | Operation::DeleteConsumerOffset2 => { + match Self::parse_consumer_offset_request(message.header().operation, &message) + { + Ok(parsed) => Some(parsed), + Err(error) => { + emit_partition_diag( + tracing::Level::WARN, + &PartitionDiagEvent::new( + ReplicaLogContext::from_consensus( + consensus, + PlaneKind::Partitions, + ), + "failed to parse consumer offset request", + ) + .with_operation(message.header().operation) + .with_error(error.to_string()), + ); + return; + } + } + } + _ => None, + }; + if matches!( message.header().operation, Operation::DeleteConsumerOffset | Operation::DeleteConsumerOffset2 - ) { - match Self::parse_consumer_offset_request(message.header().operation, &message) - .and_then(|(kind, consumer_id, _)| { - self.ensure_consumer_offset_exists(kind, consumer_id) - }) { - Ok(()) => {} - Err(error) => { - emit_partition_diag( - tracing::Level::WARN, - &PartitionDiagEvent::new( - ReplicaLogContext::from_consensus(consensus, PlaneKind::Partitions), - "rejecting delete_consumer_offset for missing offset", - ) - .with_operation(message.header().operation) - .with_error(error.to_string()), - ); - return; - } - } + ) && let Some((kind, consumer_id, _, _)) = consumer_offset + && let Err(error) = self.ensure_consumer_offset_exists(kind, consumer_id) + { + emit_partition_diag( + tracing::Level::WARN, + &PartitionDiagEvent::new( + ReplicaLogContext::from_consensus(consensus, PlaneKind::Partitions), + "rejecting delete_consumer_offset for missing offset", + ) + .with_operation(message.header().operation) + .with_error(error.to_string()), + ); + return; } if request_preflight(consensus, client_id, session, request) @@ -702,12 +806,39 @@ where assert!(consensus.is_normal(), "on_request: status must be normal"); assert!(!consensus.is_syncing(), "on_request: must not be syncing"); - let prepare = message.project(consensus); - consensus.verify_pipeline(); - consensus.pipeline_message(PlaneKind::Partitions, &prepare); - prepare + // NoAck v2 -> fast path. Quorum + v1 -> VSR pipeline. + if let Some((kind, consumer_id, offset, AckLevel::NoAck)) = consumer_offset + && matches!( + message.header().operation, + Operation::StoreConsumerOffset2 | Operation::DeleteConsumerOffset2, + ) + { + Disposition::NoAck { + request_header: Box::new(*message.header()), + kind, + consumer_id, + offset, + } + } else { + let prepare = message.project(consensus); + consensus.verify_pipeline(); + consensus.pipeline_message(PlaneKind::Partitions, &prepare); + Disposition::Replicate(prepare) + } }; - self.on_replicate(prepare).await; + + match disposition { + Disposition::Replicate(prepare) => self.on_replicate(prepare).await, + Disposition::NoAck { + request_header, + kind, + consumer_id, + offset, + } => { + self.apply_consumer_offset_no_ack(request_header, kind, consumer_id, offset) + .await; + } + } } #[allow(clippy::future_not_send, clippy::too_many_lines)] @@ -979,7 +1110,8 @@ where | Operation::DeleteConsumerOffset | Operation::StoreConsumerOffset2 | Operation::DeleteConsumerOffset2 => { - let (kind, consumer_id, offset) = + // Replicated path is Quorum-only by construction; ack ignored. + let (kind, consumer_id, offset, _ack) = Self::parse_staged_consumer_offset_commit(header.operation, &message)?; let write_lock = self.write_lock.clone(); let _guard = write_lock.lock().await; @@ -1381,7 +1513,7 @@ where fn parse_consumer_offset_request( operation: Operation, message: &Message, - ) -> Result<(ConsumerKind, u32, Option), IggyError> { + ) -> Result<(ConsumerKind, u32, Option, AckLevel), IggyError> { let total_size = usize::try_from(message.header().size).map_err(|_| IggyError::InvalidCommand)?; let body = message @@ -1394,7 +1526,7 @@ where fn parse_staged_consumer_offset_commit( operation: Operation, message: &Message, - ) -> Result<(ConsumerKind, u32, Option), IggyError> { + ) -> Result<(ConsumerKind, u32, Option, AckLevel), IggyError> { let total_size = usize::try_from(message.header().size).map_err(|_| IggyError::InvalidCommand)?; let body = message @@ -1407,7 +1539,7 @@ where fn parse_consumer_offset_payload( operation: Operation, body: &[u8], - ) -> Result<(ConsumerKind, u32, Option), IggyError> { + ) -> Result<(ConsumerKind, u32, Option, AckLevel), IggyError> { let consumer_kind = *body.first().ok_or(IggyError::InvalidCommand)?; let consumer_id = body .get(1..5) @@ -1418,6 +1550,8 @@ where .map_err(|_| IggyError::InvalidCommand) })?; let kind = ConsumerKind::from_code(consumer_kind)?; + // v1 implicitly Quorum. v2 trailing ack byte validated; unknown + // discriminants rejected so malformed wire bytes fail fast. match operation { Operation::StoreConsumerOffset | Operation::StoreConsumerOffset2 => { let offset = @@ -1428,10 +1562,22 @@ where .map(u64::from_le_bytes) .map_err(|_| IggyError::InvalidCommand) })?; - Ok((kind, consumer_id, Some(offset))) + let ack = if matches!(operation, Operation::StoreConsumerOffset2) { + let ack_byte = *body.get(13).ok_or(IggyError::InvalidCommand)?; + AckLevel::from_code(ack_byte).map_err(|_| IggyError::InvalidCommand)? + } else { + AckLevel::Quorum + }; + Ok((kind, consumer_id, Some(offset), ack)) } Operation::DeleteConsumerOffset | Operation::DeleteConsumerOffset2 => { - Ok((kind, consumer_id, None)) + let ack = if matches!(operation, Operation::DeleteConsumerOffset2) { + let ack_byte = *body.get(5).ok_or(IggyError::InvalidCommand)?; + AckLevel::from_code(ack_byte).map_err(|_| IggyError::InvalidCommand)? + } else { + AckLevel::Quorum + }; + Ok((kind, consumer_id, None, ack)) } _ => Err(IggyError::InvalidCommand), } diff --git a/core/simulator/src/client.rs b/core/simulator/src/client.rs index 88940fbfb6..0c2c5b5889 100644 --- a/core/simulator/src/client.rs +++ b/core/simulator/src/client.rs @@ -181,11 +181,8 @@ impl SimClient { self.build_request_with_namespace(Operation::DeleteConsumerOffset, &payload, namespace) } - /// v2 of `store_consumer_offset` carrying an explicit `AckLevel` byte. - /// - /// Only the simulator emits this opcode today; the partitions plane - /// accepts it alongside v1. The ack byte is reserved for future - /// cluster-side commit-timing semantics. + /// v2 of `store_consumer_offset` with an `AckLevel` byte. `NoAck` takes + /// the primary's fast path (no replication); `Quorum` goes through VSR. pub fn store_consumer_offset_v2( &self, namespace: IggyNamespace,