Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions core/binary_protocol/src/codes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ pub const FLUSH_UNSAVED_BUFFER_CODE: u32 = 102;
pub const GET_CONSUMER_OFFSET_CODE: u32 = 120;
pub const STORE_CONSUMER_OFFSET_CODE: u32 = 121;
pub const DELETE_CONSUMER_OFFSET_CODE: u32 = 122;
pub const STORE_CONSUMER_OFFSET_2_CODE: u32 = 123;
pub const DELETE_CONSUMER_OFFSET_2_CODE: u32 = 124;

// -- Streams --
pub const GET_STREAM_CODE: u32 = 200;
Expand Down Expand Up @@ -133,6 +135,8 @@ mod tests {
GET_CONSUMER_OFFSET_CODE,
STORE_CONSUMER_OFFSET_CODE,
DELETE_CONSUMER_OFFSET_CODE,
STORE_CONSUMER_OFFSET_2_CODE,
DELETE_CONSUMER_OFFSET_2_CODE,
GET_STREAM_CODE,
GET_STREAMS_CODE,
CREATE_STREAM_CODE,
Expand Down
11 changes: 10 additions & 1 deletion core/binary_protocol/src/consensus/operation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ pub enum Operation {
SendMessages = 160,
StoreConsumerOffset = 161,
DeleteConsumerOffset = 162,
// 163 is reserved for the planned DeleteSegments move (see TODO above).
StoreConsumerOffset2 = 164,
DeleteConsumerOffset2 = 165,
}

impl Operation {
Expand Down Expand Up @@ -165,7 +168,9 @@ impl Operation {
| Self::DeletePersonalAccessToken
| Self::SendMessages
| Self::StoreConsumerOffset
| Self::DeleteConsumerOffset => match crate::dispatch::lookup_by_operation(*self) {
| Self::DeleteConsumerOffset
| Self::StoreConsumerOffset2
| Self::DeleteConsumerOffset2 => match crate::dispatch::lookup_by_operation(*self) {
Some(meta) => Some(meta.code),
None => None,
},
Expand Down Expand Up @@ -214,6 +219,8 @@ mod tests {
Operation::SendMessages,
Operation::StoreConsumerOffset,
Operation::DeleteConsumerOffset,
Operation::StoreConsumerOffset2,
Operation::DeleteConsumerOffset2,
];
for op in ops {
let code = op
Expand Down Expand Up @@ -270,5 +277,7 @@ mod tests {
assert!(!Operation::SendMessages.is_metadata());
assert!(Operation::DeleteSegments.is_partition());
assert!(Operation::DeleteConsumerOffset.is_partition());
assert!(Operation::StoreConsumerOffset2.is_partition());
assert!(Operation::DeleteConsumerOffset2.is_partition());
}
}
88 changes: 53 additions & 35 deletions core/binary_protocol/src/dispatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,16 @@ pub const COMMAND_TABLE: &[CommandMeta] = &[
"consumer_offset.delete",
Operation::DeleteConsumerOffset,
),
CommandMeta::replicated(
STORE_CONSUMER_OFFSET_2_CODE,
"consumer_offset.store.v2",
Operation::StoreConsumerOffset2,
),
CommandMeta::replicated(
DELETE_CONSUMER_OFFSET_2_CODE,
"consumer_offset.delete.v2",
Operation::DeleteConsumerOffset2,
),
// Streams
CommandMeta::non_replicated(GET_STREAM_CODE, "stream.get"),
CommandMeta::non_replicated(GET_STREAMS_CODE, "stream.list"),
Expand Down Expand Up @@ -212,28 +222,30 @@ pub const fn lookup_command(code: u32) -> Option<&'static CommandMeta> {
GET_CONSUMER_OFFSET_CODE => 24,
STORE_CONSUMER_OFFSET_CODE => 25,
DELETE_CONSUMER_OFFSET_CODE => 26,
GET_STREAM_CODE => 27,
GET_STREAMS_CODE => 28,
CREATE_STREAM_CODE => 29,
DELETE_STREAM_CODE => 30,
UPDATE_STREAM_CODE => 31,
PURGE_STREAM_CODE => 32,
GET_TOPIC_CODE => 33,
GET_TOPICS_CODE => 34,
CREATE_TOPIC_CODE => 35,
DELETE_TOPIC_CODE => 36,
UPDATE_TOPIC_CODE => 37,
PURGE_TOPIC_CODE => 38,
CREATE_PARTITIONS_CODE => 39,
DELETE_PARTITIONS_CODE => 40,
DELETE_SEGMENTS_CODE => 41,
GET_CONSUMER_GROUP_CODE => 42,
GET_CONSUMER_GROUPS_CODE => 43,
CREATE_CONSUMER_GROUP_CODE => 44,
DELETE_CONSUMER_GROUP_CODE => 45,
JOIN_CONSUMER_GROUP_CODE => 46,
LEAVE_CONSUMER_GROUP_CODE => 47,
LOGIN_REGISTER_WITH_PAT_CODE => 48,
STORE_CONSUMER_OFFSET_2_CODE => 27,
DELETE_CONSUMER_OFFSET_2_CODE => 28,
GET_STREAM_CODE => 29,
GET_STREAMS_CODE => 30,
CREATE_STREAM_CODE => 31,
DELETE_STREAM_CODE => 32,
UPDATE_STREAM_CODE => 33,
PURGE_STREAM_CODE => 34,
GET_TOPIC_CODE => 35,
GET_TOPICS_CODE => 36,
CREATE_TOPIC_CODE => 37,
DELETE_TOPIC_CODE => 38,
UPDATE_TOPIC_CODE => 39,
PURGE_TOPIC_CODE => 40,
CREATE_PARTITIONS_CODE => 41,
DELETE_PARTITIONS_CODE => 42,
DELETE_SEGMENTS_CODE => 43,
GET_CONSUMER_GROUP_CODE => 44,
GET_CONSUMER_GROUPS_CODE => 45,
CREATE_CONSUMER_GROUP_CODE => 46,
DELETE_CONSUMER_GROUP_CODE => 47,
JOIN_CONSUMER_GROUP_CODE => 48,
LEAVE_CONSUMER_GROUP_CODE => 49,
LOGIN_REGISTER_WITH_PAT_CODE => 50,
_ => return None,
};
Some(&COMMAND_TABLE[idx])
Expand All @@ -247,19 +259,19 @@ pub const fn lookup_command(code: u32) -> Option<&'static CommandMeta> {
pub const fn lookup_by_operation(op: Operation) -> Option<&'static CommandMeta> {
// Indices must match the order of entries in COMMAND_TABLE above.
let idx = match op {
Operation::CreateStream => 29,
Operation::UpdateStream => 31,
Operation::DeleteStream => 30,
Operation::PurgeStream => 32,
Operation::CreateTopic => 35,
Operation::UpdateTopic => 37,
Operation::DeleteTopic => 36,
Operation::PurgeTopic => 38,
Operation::CreatePartitions => 39,
Operation::DeletePartitions => 40,
Operation::DeleteSegments => 41,
Operation::CreateConsumerGroup => 44,
Operation::DeleteConsumerGroup => 45,
Operation::CreateStream => 31,
Operation::UpdateStream => 33,
Operation::DeleteStream => 32,
Operation::PurgeStream => 34,
Operation::CreateTopic => 37,
Operation::UpdateTopic => 39,
Operation::DeleteTopic => 38,
Operation::PurgeTopic => 40,
Operation::CreatePartitions => 41,
Operation::DeletePartitions => 42,
Operation::DeleteSegments => 43,
Operation::CreateConsumerGroup => 46,
Operation::DeleteConsumerGroup => 47,
Operation::CreateUser => 9,
Operation::UpdateUser => 11,
Operation::DeleteUser => 10,
Expand All @@ -270,6 +282,8 @@ pub const fn lookup_by_operation(op: Operation) -> Option<&'static CommandMeta>
Operation::SendMessages => 22,
Operation::StoreConsumerOffset => 25,
Operation::DeleteConsumerOffset => 26,
Operation::StoreConsumerOffset2 => 27,
Operation::DeleteConsumerOffset2 => 28,
Operation::CreateTopicWithAssignments
| Operation::CreatePartitionsWithAssignments
| Operation::Reserved
Expand Down Expand Up @@ -313,6 +327,8 @@ mod tests {
GET_CONSUMER_OFFSET_CODE,
STORE_CONSUMER_OFFSET_CODE,
DELETE_CONSUMER_OFFSET_CODE,
STORE_CONSUMER_OFFSET_2_CODE,
DELETE_CONSUMER_OFFSET_2_CODE,
GET_STREAM_CODE,
GET_STREAMS_CODE,
CREATE_STREAM_CODE,
Expand Down Expand Up @@ -394,6 +410,8 @@ mod tests {
Operation::SendMessages,
Operation::StoreConsumerOffset,
Operation::DeleteConsumerOffset,
Operation::StoreConsumerOffset2,
Operation::DeleteConsumerOffset2,
];
for op in replicated_ops {
let meta = lookup_by_operation(op)
Expand Down
1 change: 1 addition & 0 deletions core/binary_protocol/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ pub use framing::{RequestFrame, RequestFrame2, ResponseFrame, ResponseFrame2, ST
pub use message_view::{
WireMessageIterator, WireMessageIteratorMut, WireMessageView, WireMessageViewMut,
};
pub use primitives::ack_level::AckLevel;
pub use primitives::consumer::WireConsumer;
pub use primitives::identifier::{MAX_WIRE_NAME_LENGTH, WireIdentifier, WireName};
pub use primitives::partition_assignment::CreatedPartitionAssignment;
Expand Down
108 changes: 108 additions & 0 deletions core/binary_protocol/src/primitives/ack_level.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use crate::WireError;

/// Acknowledgement policy for consumer-offset write commands.
///
/// Wire format: single `u8` discriminant.
/// - `NoAck(0)`: leader-local write only; respond as soon as the in-memory
/// and on-disk state have been updated. Matches the fast path used by
/// `PollMessages` auto-commit.
/// - `Quorum(1)`: submit through the partition VSR consensus pipeline and
/// respond only after the write has been committed by a quorum of replicas.
/// This is the default for explicit client writes.
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum AckLevel {
NoAck = 0,
#[default]
Quorum = 1,
}

impl AckLevel {
/// Decode an `AckLevel` from its wire discriminant.
///
/// # Errors
/// Returns `WireError::UnknownDiscriminant` for unrecognised values.
pub const fn from_code(code: u8) -> Result<Self, WireError> {
match code {
0 => Ok(Self::NoAck),
1 => Ok(Self::Quorum),
other => Err(WireError::UnknownDiscriminant {
type_name: "AckLevel",
value: other,
offset: 0,
}),
}
}

/// Encode this `AckLevel` as its wire discriminant.
#[must_use]
pub const fn as_u8(self) -> u8 {
self as u8
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn roundtrip_no_ack() {
let level = AckLevel::NoAck;
let decoded = AckLevel::from_code(level.as_u8()).unwrap();
assert_eq!(decoded, level);
}

#[test]
fn roundtrip_quorum() {
let level = AckLevel::Quorum;
let decoded = AckLevel::from_code(level.as_u8()).unwrap();
assert_eq!(decoded, level);
}

#[test]
fn default_is_quorum() {
assert_eq!(AckLevel::default(), AckLevel::Quorum);
}

#[test]
fn discriminant_values() {
assert_eq!(AckLevel::NoAck.as_u8(), 0);
assert_eq!(AckLevel::Quorum.as_u8(), 1);
}

#[test]
fn unknown_discriminant_rejected() {
for code in 2u8..=u8::MAX {
let err = AckLevel::from_code(code).unwrap_err();
match err {
WireError::UnknownDiscriminant {
type_name,
value,
offset,
} => {
assert_eq!(type_name, "AckLevel");
assert_eq!(value, code);
assert_eq!(offset, 0);
}
other => panic!("unexpected error variant: {other:?}"),
}
}
}
}
1 change: 1 addition & 0 deletions core/binary_protocol/src/primitives/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

//! Shared wire primitives reused across request and response types.

pub mod ack_level;
pub mod consumer;
pub mod identifier;
pub mod partition_assignment;
Expand Down
Loading
Loading