diff --git a/core/binary_protocol/src/codes.rs b/core/binary_protocol/src/codes.rs index f663c85d0d..ae33a98142 100644 --- a/core/binary_protocol/src/codes.rs +++ b/core/binary_protocol/src/codes.rs @@ -57,6 +57,8 @@ pub const FLUSH_UNSAVED_BUFFER_CODE: u32 = 102; pub const GET_CONSUMER_OFFSET_CODE: u32 = 120; pub const STORE_CONSUMER_OFFSET_CODE: u32 = 121; pub const DELETE_CONSUMER_OFFSET_CODE: u32 = 122; +pub const STORE_CONSUMER_OFFSET_2_CODE: u32 = 123; +pub const DELETE_CONSUMER_OFFSET_2_CODE: u32 = 124; // -- Streams -- pub const GET_STREAM_CODE: u32 = 200; @@ -133,6 +135,8 @@ mod tests { GET_CONSUMER_OFFSET_CODE, STORE_CONSUMER_OFFSET_CODE, DELETE_CONSUMER_OFFSET_CODE, + STORE_CONSUMER_OFFSET_2_CODE, + DELETE_CONSUMER_OFFSET_2_CODE, GET_STREAM_CODE, GET_STREAMS_CODE, CREATE_STREAM_CODE, diff --git a/core/binary_protocol/src/consensus/operation.rs b/core/binary_protocol/src/consensus/operation.rs index f0a08a29d5..64a43a3cad 100644 --- a/core/binary_protocol/src/consensus/operation.rs +++ b/core/binary_protocol/src/consensus/operation.rs @@ -65,6 +65,9 @@ pub enum Operation { SendMessages = 160, StoreConsumerOffset = 161, DeleteConsumerOffset = 162, + // 163 is reserved for the planned DeleteSegments move (see TODO above). + StoreConsumerOffset2 = 164, + DeleteConsumerOffset2 = 165, } impl Operation { @@ -165,7 +168,9 @@ impl Operation { | Self::DeletePersonalAccessToken | Self::SendMessages | Self::StoreConsumerOffset - | Self::DeleteConsumerOffset => match crate::dispatch::lookup_by_operation(*self) { + | Self::DeleteConsumerOffset + | Self::StoreConsumerOffset2 + | Self::DeleteConsumerOffset2 => match crate::dispatch::lookup_by_operation(*self) { Some(meta) => Some(meta.code), None => None, }, @@ -214,6 +219,8 @@ mod tests { Operation::SendMessages, Operation::StoreConsumerOffset, Operation::DeleteConsumerOffset, + Operation::StoreConsumerOffset2, + Operation::DeleteConsumerOffset2, ]; for op in ops { let code = op @@ -270,5 +277,7 @@ mod tests { assert!(!Operation::SendMessages.is_metadata()); assert!(Operation::DeleteSegments.is_partition()); assert!(Operation::DeleteConsumerOffset.is_partition()); + assert!(Operation::StoreConsumerOffset2.is_partition()); + assert!(Operation::DeleteConsumerOffset2.is_partition()); } } diff --git a/core/binary_protocol/src/dispatch.rs b/core/binary_protocol/src/dispatch.rs index fb5e33683b..bb83280bed 100644 --- a/core/binary_protocol/src/dispatch.rs +++ b/core/binary_protocol/src/dispatch.rs @@ -126,6 +126,16 @@ pub const COMMAND_TABLE: &[CommandMeta] = &[ "consumer_offset.delete", Operation::DeleteConsumerOffset, ), + CommandMeta::replicated( + STORE_CONSUMER_OFFSET_2_CODE, + "consumer_offset.store.v2", + Operation::StoreConsumerOffset2, + ), + CommandMeta::replicated( + DELETE_CONSUMER_OFFSET_2_CODE, + "consumer_offset.delete.v2", + Operation::DeleteConsumerOffset2, + ), // Streams CommandMeta::non_replicated(GET_STREAM_CODE, "stream.get"), CommandMeta::non_replicated(GET_STREAMS_CODE, "stream.list"), @@ -212,28 +222,30 @@ pub const fn lookup_command(code: u32) -> Option<&'static CommandMeta> { GET_CONSUMER_OFFSET_CODE => 24, STORE_CONSUMER_OFFSET_CODE => 25, DELETE_CONSUMER_OFFSET_CODE => 26, - GET_STREAM_CODE => 27, - GET_STREAMS_CODE => 28, - CREATE_STREAM_CODE => 29, - DELETE_STREAM_CODE => 30, - UPDATE_STREAM_CODE => 31, - PURGE_STREAM_CODE => 32, - GET_TOPIC_CODE => 33, - GET_TOPICS_CODE => 34, - CREATE_TOPIC_CODE => 35, - DELETE_TOPIC_CODE => 36, - UPDATE_TOPIC_CODE => 37, - PURGE_TOPIC_CODE => 38, - CREATE_PARTITIONS_CODE => 39, - DELETE_PARTITIONS_CODE => 40, - DELETE_SEGMENTS_CODE => 41, - GET_CONSUMER_GROUP_CODE => 42, - GET_CONSUMER_GROUPS_CODE => 43, - CREATE_CONSUMER_GROUP_CODE => 44, - DELETE_CONSUMER_GROUP_CODE => 45, - JOIN_CONSUMER_GROUP_CODE => 46, - LEAVE_CONSUMER_GROUP_CODE => 47, - LOGIN_REGISTER_WITH_PAT_CODE => 48, + STORE_CONSUMER_OFFSET_2_CODE => 27, + DELETE_CONSUMER_OFFSET_2_CODE => 28, + GET_STREAM_CODE => 29, + GET_STREAMS_CODE => 30, + CREATE_STREAM_CODE => 31, + DELETE_STREAM_CODE => 32, + UPDATE_STREAM_CODE => 33, + PURGE_STREAM_CODE => 34, + GET_TOPIC_CODE => 35, + GET_TOPICS_CODE => 36, + CREATE_TOPIC_CODE => 37, + DELETE_TOPIC_CODE => 38, + UPDATE_TOPIC_CODE => 39, + PURGE_TOPIC_CODE => 40, + CREATE_PARTITIONS_CODE => 41, + DELETE_PARTITIONS_CODE => 42, + DELETE_SEGMENTS_CODE => 43, + GET_CONSUMER_GROUP_CODE => 44, + GET_CONSUMER_GROUPS_CODE => 45, + CREATE_CONSUMER_GROUP_CODE => 46, + DELETE_CONSUMER_GROUP_CODE => 47, + JOIN_CONSUMER_GROUP_CODE => 48, + LEAVE_CONSUMER_GROUP_CODE => 49, + LOGIN_REGISTER_WITH_PAT_CODE => 50, _ => return None, }; Some(&COMMAND_TABLE[idx]) @@ -247,19 +259,19 @@ pub const fn lookup_command(code: u32) -> Option<&'static CommandMeta> { pub const fn lookup_by_operation(op: Operation) -> Option<&'static CommandMeta> { // Indices must match the order of entries in COMMAND_TABLE above. let idx = match op { - Operation::CreateStream => 29, - Operation::UpdateStream => 31, - Operation::DeleteStream => 30, - Operation::PurgeStream => 32, - Operation::CreateTopic => 35, - Operation::UpdateTopic => 37, - Operation::DeleteTopic => 36, - Operation::PurgeTopic => 38, - Operation::CreatePartitions => 39, - Operation::DeletePartitions => 40, - Operation::DeleteSegments => 41, - Operation::CreateConsumerGroup => 44, - Operation::DeleteConsumerGroup => 45, + Operation::CreateStream => 31, + Operation::UpdateStream => 33, + Operation::DeleteStream => 32, + Operation::PurgeStream => 34, + Operation::CreateTopic => 37, + Operation::UpdateTopic => 39, + Operation::DeleteTopic => 38, + Operation::PurgeTopic => 40, + Operation::CreatePartitions => 41, + Operation::DeletePartitions => 42, + Operation::DeleteSegments => 43, + Operation::CreateConsumerGroup => 46, + Operation::DeleteConsumerGroup => 47, Operation::CreateUser => 9, Operation::UpdateUser => 11, Operation::DeleteUser => 10, @@ -270,6 +282,8 @@ pub const fn lookup_by_operation(op: Operation) -> Option<&'static CommandMeta> Operation::SendMessages => 22, Operation::StoreConsumerOffset => 25, Operation::DeleteConsumerOffset => 26, + Operation::StoreConsumerOffset2 => 27, + Operation::DeleteConsumerOffset2 => 28, Operation::CreateTopicWithAssignments | Operation::CreatePartitionsWithAssignments | Operation::Reserved @@ -313,6 +327,8 @@ mod tests { GET_CONSUMER_OFFSET_CODE, STORE_CONSUMER_OFFSET_CODE, DELETE_CONSUMER_OFFSET_CODE, + STORE_CONSUMER_OFFSET_2_CODE, + DELETE_CONSUMER_OFFSET_2_CODE, GET_STREAM_CODE, GET_STREAMS_CODE, CREATE_STREAM_CODE, @@ -394,6 +410,8 @@ mod tests { Operation::SendMessages, Operation::StoreConsumerOffset, Operation::DeleteConsumerOffset, + Operation::StoreConsumerOffset2, + Operation::DeleteConsumerOffset2, ]; for op in replicated_ops { let meta = lookup_by_operation(op) diff --git a/core/binary_protocol/src/lib.rs b/core/binary_protocol/src/lib.rs index 8d4544c2bb..3ef0a85130 100644 --- a/core/binary_protocol/src/lib.rs +++ b/core/binary_protocol/src/lib.rs @@ -80,6 +80,7 @@ pub use framing::{RequestFrame, RequestFrame2, ResponseFrame, ResponseFrame2, ST pub use message_view::{ WireMessageIterator, WireMessageIteratorMut, WireMessageView, WireMessageViewMut, }; +pub use primitives::ack_level::AckLevel; pub use primitives::consumer::WireConsumer; pub use primitives::identifier::{MAX_WIRE_NAME_LENGTH, WireIdentifier, WireName}; pub use primitives::partition_assignment::CreatedPartitionAssignment; diff --git a/core/binary_protocol/src/primitives/ack_level.rs b/core/binary_protocol/src/primitives/ack_level.rs new file mode 100644 index 0000000000..a2ddb2389c --- /dev/null +++ b/core/binary_protocol/src/primitives/ack_level.rs @@ -0,0 +1,108 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::WireError; + +/// Acknowledgement policy for consumer-offset write commands. +/// +/// Wire format: single `u8` discriminant. +/// - `NoAck(0)`: leader-local write only; respond as soon as the in-memory +/// and on-disk state have been updated. Matches the fast path used by +/// `PollMessages` auto-commit. +/// - `Quorum(1)`: submit through the partition VSR consensus pipeline and +/// respond only after the write has been committed by a quorum of replicas. +/// This is the default for explicit client writes. +#[repr(u8)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)] +pub enum AckLevel { + NoAck = 0, + #[default] + Quorum = 1, +} + +impl AckLevel { + /// Decode an `AckLevel` from its wire discriminant. + /// + /// # Errors + /// Returns `WireError::UnknownDiscriminant` for unrecognised values. + pub const fn from_code(code: u8) -> Result { + match code { + 0 => Ok(Self::NoAck), + 1 => Ok(Self::Quorum), + other => Err(WireError::UnknownDiscriminant { + type_name: "AckLevel", + value: other, + offset: 0, + }), + } + } + + /// Encode this `AckLevel` as its wire discriminant. + #[must_use] + pub const fn as_u8(self) -> u8 { + self as u8 + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn roundtrip_no_ack() { + let level = AckLevel::NoAck; + let decoded = AckLevel::from_code(level.as_u8()).unwrap(); + assert_eq!(decoded, level); + } + + #[test] + fn roundtrip_quorum() { + let level = AckLevel::Quorum; + let decoded = AckLevel::from_code(level.as_u8()).unwrap(); + assert_eq!(decoded, level); + } + + #[test] + fn default_is_quorum() { + assert_eq!(AckLevel::default(), AckLevel::Quorum); + } + + #[test] + fn discriminant_values() { + assert_eq!(AckLevel::NoAck.as_u8(), 0); + assert_eq!(AckLevel::Quorum.as_u8(), 1); + } + + #[test] + fn unknown_discriminant_rejected() { + for code in 2u8..=u8::MAX { + let err = AckLevel::from_code(code).unwrap_err(); + match err { + WireError::UnknownDiscriminant { + type_name, + value, + offset, + } => { + assert_eq!(type_name, "AckLevel"); + assert_eq!(value, code); + assert_eq!(offset, 0); + } + other => panic!("unexpected error variant: {other:?}"), + } + } + } +} diff --git a/core/binary_protocol/src/primitives/mod.rs b/core/binary_protocol/src/primitives/mod.rs index 064f0fcfbb..f3c7ee4e32 100644 --- a/core/binary_protocol/src/primitives/mod.rs +++ b/core/binary_protocol/src/primitives/mod.rs @@ -17,6 +17,7 @@ //! Shared wire primitives reused across request and response types. +pub mod ack_level; pub mod consumer; pub mod identifier; pub mod partition_assignment; diff --git a/core/binary_protocol/src/requests/consumer_offsets/delete_consumer_offset_2.rs b/core/binary_protocol/src/requests/consumer_offsets/delete_consumer_offset_2.rs new file mode 100644 index 0000000000..ccfa104ada --- /dev/null +++ b/core/binary_protocol/src/requests/consumer_offsets/delete_consumer_offset_2.rs @@ -0,0 +1,199 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::WireError; +use crate::WireIdentifier; +use crate::codec::{WireDecode, WireEncode, read_u8, read_u32_le}; +use crate::primitives::ack_level::AckLevel; +use crate::primitives::consumer::WireConsumer; +use bytes::{BufMut, BytesMut}; + +/// `DeleteConsumerOffset` v2 request. +/// +/// Adds an `ack` byte: `NoAck` = leader-local fast path, `Quorum` = VSR +/// pipeline. +/// +/// Wire format: +/// ```text +/// [consumer][stream_id][topic_id][partition_flag:1][partition_id:4 LE][ack:1] +/// ``` +/// +/// `partition_id` encoding: a u8 flag (1=Some, 0=None) followed by 4 bytes +/// for the u32 value (0 when None). +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct DeleteConsumerOffset2Request { + pub consumer: WireConsumer, + pub stream_id: WireIdentifier, + pub topic_id: WireIdentifier, + pub partition_id: Option, + pub ack: AckLevel, +} + +impl WireEncode for DeleteConsumerOffset2Request { + fn encoded_size(&self) -> usize { + self.consumer.encoded_size() + + self.stream_id.encoded_size() + + self.topic_id.encoded_size() + + 1 + + 4 + + 1 + } + + fn encode(&self, buf: &mut BytesMut) { + self.consumer.encode(buf); + self.stream_id.encode(buf); + self.topic_id.encode(buf); + if let Some(pid) = self.partition_id { + buf.put_u8(1); + buf.put_u32_le(pid); + } else { + buf.put_u8(0); + buf.put_u32_le(0); + } + buf.put_u8(self.ack.as_u8()); + } +} + +impl WireDecode for DeleteConsumerOffset2Request { + fn decode(buf: &[u8]) -> Result<(Self, usize), WireError> { + let mut pos = 0; + let (consumer, n) = WireConsumer::decode(&buf[pos..])?; + pos += n; + let (stream_id, n) = WireIdentifier::decode(&buf[pos..])?; + pos += n; + let (topic_id, n) = WireIdentifier::decode(&buf[pos..])?; + pos += n; + let partition_flag = read_u8(buf, pos)?; + pos += 1; + let partition_raw = read_u32_le(buf, pos)?; + pos += 4; + let partition_id = if partition_flag == 1 { + Some(partition_raw) + } else { + None + }; + let ack_code = read_u8(buf, pos)?; + pos += 1; + let ack = AckLevel::from_code(ack_code)?; + Ok(( + Self { + consumer, + stream_id, + topic_id, + partition_id, + ack, + }, + pos, + )) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn roundtrip_with_partition_quorum() { + let req = DeleteConsumerOffset2Request { + consumer: WireConsumer::consumer(WireIdentifier::numeric(1)), + stream_id: WireIdentifier::numeric(10), + topic_id: WireIdentifier::numeric(20), + partition_id: Some(5), + ack: AckLevel::Quorum, + }; + let bytes = req.to_bytes(); + let (decoded, consumed) = DeleteConsumerOffset2Request::decode(&bytes).unwrap(); + assert_eq!(consumed, bytes.len()); + assert_eq!(decoded, req); + } + + #[test] + fn roundtrip_without_partition_no_ack() { + let req = DeleteConsumerOffset2Request { + consumer: WireConsumer::consumer_group(WireIdentifier::numeric(3)), + stream_id: WireIdentifier::numeric(1), + topic_id: WireIdentifier::numeric(1), + partition_id: None, + ack: AckLevel::NoAck, + }; + let bytes = req.to_bytes(); + let (decoded, consumed) = DeleteConsumerOffset2Request::decode(&bytes).unwrap(); + assert_eq!(consumed, bytes.len()); + assert_eq!(decoded, req); + } + + #[test] + fn roundtrip_named_identifiers() { + let req = DeleteConsumerOffset2Request { + consumer: WireConsumer::consumer(WireIdentifier::named("my-consumer").unwrap()), + stream_id: WireIdentifier::named("stream-1").unwrap(), + topic_id: WireIdentifier::named("topic-1").unwrap(), + partition_id: Some(0), + ack: AckLevel::Quorum, + }; + let bytes = req.to_bytes(); + let (decoded, consumed) = DeleteConsumerOffset2Request::decode(&bytes).unwrap(); + assert_eq!(consumed, bytes.len()); + assert_eq!(decoded, req); + } + + #[test] + fn ack_byte_is_last() { + let req = DeleteConsumerOffset2Request { + consumer: WireConsumer::consumer(WireIdentifier::numeric(1)), + stream_id: WireIdentifier::numeric(1), + topic_id: WireIdentifier::numeric(1), + partition_id: Some(0), + ack: AckLevel::NoAck, + }; + let bytes = req.to_bytes(); + assert_eq!(*bytes.last().unwrap(), AckLevel::NoAck.as_u8()); + } + + #[test] + fn unknown_ack_rejected() { + let req = DeleteConsumerOffset2Request { + consumer: WireConsumer::consumer(WireIdentifier::numeric(1)), + stream_id: WireIdentifier::numeric(1), + topic_id: WireIdentifier::numeric(1), + partition_id: Some(0), + ack: AckLevel::Quorum, + }; + let mut bytes = req.to_bytes().to_vec(); + let last = bytes.len() - 1; + bytes[last] = 0xFF; + assert!(DeleteConsumerOffset2Request::decode(&bytes).is_err()); + } + + #[test] + fn truncated_returns_error() { + let req = DeleteConsumerOffset2Request { + consumer: WireConsumer::consumer(WireIdentifier::numeric(1)), + stream_id: WireIdentifier::numeric(1), + topic_id: WireIdentifier::numeric(1), + partition_id: Some(1), + ack: AckLevel::Quorum, + }; + let bytes = req.to_bytes(); + for i in 0..bytes.len() { + assert!( + DeleteConsumerOffset2Request::decode(&bytes[..i]).is_err(), + "expected error for truncation at byte {i}" + ); + } + } +} diff --git a/core/binary_protocol/src/requests/consumer_offsets/mod.rs b/core/binary_protocol/src/requests/consumer_offsets/mod.rs index bde4d02b58..f8d3cee598 100644 --- a/core/binary_protocol/src/requests/consumer_offsets/mod.rs +++ b/core/binary_protocol/src/requests/consumer_offsets/mod.rs @@ -16,9 +16,13 @@ // under the License. pub mod delete_consumer_offset; +pub mod delete_consumer_offset_2; pub mod get_consumer_offset; pub mod store_consumer_offset; +pub mod store_consumer_offset_2; pub use delete_consumer_offset::DeleteConsumerOffsetRequest; +pub use delete_consumer_offset_2::DeleteConsumerOffset2Request; pub use get_consumer_offset::GetConsumerOffsetRequest; pub use store_consumer_offset::StoreConsumerOffsetRequest; +pub use store_consumer_offset_2::StoreConsumerOffset2Request; diff --git a/core/binary_protocol/src/requests/consumer_offsets/store_consumer_offset_2.rs b/core/binary_protocol/src/requests/consumer_offsets/store_consumer_offset_2.rs new file mode 100644 index 0000000000..37e89fb1c5 --- /dev/null +++ b/core/binary_protocol/src/requests/consumer_offsets/store_consumer_offset_2.rs @@ -0,0 +1,211 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::WireError; +use crate::WireIdentifier; +use crate::codec::{WireDecode, WireEncode, read_u8, read_u32_le, read_u64_le}; +use crate::primitives::ack_level::AckLevel; +use crate::primitives::consumer::WireConsumer; +use bytes::{BufMut, BytesMut}; + +/// `StoreConsumerOffset` v2 request. +/// +/// Adds an `ack` byte: `NoAck` = leader-local fast path, `Quorum` = VSR +/// pipeline. +/// +/// Wire format: +/// ```text +/// [consumer][stream_id][topic_id][partition_flag:1][partition_id:4 LE][offset:8 LE][ack:1] +/// ``` +/// +/// `partition_id` encoding: a u8 flag (1=Some, 0=None) followed by 4 bytes +/// for the u32 value (0 when None). +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct StoreConsumerOffset2Request { + pub consumer: WireConsumer, + pub stream_id: WireIdentifier, + pub topic_id: WireIdentifier, + pub partition_id: Option, + pub offset: u64, + pub ack: AckLevel, +} + +impl WireEncode for StoreConsumerOffset2Request { + fn encoded_size(&self) -> usize { + self.consumer.encoded_size() + + self.stream_id.encoded_size() + + self.topic_id.encoded_size() + + 1 + + 4 + + 8 + + 1 + } + + fn encode(&self, buf: &mut BytesMut) { + self.consumer.encode(buf); + self.stream_id.encode(buf); + self.topic_id.encode(buf); + if let Some(pid) = self.partition_id { + buf.put_u8(1); + buf.put_u32_le(pid); + } else { + buf.put_u8(0); + buf.put_u32_le(0); + } + buf.put_u64_le(self.offset); + buf.put_u8(self.ack.as_u8()); + } +} + +impl WireDecode for StoreConsumerOffset2Request { + fn decode(buf: &[u8]) -> Result<(Self, usize), WireError> { + let mut pos = 0; + let (consumer, n) = WireConsumer::decode(&buf[pos..])?; + pos += n; + let (stream_id, n) = WireIdentifier::decode(&buf[pos..])?; + pos += n; + let (topic_id, n) = WireIdentifier::decode(&buf[pos..])?; + pos += n; + let partition_flag = read_u8(buf, pos)?; + pos += 1; + let partition_raw = read_u32_le(buf, pos)?; + pos += 4; + let partition_id = if partition_flag == 1 { + Some(partition_raw) + } else { + None + }; + let offset = read_u64_le(buf, pos)?; + pos += 8; + let ack_code = read_u8(buf, pos)?; + pos += 1; + let ack = AckLevel::from_code(ack_code)?; + Ok(( + Self { + consumer, + stream_id, + topic_id, + partition_id, + offset, + ack, + }, + pos, + )) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn roundtrip_with_partition_quorum() { + let req = StoreConsumerOffset2Request { + consumer: WireConsumer::consumer(WireIdentifier::numeric(1)), + stream_id: WireIdentifier::numeric(10), + topic_id: WireIdentifier::numeric(20), + partition_id: Some(5), + offset: 12345, + ack: AckLevel::Quorum, + }; + let bytes = req.to_bytes(); + let (decoded, consumed) = StoreConsumerOffset2Request::decode(&bytes).unwrap(); + assert_eq!(consumed, bytes.len()); + assert_eq!(decoded, req); + } + + #[test] + fn roundtrip_without_partition_no_ack() { + let req = StoreConsumerOffset2Request { + consumer: WireConsumer::consumer_group(WireIdentifier::numeric(3)), + stream_id: WireIdentifier::numeric(1), + topic_id: WireIdentifier::numeric(1), + partition_id: None, + offset: u64::MAX, + ack: AckLevel::NoAck, + }; + let bytes = req.to_bytes(); + let (decoded, consumed) = StoreConsumerOffset2Request::decode(&bytes).unwrap(); + assert_eq!(consumed, bytes.len()); + assert_eq!(decoded, req); + } + + #[test] + fn roundtrip_named_identifiers() { + let req = StoreConsumerOffset2Request { + consumer: WireConsumer::consumer(WireIdentifier::named("my-consumer").unwrap()), + stream_id: WireIdentifier::named("stream-1").unwrap(), + topic_id: WireIdentifier::named("topic-1").unwrap(), + partition_id: Some(0), + offset: 0, + ack: AckLevel::Quorum, + }; + let bytes = req.to_bytes(); + let (decoded, consumed) = StoreConsumerOffset2Request::decode(&bytes).unwrap(); + assert_eq!(consumed, bytes.len()); + assert_eq!(decoded, req); + } + + #[test] + fn ack_byte_is_last() { + let req = StoreConsumerOffset2Request { + consumer: WireConsumer::consumer(WireIdentifier::numeric(1)), + stream_id: WireIdentifier::numeric(1), + topic_id: WireIdentifier::numeric(1), + partition_id: Some(0), + offset: 0, + ack: AckLevel::NoAck, + }; + let bytes = req.to_bytes(); + assert_eq!(*bytes.last().unwrap(), AckLevel::NoAck.as_u8()); + } + + #[test] + fn unknown_ack_rejected() { + let req = StoreConsumerOffset2Request { + consumer: WireConsumer::consumer(WireIdentifier::numeric(1)), + stream_id: WireIdentifier::numeric(1), + topic_id: WireIdentifier::numeric(1), + partition_id: Some(0), + offset: 0, + ack: AckLevel::Quorum, + }; + let mut bytes = req.to_bytes().to_vec(); + let last = bytes.len() - 1; + bytes[last] = 0xFF; + assert!(StoreConsumerOffset2Request::decode(&bytes).is_err()); + } + + #[test] + fn truncated_returns_error() { + let req = StoreConsumerOffset2Request { + consumer: WireConsumer::consumer(WireIdentifier::numeric(1)), + stream_id: WireIdentifier::numeric(1), + topic_id: WireIdentifier::numeric(1), + partition_id: Some(1), + offset: 100, + ack: AckLevel::Quorum, + }; + let bytes = req.to_bytes(); + for i in 0..bytes.len() { + assert!( + StoreConsumerOffset2Request::decode(&bytes[..i]).is_err(), + "expected error for truncation at byte {i}" + ); + } + } +} diff --git a/core/binary_protocol/src/responses/consumer_offsets/delete_consumer_offset_2.rs b/core/binary_protocol/src/responses/consumer_offsets/delete_consumer_offset_2.rs new file mode 100644 index 0000000000..a145aab189 --- /dev/null +++ b/core/binary_protocol/src/responses/consumer_offsets/delete_consumer_offset_2.rs @@ -0,0 +1,19 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +/// `DeleteConsumerOffset2` response is empty. +pub type DeleteConsumerOffset2Response = super::EmptyResponse; diff --git a/core/binary_protocol/src/responses/consumer_offsets/mod.rs b/core/binary_protocol/src/responses/consumer_offsets/mod.rs index 915648c18c..6dbaf454de 100644 --- a/core/binary_protocol/src/responses/consumer_offsets/mod.rs +++ b/core/binary_protocol/src/responses/consumer_offsets/mod.rs @@ -16,10 +16,14 @@ // under the License. mod delete_consumer_offset; +mod delete_consumer_offset_2; pub mod get_consumer_offset; mod store_consumer_offset; +mod store_consumer_offset_2; pub use super::EmptyResponse; pub use delete_consumer_offset::DeleteConsumerOffsetResponse; +pub use delete_consumer_offset_2::DeleteConsumerOffset2Response; pub use get_consumer_offset::ConsumerOffsetResponse; pub use store_consumer_offset::StoreConsumerOffsetResponse; +pub use store_consumer_offset_2::StoreConsumerOffset2Response; diff --git a/core/binary_protocol/src/responses/consumer_offsets/store_consumer_offset_2.rs b/core/binary_protocol/src/responses/consumer_offsets/store_consumer_offset_2.rs new file mode 100644 index 0000000000..0f76f603b8 --- /dev/null +++ b/core/binary_protocol/src/responses/consumer_offsets/store_consumer_offset_2.rs @@ -0,0 +1,19 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +/// `StoreConsumerOffset2` response is empty. +pub type StoreConsumerOffset2Response = super::EmptyResponse; diff --git a/core/consensus/src/client_table.rs b/core/consensus/src/client_table.rs index 54a888034c..7ea5db56cb 100644 --- a/core/consensus/src/client_table.rs +++ b/core/consensus/src/client_table.rs @@ -430,8 +430,8 @@ impl ClientTable { if let Some(entry) = slot { let commit = entry.reply.header().commit; let should_evict = match evictee { - None => true, Some((_, min_commit)) => commit < min_commit, + None => true, }; if should_evict { evictee = Some((idx, commit)); diff --git a/core/consensus/src/observability.rs b/core/consensus/src/observability.rs index bb042af108..2ac1d00447 100644 --- a/core/consensus/src/observability.rs +++ b/core/consensus/src/observability.rs @@ -644,6 +644,8 @@ pub const fn operation_as_str(operation: Operation) -> &'static str { Operation::SendMessages => "send_messages", Operation::StoreConsumerOffset => "store_consumer_offset", Operation::DeleteConsumerOffset => "delete_consumer_offset", + Operation::StoreConsumerOffset2 => "store_consumer_offset_2", + Operation::DeleteConsumerOffset2 => "delete_consumer_offset_2", } } diff --git a/core/consensus/src/plane_helpers.rs b/core/consensus/src/plane_helpers.rs index 4078d9eba9..e4da880847 100644 --- a/core/consensus/src/plane_helpers.rs +++ b/core/consensus/src/plane_helpers.rs @@ -21,7 +21,9 @@ use crate::{ Status, VsrConsensus, }; use iggy_binary_protocol::consensus::iobuf::Owned; -use iggy_binary_protocol::{Command2, Message, PrepareHeader, PrepareOkHeader, ReplyHeader}; +use iggy_binary_protocol::{ + Command2, Message, PrepareHeader, PrepareOkHeader, ReplyHeader, RequestHeader, +}; use message_bus::{MessageBus, SendError}; use std::ops::AsyncFnOnce; @@ -392,6 +394,60 @@ where .expect("reply buffer must contain a valid reply message") } +/// Reply for fast paths that skip the VSR pipeline (e.g. `AckLevel::NoAck`). +/// +/// Stamps `op` and `commit` with `commit_max` — monotonic, so +/// `ClientTable::commit_reply` regression checks always pass. +/// +/// # Panics +/// If the constructed message buffer is not valid. +#[allow(clippy::needless_pass_by_value, clippy::cast_possible_truncation)] +pub fn build_reply_from_request( + consensus: &VsrConsensus, + request_header: &RequestHeader, + body: bytes::Bytes, +) -> Message +where + B: MessageBus, + P: Pipeline, +{ + let header_size = std::mem::size_of::(); + let total_size = header_size + body.len(); + let mut buffer = bytes::BytesMut::zeroed(total_size); + + let commit = consensus.commit_max(); + let header = bytemuck::checked::try_from_bytes_mut::(&mut buffer[..header_size]) + .expect("zeroed bytes are valid"); + *header = ReplyHeader { + checksum: 0, + checksum_body: 0, + cluster: consensus.cluster(), + size: total_size as u32, + view: consensus.view(), + release: 0, + command: Command2::Reply, + replica: consensus.replica(), + reserved_frame: [0; 66], + request_checksum: request_header.request_checksum, + context: 0, + client: request_header.client, + op: commit, + commit, + timestamp: request_header.timestamp, + request: request_header.request, + operation: request_header.operation, + namespace: request_header.namespace, + ..Default::default() + }; + + if !body.is_empty() { + buffer[header_size..].copy_from_slice(&body); + } + + Message::try_from(Owned::<4096>::copy_from_slice(buffer.as_ref())) + .expect("reply buffer must contain a valid reply message") +} + /// Verify hash chain would not break if we add this header. /// /// # Panics diff --git a/core/integration/tests/server/scenarios/authentication_scenario.rs b/core/integration/tests/server/scenarios/authentication_scenario.rs index ab5245e3c1..24a0f2c449 100644 --- a/core/integration/tests/server/scenarios/authentication_scenario.rs +++ b/core/integration/tests/server/scenarios/authentication_scenario.rs @@ -139,6 +139,16 @@ async fn test_all_commands_require_auth(client: &IggyClient) { ) { continue; } + // v2 consumer-offset ops are registered in the dispatch table for the + // consensus/simulator pathway but are not wired into the legacy binary + // server's dispatch. They'll move into server-ng alongside the rest of + // the v2 surface; re-enable these codes here once that lands. + if matches!( + code, + STORE_CONSUMER_OFFSET_2_CODE | DELETE_CONSUMER_OFFSET_2_CODE + ) { + continue; + } // ================================================================ // REQUIRES AUTH diff --git a/core/partitions/src/iggy_partition.rs b/core/partitions/src/iggy_partition.rs index 37e99679e7..e85717cfd8 100644 --- a/core/partitions/src/iggy_partition.rs +++ b/core/partitions/src/iggy_partition.rs @@ -31,13 +31,13 @@ use crate::{ use consensus::{ CommitLogEvent, Consensus, PartitionDiagEvent, Pipeline, PipelineEntry, PlaneKind, Project, ReplicaLogContext, RequestLogEvent, Sequencer, SimEventKind, VsrConsensus, ack_preflight, - ack_quorum_reached, build_reply_message, drain_committable_prefix, + ack_quorum_reached, build_reply_from_request, build_reply_message, drain_committable_prefix, emit_namespace_progress_event, emit_partition_diag, emit_sim_event, fence_old_prepare_by_commit, replicate_preflight, replicate_to_next_in_chain, request_preflight, send_prepare_ok as send_prepare_ok_common, }; use iggy_binary_protocol::consensus::iobuf::Frozen; -use iggy_binary_protocol::{Message, Operation, PrepareHeader}; +use iggy_binary_protocol::{AckLevel, Message, Operation, PrepareHeader}; use iggy_binary_protocol::{PrepareOkHeader, RequestHeader}; use iggy_common::{ ConsumerGroupId, ConsumerGroupOffsets, ConsumerKind, ConsumerOffset, ConsumerOffsets, @@ -83,6 +83,19 @@ where observed_view: u32, } +/// Post-preflight dispatch in `on_request`: replicate via VSR or take the +/// `NoAck` leader-local fast path. `RequestHeader` is boxed to avoid the +/// 277-byte inline variant tripping clippy's `large_enum_variant`. +enum Disposition { + Replicate(Message), + NoAck { + request_header: Box, + kind: ConsumerKind, + consumer_id: u32, + offset: Option, + }, +} + #[derive(Debug, Clone, Copy, PartialEq)] struct PendingConsumerOffsetCommit { kind: ConsumerKind, @@ -362,6 +375,74 @@ where Ok(()) } + /// `AckLevel::NoAck` fast path: persist, apply, cache + send reply, no + /// replication. Single-replica durability. + #[allow(clippy::future_not_send)] + async fn apply_consumer_offset_no_ack( + &self, + request_header: Box, + kind: ConsumerKind, + consumer_id: u32, + offset: Option, + ) { + let pending = offset.map_or_else( + || PendingConsumerOffsetCommit::delete(kind, consumer_id), + |value| PendingConsumerOffsetCommit::upsert(kind, consumer_id, value), + ); + + if let Err(error) = self.persist_consumer_offset_commit(pending).await { + emit_partition_diag( + tracing::Level::WARN, + &PartitionDiagEvent::new(self.diag_ctx(), "no_ack offset persist failed") + .with_operation(request_header.operation) + .with_error(error.to_string()), + ); + return; + } + if let Err(error) = self.apply_consumer_offset_commit(pending) { + emit_partition_diag( + tracing::Level::WARN, + &PartitionDiagEvent::new(self.diag_ctx(), "no_ack offset apply failed") + .with_operation(request_header.operation) + .with_error(error.to_string()), + ); + return; + } + + let reply = build_reply_from_request(&self.consensus, &request_header, bytes::Bytes::new()); + let session = self + .consensus + .client_table() + .borrow() + .get_session(request_header.client) + .unwrap_or_else(|| { + panic!( + "apply_consumer_offset_no_ack: client {} not registered", + request_header.client + ) + }); + self.consensus.client_table().borrow_mut().commit_reply( + request_header.client, + session, + reply.clone(), + ); + + let reply_buffers = reply.into_generic().into_frozen(); + if let Err(error) = self + .consensus + .message_bus() + .send_to_client(request_header.client, reply_buffers) + .await + { + emit_partition_diag( + tracing::Level::WARN, + &PartitionDiagEvent::new(self.diag_ctx(), "no_ack reply send failed") + .with_operation(request_header.operation) + .with_error(error.to_string()), + ); + } + } + fn persisted_offset_path(&self, kind: ConsumerKind, consumer_id: u32) -> Option { match kind { ConsumerKind::Consumer => self @@ -607,7 +688,7 @@ where /// # Panics /// Panics if called when this partition's consensus instance is not the /// primary, is not in normal status, or is currently syncing. - #[allow(clippy::future_not_send)] + #[allow(clippy::future_not_send, clippy::too_many_lines)] pub async fn on_request(&mut self, message: Message) { self.clear_pending_consumer_offset_commits_if_view_changed(); let namespace = IggyNamespace::from_raw(message.header().namespace); @@ -635,7 +716,7 @@ where } } - let prepare = { + let disposition = { let consensus = self.consensus(); emit_sim_event( SimEventKind::ClientRequestReceived, @@ -667,25 +748,51 @@ where message }; - if message.header().operation == Operation::DeleteConsumerOffset { - match Self::parse_consumer_offset_request(message.header().operation, &message) - .and_then(|(kind, consumer_id, _)| { - self.ensure_consumer_offset_exists(kind, consumer_id) - }) { - Ok(()) => {} - Err(error) => { - emit_partition_diag( - tracing::Level::WARN, - &PartitionDiagEvent::new( - ReplicaLogContext::from_consensus(consensus, PlaneKind::Partitions), - "rejecting delete_consumer_offset for missing offset", - ) - .with_operation(Operation::DeleteConsumerOffset) - .with_error(error.to_string()), - ); - return; + // Parse once for both the delete-existence check and AckLevel dispatch. + let consumer_offset = match message.header().operation { + Operation::StoreConsumerOffset + | Operation::StoreConsumerOffset2 + | Operation::DeleteConsumerOffset + | Operation::DeleteConsumerOffset2 => { + match Self::parse_consumer_offset_request(message.header().operation, &message) + { + Ok(parsed) => Some(parsed), + Err(error) => { + emit_partition_diag( + tracing::Level::WARN, + &PartitionDiagEvent::new( + ReplicaLogContext::from_consensus( + consensus, + PlaneKind::Partitions, + ), + "failed to parse consumer offset request", + ) + .with_operation(message.header().operation) + .with_error(error.to_string()), + ); + return; + } } } + _ => None, + }; + + if matches!( + message.header().operation, + Operation::DeleteConsumerOffset | Operation::DeleteConsumerOffset2 + ) && let Some((kind, consumer_id, _, _)) = consumer_offset + && let Err(error) = self.ensure_consumer_offset_exists(kind, consumer_id) + { + emit_partition_diag( + tracing::Level::WARN, + &PartitionDiagEvent::new( + ReplicaLogContext::from_consensus(consensus, PlaneKind::Partitions), + "rejecting delete_consumer_offset for missing offset", + ) + .with_operation(message.header().operation) + .with_error(error.to_string()), + ); + return; } if request_preflight(consensus, client_id, session, request) @@ -699,12 +806,39 @@ where assert!(consensus.is_normal(), "on_request: status must be normal"); assert!(!consensus.is_syncing(), "on_request: must not be syncing"); - let prepare = message.project(consensus); - consensus.verify_pipeline(); - consensus.pipeline_message(PlaneKind::Partitions, &prepare); - prepare + // NoAck v2 -> fast path. Quorum + v1 -> VSR pipeline. + if let Some((kind, consumer_id, offset, AckLevel::NoAck)) = consumer_offset + && matches!( + message.header().operation, + Operation::StoreConsumerOffset2 | Operation::DeleteConsumerOffset2, + ) + { + Disposition::NoAck { + request_header: Box::new(*message.header()), + kind, + consumer_id, + offset, + } + } else { + let prepare = message.project(consensus); + consensus.verify_pipeline(); + consensus.pipeline_message(PlaneKind::Partitions, &prepare); + Disposition::Replicate(prepare) + } }; - self.on_replicate(prepare).await; + + match disposition { + Disposition::Replicate(prepare) => self.on_replicate(prepare).await, + Disposition::NoAck { + request_header, + kind, + consumer_id, + offset, + } => { + self.apply_consumer_offset_no_ack(request_header, kind, consumer_id, offset) + .await; + } + } } #[allow(clippy::future_not_send, clippy::too_many_lines)] @@ -972,8 +1106,12 @@ where ); Ok(()) } - Operation::StoreConsumerOffset | Operation::DeleteConsumerOffset => { - let (kind, consumer_id, offset) = + Operation::StoreConsumerOffset + | Operation::DeleteConsumerOffset + | Operation::StoreConsumerOffset2 + | Operation::DeleteConsumerOffset2 => { + // Replicated path is Quorum-only by construction; ack ignored. + let (kind, consumer_id, offset, _ack) = Self::parse_staged_consumer_offset_commit(header.operation, &message)?; let write_lock = self.write_lock.clone(); let _guard = write_lock.lock().await; @@ -995,7 +1133,7 @@ where .map_err(|_| IggyError::CannotAppendMessage)?; match header.operation { - Operation::StoreConsumerOffset => { + Operation::StoreConsumerOffset | Operation::StoreConsumerOffset2 => { self.stage_consumer_offset_upsert( header.op, kind, @@ -1003,7 +1141,7 @@ where offset.expect("store_consumer_offset must include offset"), ); } - Operation::DeleteConsumerOffset => { + Operation::DeleteConsumerOffset | Operation::DeleteConsumerOffset2 => { self.stage_consumer_offset_delete(header.op, kind, consumer_id)?; } _ => unreachable!(), @@ -1331,7 +1469,10 @@ where } !*failed_commit } - Operation::StoreConsumerOffset | Operation::DeleteConsumerOffset => { + Operation::StoreConsumerOffset + | Operation::DeleteConsumerOffset + | Operation::StoreConsumerOffset2 + | Operation::DeleteConsumerOffset2 => { self.commit_consumer_offset_entry(prepare_header, failed_commit) .await } @@ -1372,7 +1513,7 @@ where fn parse_consumer_offset_request( operation: Operation, message: &Message, - ) -> Result<(ConsumerKind, u32, Option), IggyError> { + ) -> Result<(ConsumerKind, u32, Option, AckLevel), IggyError> { let total_size = usize::try_from(message.header().size).map_err(|_| IggyError::InvalidCommand)?; let body = message @@ -1385,7 +1526,7 @@ where fn parse_staged_consumer_offset_commit( operation: Operation, message: &Message, - ) -> Result<(ConsumerKind, u32, Option), IggyError> { + ) -> Result<(ConsumerKind, u32, Option, AckLevel), IggyError> { let total_size = usize::try_from(message.header().size).map_err(|_| IggyError::InvalidCommand)?; let body = message @@ -1398,7 +1539,7 @@ where fn parse_consumer_offset_payload( operation: Operation, body: &[u8], - ) -> Result<(ConsumerKind, u32, Option), IggyError> { + ) -> Result<(ConsumerKind, u32, Option, AckLevel), IggyError> { let consumer_kind = *body.first().ok_or(IggyError::InvalidCommand)?; let consumer_id = body .get(1..5) @@ -1409,8 +1550,10 @@ where .map_err(|_| IggyError::InvalidCommand) })?; let kind = ConsumerKind::from_code(consumer_kind)?; + // v1 implicitly Quorum. v2 trailing ack byte validated; unknown + // discriminants rejected so malformed wire bytes fail fast. match operation { - Operation::StoreConsumerOffset => { + Operation::StoreConsumerOffset | Operation::StoreConsumerOffset2 => { let offset = body.get(5..13) .ok_or(IggyError::InvalidCommand) @@ -1419,9 +1562,23 @@ where .map(u64::from_le_bytes) .map_err(|_| IggyError::InvalidCommand) })?; - Ok((kind, consumer_id, Some(offset))) + let ack = if matches!(operation, Operation::StoreConsumerOffset2) { + let ack_byte = *body.get(13).ok_or(IggyError::InvalidCommand)?; + AckLevel::from_code(ack_byte).map_err(|_| IggyError::InvalidCommand)? + } else { + AckLevel::Quorum + }; + Ok((kind, consumer_id, Some(offset), ack)) + } + Operation::DeleteConsumerOffset | Operation::DeleteConsumerOffset2 => { + let ack = if matches!(operation, Operation::DeleteConsumerOffset2) { + let ack_byte = *body.get(5).ok_or(IggyError::InvalidCommand)?; + AckLevel::from_code(ack_byte).map_err(|_| IggyError::InvalidCommand)? + } else { + AckLevel::Quorum + }; + Ok((kind, consumer_id, None, ack)) } - Operation::DeleteConsumerOffset => Ok((kind, consumer_id, None)), _ => Err(IggyError::InvalidCommand), } } diff --git a/core/simulator/src/client.rs b/core/simulator/src/client.rs index 96a20fa928..0c2c5b5889 100644 --- a/core/simulator/src/client.rs +++ b/core/simulator/src/client.rs @@ -19,7 +19,7 @@ use bytes::Bytes; use iggy_binary_protocol::consensus::iobuf::Owned; use iggy_binary_protocol::requests::streams::{CreateStreamRequest, DeleteStreamRequest}; use iggy_binary_protocol::{ - Message, Operation, RequestHeader, WireEncode, WireIdentifier, WireName, + AckLevel, Message, Operation, RequestHeader, WireEncode, WireIdentifier, WireName, }; use iggy_common::send_messages2::{ IggyMessage2, IggyMessage2Header, IggyMessages2, SendMessages2Owned, @@ -181,6 +181,41 @@ impl SimClient { self.build_request_with_namespace(Operation::DeleteConsumerOffset, &payload, namespace) } + /// v2 of `store_consumer_offset` with an `AckLevel` byte. `NoAck` takes + /// the primary's fast path (no replication); `Quorum` goes through VSR. + pub fn store_consumer_offset_v2( + &self, + namespace: IggyNamespace, + consumer_kind: u8, + consumer_id: u32, + offset: u64, + ack: AckLevel, + ) -> Message { + let mut payload = Vec::with_capacity(14); + payload.push(consumer_kind); + payload.extend_from_slice(&consumer_id.to_le_bytes()); + payload.extend_from_slice(&offset.to_le_bytes()); + payload.push(ack.as_u8()); + + self.build_request_with_namespace(Operation::StoreConsumerOffset2, &payload, namespace) + } + + /// v2 of `delete_consumer_offset` carrying an explicit `AckLevel` byte. + pub fn delete_consumer_offset_v2( + &self, + namespace: IggyNamespace, + consumer_kind: u8, + consumer_id: u32, + ack: AckLevel, + ) -> Message { + let mut payload = Vec::with_capacity(6); + payload.push(consumer_kind); + payload.extend_from_slice(&consumer_id.to_le_bytes()); + payload.push(ack.as_u8()); + + self.build_request_with_namespace(Operation::DeleteConsumerOffset2, &payload, namespace) + } + #[allow(clippy::cast_possible_truncation)] fn build_request_with_namespace( &self,