From b68bf9febc9ed52193c5b36b41cf5b45accaec7e Mon Sep 17 00:00:00 2001 From: Bruce Guenter Date: Fri, 29 May 2026 15:18:33 -0600 Subject: [PATCH 1/5] fix(logstash source): preserve writer windows when generating ACKs Filebeat expects ACKs to remain within the current Lumberjack writer window. The logstash source decoder was ignoring WindowSize frames and the receiver could later batch frames from multiple windows together before building an ACK. When that happened, Vector could ACK the highest sequence in the merged batch instead of the last sequence in the current window. That behavior could surface as "invalid sequence number received" on the sender side and could contribute to reconnects and duplicate retransmits under load. Fix this by preserving writer window boundaries during decode, tracking which decoded frames close a window, and emitting one ACK frame per completed window when a batched read contains multiple windows. This keeps the existing batching behavior in the generic TCP path while making the logstash receiver respect the expected ACK semantics. Also add unit tests that demonstrate the bug with both sequence resets and monotonic sequences across adjacent windows. --- changelog.d/25531_logstash_ack_windows.fix.md | 3 + src/sources/logstash.rs | 313 ++++++++++++++++-- 2 files changed, 280 insertions(+), 36 deletions(-) create mode 100644 changelog.d/25531_logstash_ack_windows.fix.md diff --git a/changelog.d/25531_logstash_ack_windows.fix.md b/changelog.d/25531_logstash_ack_windows.fix.md new file mode 100644 index 0000000000000..786406ab211a4 --- /dev/null +++ b/changelog.d/25531_logstash_ack_windows.fix.md @@ -0,0 +1,3 @@ +Fixed the `logstash` source to preserve writer window boundaries when generating ACKs. This prevents batched reads from producing ACK sequences that advance past the current window, which could lead to "invalid sequence number received" errors and duplicate retransmits under load. + +authors: bruceg diff --git a/src/sources/logstash.rs b/src/sources/logstash.rs index a643ce0af73ba..ad2ccfe65f62c 100644 --- a/src/sources/logstash.rs +++ b/src/sources/logstash.rs @@ -3,6 +3,7 @@ use std::{ convert::TryFrom, io::{self, Read}, net::SocketAddr, + num::NonZeroUsize, time::Duration, }; @@ -264,39 +265,45 @@ impl TcpSource for LogstashSource { } struct LogstashAcker { - sequence_number: u32, - protocol_version: Option, + // Batched reads can contain multiple writer windows. Preserve a separate + // ACK point for each completed window so Filebeat never sees an ACK that + // advances past the current window it is waiting on. If the batch ends in + // the middle of a window, ACK the last received event in that final ACK + // domain so clients are not forced to wait for the advertised window size. + // Mid-stream WindowSize resets are rejected during decode, so the only + // incomplete ACK domain we need to represent here is the final batch tail. + acknowledgements: Vec<(LogstashProtocolVersion, u32)>, } impl LogstashAcker { fn new(frames: &[LogstashEventFrame]) -> Self { - let mut sequence_number = 0; - let mut protocol_version = None; - - for frame in frames { - sequence_number = std::cmp::max(sequence_number, frame.sequence_number); - // We assume that it's valid to ack via any of the protocol versions that we've seen in - // a set of frames from a single stream, so here we just take the last. In reality, we - // do not expect stream with multiple protocol versions to occur. - protocol_version = Some(frame.protocol); + let mut acknowledgements = frames + .iter() + .filter(|frame| frame.window_end) + .map(|frame| (frame.protocol, frame.sequence_number)) + .collect::>(); + + if let Some(frame) = frames.last() + && !frame.window_end + { + acknowledgements.push((frame.protocol, frame.sequence_number)); } - Self { - sequence_number, - protocol_version, - } + Self { acknowledgements } } } impl TcpSourceAcker for LogstashAcker { // https://github.com/logstash-plugins/logstash-input-beats/blob/master/PROTOCOL.md#ack-frame-type fn build_ack(self, ack: TcpSourceAck) -> Option { - match (ack, self.protocol_version) { - (TcpSourceAck::Ack, Some(protocol_version)) => { - let mut bytes: Vec = Vec::with_capacity(6); - bytes.push(protocol_version.into()); - bytes.push(LogstashFrameType::Ack.into()); - bytes.extend(self.sequence_number.to_be_bytes().iter()); + match ack { + TcpSourceAck::Ack if !self.acknowledgements.is_empty() => { + let mut bytes: Vec = Vec::with_capacity(self.acknowledgements.len() * 6); + for (protocol_version, sequence_number) in self.acknowledgements { + bytes.push(protocol_version.into()); + bytes.push(LogstashFrameType::Ack.into()); + bytes.extend(sequence_number.to_be_bytes().iter()); + } Some(Bytes::from(bytes)) } _ => None, @@ -315,12 +322,51 @@ enum LogstashDecoderReadState { #[derive(Debug)] struct LogstashDecoder { state: LogstashDecoderReadState, + // Tracks how many events remain in the current writer window. This lets us + // preserve sender window boundaries even if ReadyFrames later batches + // multiple decoded windows together before ACKing. + window_events_remaining: Option, } impl LogstashDecoder { const fn new() -> Self { + Self::new_with_window_events_remaining(None) + } + + const fn new_with_window_events_remaining( + window_events_remaining: Option, + ) -> Self { Self { state: LogstashDecoderReadState::ReadProtocol, + window_events_remaining, + } + } + + /// Marks whether a decoded frame closes the current writer window. + /// + /// Filebeat expects ACKs to stay within the current window announced by the + /// most recent `WindowSize` frame. The generic TCP batching layer can merge + /// frames from multiple windows before we build an ACK, so we record the + /// per-frame window boundary here and let the acker emit one ACK frame per + /// completed window later. + /// + /// If a sender omits `WindowSize`, we keep the previous behavior and treat + /// each standalone frame as ACKable on its own. + const fn annotate_frame(&mut self, frame: &mut LogstashEventFrame) { + match self.window_events_remaining { + Some(remaining) if remaining.get() == 1 => { + frame.window_end = true; + self.window_events_remaining = None; + } + Some(remaining) => { + frame.window_end = false; + self.window_events_remaining = NonZeroUsize::new(remaining.get() - 1); // safe because we know remaining is greater than 1 + } + None => { + // Preserve existing behavior for inputs that send standalone data frames + // without an explicit WindowSize frame. + frame.window_end = true; + } } } } @@ -333,6 +379,11 @@ pub enum DecodeError { UnknownProtocolVersion { version: char }, #[snafu(display("Unknown logstash protocol message type: {}", frame_type))] UnknownFrameType { frame_type: char }, + #[snafu(display( + "Received a new WindowSize frame before consuming the prior window ({} events remaining)", + remaining + ))] + NestedWindowSize { remaining: usize }, #[snafu(display("Failed to decode JSON frame: {}", source))] JsonFrameFailedDecode { source: serde_json::Error }, #[snafu(display("Failed to decompress compressed frame: {}", source))] @@ -347,6 +398,7 @@ impl StreamDecodingError for DecodeError { IO { .. } => false, UnknownProtocolVersion { .. } => false, UnknownFrameType { .. } => false, + NestedWindowSize { .. } => false, JsonFrameFailedDecode { .. } => true, DecompressionFailed { .. } => true, } @@ -440,6 +492,12 @@ struct LogstashEventFrame { protocol: LogstashProtocolVersion, sequence_number: u32, fields: BTreeMap, + window_end: bool, +} + +struct DecodedCompressedFrames { + frames: VecDeque<(LogstashEventFrame, usize)>, + window_events_remaining: Option, } // Based on spec at: https://github.com/logstash-plugins/logstash-input-beats/blob/master/PROTOCOL.md @@ -492,8 +550,12 @@ impl Decoder for LogstashDecoder { } } // The window size indicates how many events the writer will send before waiting - // for acks. As we forward events as we get them, and ack as they are received, we - // do not need to keep track of this. + // for acks. We preserve this boundary so the acker can emit one ACK per + // completed window, even if multiple windows are batched together later. + // Filebeat accepts cumulative ACKs, but not ACKs that advance past the + // current writer window it is waiting on. If a sender announces a new window + // before the current one has been exhausted, we treat that as a protocol + // error instead of guessing how to preserve ACK boundaries. // // https://github.com/logstash-plugins/logstash-input-beats/blob/master/PROTOCOL.md#window-size-frame-type LogstashDecoderReadState::ReadFrame(_protocol, LogstashFrameType::WindowSize) => { @@ -501,7 +563,14 @@ impl Decoder for LogstashDecoder { return Ok(None); } - let _window_size = src.get_u32(); + if let Some(remaining) = self.window_events_remaining { + return Err(DecodeError::NestedWindowSize { + remaining: remaining.get(), + }); + } + + let window_size = src.get_u32() as usize; + self.window_events_remaining = NonZeroUsize::new(window_size); LogstashDecoderReadState::ReadProtocol } @@ -519,27 +588,37 @@ impl Decoder for LogstashDecoder { } // https://github.com/logstash-plugins/logstash-input-beats/blob/master/PROTOCOL.md#data-frame-type LogstashDecoderReadState::ReadFrame(protocol, LogstashFrameType::Data) => { - let Some(frame) = decode_data_frame(protocol, src) else { + let Some((mut frame, byte_size)) = decode_data_frame(protocol, src) else { return Ok(None); }; + self.annotate_frame(&mut frame); - LogstashDecoderReadState::PendingFrames([frame].into()) + LogstashDecoderReadState::PendingFrames([(frame, byte_size)].into()) } // https://github.com/logstash-plugins/logstash-input-beats/blob/master/PROTOCOL.md#json-frame-type LogstashDecoderReadState::ReadFrame(protocol, LogstashFrameType::Json) => { - let Some(frame) = decode_json_frame(protocol, src)? else { + let Some((mut frame, byte_size)) = decode_json_frame(protocol, src)? else { return Ok(None); }; + self.annotate_frame(&mut frame); - LogstashDecoderReadState::PendingFrames([frame].into()) + LogstashDecoderReadState::PendingFrames([(frame, byte_size)].into()) } // https://github.com/logstash-plugins/logstash-input-beats/blob/master/PROTOCOL.md#compressed-frame-type + // + // The compressed payload is still part of the same logical Lumberjack stream, so + // the nested decoder must inherit the current window state and return the updated + // state after expanding the payload. Re-annotating the emitted frames here would + // overwrite any WindowSize boundaries that were established inside the compressed + // payload and can also lose progress from a partially consumed outer window. LogstashDecoderReadState::ReadFrame(_protocol, LogstashFrameType::Compressed) => { - let Some(frames) = decode_compressed_frame(src)? else { + let Some(decoded) = decode_compressed_frame(src, self.window_events_remaining)? + else { return Ok(None); }; + self.window_events_remaining = decoded.window_events_remaining; - LogstashDecoderReadState::PendingFrames(frames) + LogstashDecoderReadState::PendingFrames(decoded.frames) } }; } @@ -581,6 +660,7 @@ fn decode_data_frame( protocol, sequence_number, fields, + window_end: false, }, byte_size, )) @@ -639,6 +719,7 @@ fn decode_json_frame( protocol, sequence_number, fields, + window_end: false, }, byte_size, ))) @@ -646,7 +727,8 @@ fn decode_json_frame( fn decode_compressed_frame( src: &mut BytesMut, -) -> Result>, DecodeError> { + window_events_remaining: Option, +) -> Result, DecodeError> { let mut rest = src.as_ref(); if rest.remaining() < 4 { @@ -674,14 +756,17 @@ fn decode_compressed_frame( let mut buf = res?; - let mut decoder = LogstashDecoder::new(); + let mut decoder = LogstashDecoder::new_with_window_events_remaining(window_events_remaining); let mut frames = VecDeque::new(); while let Some(s) = decoder.decode(&mut buf)? { frames.push_back(s); } - Ok(Some(frames)) + Ok(Some(DecodedCompressedFrames { + frames, + window_events_remaining: decoder.window_events_remaining, + })) } fn bytes_remaining(src: &BytesMut, rest: &[u8]) -> usize { @@ -709,10 +794,14 @@ impl From for SmallVec<[Event; 1]> { #[cfg(test)] mod test { + use std::io::Write; + use bytes::BufMut; - use futures::Stream; + use flate2::{Compression, write::ZlibEncoder}; + use futures::{Stream, StreamExt, stream}; use rand::{Rng, rng}; use tokio::io::{AsyncReadExt, AsyncWriteExt}; + use vector_lib::codecs::ReadyFrames; use vector_lib::lookup::OwnedTargetPath; use vrl::value::kind::Collection; @@ -791,8 +880,7 @@ mod test { assert!(log.get("timestamp").is_some()); } - fn encode_req(seq: u32, pairs: &[(&str, &str)]) -> Bytes { - let mut req = BytesMut::new(); + fn push_req(req: &mut BytesMut, seq: u32, pairs: &[(&str, &str)]) { req.put_u8(b'2'); req.put_u8(b'D'); req.put_u32(seq); @@ -803,9 +891,98 @@ mod test { req.put_u32(value.len() as u32); req.put(value.as_bytes()); } + } + + fn encode_req(seq: u32, pairs: &[(&str, &str)]) -> Bytes { + let mut req = BytesMut::new(); + push_req(&mut req, seq, pairs); req.into() } + fn push_window_size(req: &mut BytesMut, size: u32) { + req.put_u8(b'2'); + req.put_u8(b'W'); + req.put_u32(size); + } + + fn push_compressed(req: &mut BytesMut, inner: &[u8]) { + let mut encoder = ZlibEncoder::new(Vec::new(), Compression::default()); + encoder.write_all(inner).unwrap(); + let compressed = encoder.finish().unwrap(); + + req.put_u8(b'2'); + req.put_u8(b'C'); + req.put_u32(compressed.len() as u32); + req.put(compressed.as_slice()); + } + + fn decode_frames(mut src: BytesMut) -> Vec<(LogstashEventFrame, usize)> { + let mut decoder = LogstashDecoder::new(); + let mut frames = Vec::new(); + + while let Some(frame) = decoder.decode(&mut src).unwrap() { + frames.push(frame); + } + + assert_eq!(src.len(), 0); + frames + } + + fn decode_acknowledgements(mut ack: Bytes) -> Vec { + let mut acknowledgements = Vec::new(); + + while !ack.is_empty() { + assert!( + ack.len() >= 6, + "ack stream ended with {} trailing bytes", + ack.len() + ); + assert_eq!(ack.get_u8(), b'2'); + assert_eq!(ack.get_u8(), b'A'); + acknowledgements.push(ack.get_u32()); + } + + acknowledgements + } + + fn decode_error(mut src: BytesMut) -> DecodeError { + let mut decoder = LogstashDecoder::new(); + + loop { + match decoder.decode(&mut src) { + Ok(Some(_)) => continue, + Ok(None) => panic!("expected decoder to fail"), + Err(error) => return error, + } + } + } + + async fn assert_acknowledgements_for_ready_frames( + decoded: Vec<(LogstashEventFrame, usize)>, + expected_sequences: &[u32], + expected_acknowledgements: &[u32], + ) { + assert_eq!( + decoded + .iter() + .map(|(frame, _)| frame.sequence_number) + .collect::>(), + expected_sequences + ); + + let stream = stream::iter(decoded.into_iter().map(Ok::<_, DecodeError>)); + let mut ready = ReadyFrames::with_capacity(stream, 16); + let (frames, _) = ready.next().await.unwrap().unwrap(); + + let ack = LogstashAcker::new(&frames) + .build_ack(TcpSourceAck::Ack) + .unwrap(); + let acknowledgements = decode_acknowledgements(ack); + + assert!(ready.next().await.is_none()); + assert_eq!(acknowledgements, expected_acknowledgements); + } + #[test] fn v1_decoder_does_not_panic() { let seq = rng().random_range(1..u32::MAX); @@ -818,6 +995,70 @@ mod test { } } + #[tokio::test] + async fn distinct_windows_do_not_share_an_ack_domain() { + let mut req = BytesMut::new(); + push_window_size(&mut req, 1); + push_req(&mut req, 1, &[("message", "first window")]); + push_window_size(&mut req, 2); + push_req(&mut req, 1, &[("message", "second window first")]); + push_req(&mut req, 2, &[("message", "second window second")]); + + let decoded = decode_frames(req); + assert_acknowledgements_for_ready_frames(decoded, &[1, 1, 2], &[1, 2]).await; + } + + #[tokio::test] + async fn distinct_windows_with_monotonic_sequences_ack_the_first_window() { + let mut req = BytesMut::new(); + push_window_size(&mut req, 2); + push_req(&mut req, 1, &[("message", "first window first")]); + push_req(&mut req, 2, &[("message", "first window second")]); + push_window_size(&mut req, 2); + push_req(&mut req, 3, &[("message", "second window first")]); + push_req(&mut req, 4, &[("message", "second window second")]); + + let decoded = decode_frames(req); + assert_acknowledgements_for_ready_frames(decoded, &[1, 2, 3, 4], &[2, 4]).await; + } + + #[tokio::test] + async fn incomplete_final_window_is_acked_to_the_last_received_event() { + let mut req = BytesMut::new(); + push_window_size(&mut req, 4); + push_req(&mut req, 1, &[("message", "only event in partial window")]); + + let decoded = decode_frames(req); + assert_acknowledgements_for_ready_frames(decoded, &[1], &[1]).await; + } + + #[tokio::test] + async fn compressed_frames_preserve_inner_window_boundaries() { + let mut inner = BytesMut::new(); + push_window_size(&mut inner, 2); + push_req(&mut inner, 1, &[("message", "compressed first")]); + push_req(&mut inner, 2, &[("message", "compressed second")]); + + let mut req = BytesMut::new(); + push_compressed(&mut req, &inner); + + let decoded = decode_frames(req); + assert_acknowledgements_for_ready_frames(decoded, &[1, 2], &[2]).await; + } + + #[test] + fn nested_window_size_is_a_protocol_error() { + let mut req = BytesMut::new(); + push_window_size(&mut req, 2); + push_req(&mut req, 1, &[("message", "first window event")]); + push_window_size(&mut req, 1); + + match decode_error(req) { + DecodeError::NestedWindowSize { remaining } => assert_eq!(remaining, 1), + other => panic!("expected NestedWindowSize error, got {other:?}"), + } + } + async fn send_req(address: SocketAddr, pairs: &[(&str, &str)], sends_ack: bool) { let seq = rng().random_range(1..u32::MAX); let mut socket = tokio::net::TcpStream::connect(address).await.unwrap(); From fdabff3dc0dae7caf7a7b607d2a15208f26829aa Mon Sep 17 00:00:00 2001 From: Bruce Guenter Date: Fri, 29 May 2026 18:34:14 -0600 Subject: [PATCH 2/5] Revert incomplete window handling regression --- src/sources/logstash.rs | 81 +++++++++++++++++++++-------------------- 1 file changed, 42 insertions(+), 39 deletions(-) diff --git a/src/sources/logstash.rs b/src/sources/logstash.rs index ad2ccfe65f62c..c4148038bc7ae 100644 --- a/src/sources/logstash.rs +++ b/src/sources/logstash.rs @@ -270,8 +270,10 @@ struct LogstashAcker { // advances past the current window it is waiting on. If the batch ends in // the middle of a window, ACK the last received event in that final ACK // domain so clients are not forced to wait for the advertised window size. - // Mid-stream WindowSize resets are rejected during decode, so the only - // incomplete ACK domain we need to represent here is the final batch tail. + // Lumberjack defines WindowSize as a maximum unacked count, so a sender can + // legitimately advertise a fresh window after a previously ACKed partial + // tail. Within a single ReadyFrames batch, the only incomplete ACK domain + // we can represent independently is the final tail we have actually seen. acknowledgements: Vec<(LogstashProtocolVersion, u32)>, } @@ -379,11 +381,6 @@ pub enum DecodeError { UnknownProtocolVersion { version: char }, #[snafu(display("Unknown logstash protocol message type: {}", frame_type))] UnknownFrameType { frame_type: char }, - #[snafu(display( - "Received a new WindowSize frame before consuming the prior window ({} events remaining)", - remaining - ))] - NestedWindowSize { remaining: usize }, #[snafu(display("Failed to decode JSON frame: {}", source))] JsonFrameFailedDecode { source: serde_json::Error }, #[snafu(display("Failed to decompress compressed frame: {}", source))] @@ -398,7 +395,6 @@ impl StreamDecodingError for DecodeError { IO { .. } => false, UnknownProtocolVersion { .. } => false, UnknownFrameType { .. } => false, - NestedWindowSize { .. } => false, JsonFrameFailedDecode { .. } => true, DecompressionFailed { .. } => true, } @@ -553,9 +549,12 @@ impl Decoder for LogstashDecoder { // for acks. We preserve this boundary so the acker can emit one ACK per // completed window, even if multiple windows are batched together later. // Filebeat accepts cumulative ACKs, but not ACKs that advance past the - // current writer window it is waiting on. If a sender announces a new window - // before the current one has been exhausted, we treat that as a protocol - // error instead of guessing how to preserve ACK boundaries. + // current writer window it is waiting on. WindowSize is a maximum unacked + // count, not necessarily an exact count of immediately following frames, so a + // sender can legitimately advertise a new window after a previously ACKed + // partial tail. If a malformed sender does this before that earlier tail has + // actually been ACKed, we tolerate the reset here even though it can collapse + // the older incomplete domain into the new one. // // https://github.com/logstash-plugins/logstash-input-beats/blob/master/PROTOCOL.md#window-size-frame-type LogstashDecoderReadState::ReadFrame(_protocol, LogstashFrameType::WindowSize) => { @@ -563,12 +562,6 @@ impl Decoder for LogstashDecoder { return Ok(None); } - if let Some(remaining) = self.window_events_remaining { - return Err(DecodeError::NestedWindowSize { - remaining: remaining.get(), - }); - } - let window_size = src.get_u32() as usize; self.window_events_remaining = NonZeroUsize::new(window_size); @@ -945,18 +938,6 @@ mod test { acknowledgements } - fn decode_error(mut src: BytesMut) -> DecodeError { - let mut decoder = LogstashDecoder::new(); - - loop { - match decoder.decode(&mut src) { - Ok(Some(_)) => continue, - Ok(None) => panic!("expected decoder to fail"), - Err(error) => return error, - } - } - } - async fn assert_acknowledgements_for_ready_frames( decoded: Vec<(LogstashEventFrame, usize)>, expected_sequences: &[u32], @@ -983,6 +964,20 @@ mod test { assert_eq!(acknowledgements, expected_acknowledgements); } + fn decode_frames_with_decoder( + decoder: &mut LogstashDecoder, + mut src: BytesMut, + ) -> Vec<(LogstashEventFrame, usize)> { + let mut frames = Vec::new(); + + while let Some(frame) = decoder.decode(&mut src).unwrap() { + frames.push(frame); + } + + assert_eq!(src.len(), 0); + frames + } + #[test] fn v1_decoder_does_not_panic() { let seq = rng().random_range(1..u32::MAX); @@ -1046,17 +1041,25 @@ mod test { assert_acknowledgements_for_ready_frames(decoded, &[1, 2], &[2]).await; } - #[test] - fn nested_window_size_is_a_protocol_error() { - let mut req = BytesMut::new(); - push_window_size(&mut req, 2); - push_req(&mut req, 1, &[("message", "first window event")]); - push_window_size(&mut req, 1); + #[tokio::test] + async fn fresh_window_after_acked_partial_tail_is_accepted() { + let mut decoder = LogstashDecoder::new(); - match decode_error(req) { - DecodeError::NestedWindowSize { remaining } => assert_eq!(remaining, 1), - other => panic!("expected NestedWindowSize error, got {other:?}"), - } + let mut first_batch = BytesMut::new(); + push_window_size(&mut first_batch, 2); + push_req(&mut first_batch, 1, &[("message", "first partial tail")]); + let decoded = decode_frames_with_decoder(&mut decoder, first_batch); + assert_acknowledgements_for_ready_frames(decoded, &[1], &[1]).await; + + let mut second_batch = BytesMut::new(); + push_window_size(&mut second_batch, 1); + push_req( + &mut second_batch, + 1, + &[("message", "fresh window after ack")], + ); + let decoded = decode_frames_with_decoder(&mut decoder, second_batch); + assert_acknowledgements_for_ready_frames(decoded, &[1], &[1]).await; } async fn send_req(address: SocketAddr, pairs: &[(&str, &str)], sends_ack: bool) { From 7cf29c348f2e8a318653d7e7168ec2127e9b61c3 Mon Sep 17 00:00:00 2001 From: Bruce Guenter Date: Mon, 1 Jun 2026 10:32:47 -0600 Subject: [PATCH 3/5] Simplify logic in LogstashAcker::new --- src/sources/logstash.rs | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/src/sources/logstash.rs b/src/sources/logstash.rs index c4148038bc7ae..c5d5a19c12ec6 100644 --- a/src/sources/logstash.rs +++ b/src/sources/logstash.rs @@ -279,17 +279,14 @@ struct LogstashAcker { impl LogstashAcker { fn new(frames: &[LogstashEventFrame]) -> Self { - let mut acknowledgements = frames + let acknowledgements = frames .iter() - .filter(|frame| frame.window_end) - .map(|frame| (frame.protocol, frame.sequence_number)) - .collect::>(); - - if let Some(frame) = frames.last() - && !frame.window_end - { - acknowledgements.push((frame.protocol, frame.sequence_number)); - } + .enumerate() + // ACK each completed writer window and the last frame in a partial batch if ReadyFrames + // flushes before the current window is complete. + .filter(|(index, frame)| frame.window_end || index + 1 == frames.len()) + .map(|(_, frame)| (frame.protocol, frame.sequence_number)) + .collect(); Self { acknowledgements } } From 8a621b5ae7e1f157c4e32b47093c5959fb0cc06e Mon Sep 17 00:00:00 2001 From: Bruce Guenter Date: Mon, 1 Jun 2026 11:15:50 -0600 Subject: [PATCH 4/5] Add unit test for acking a partial window and assert all decoded sequence numbers --- src/sources/logstash.rs | 81 ++++++++++++++++++++++++++++++++++------- 1 file changed, 68 insertions(+), 13 deletions(-) diff --git a/src/sources/logstash.rs b/src/sources/logstash.rs index c5d5a19c12ec6..6df25baeb47c0 100644 --- a/src/sources/logstash.rs +++ b/src/sources/logstash.rs @@ -935,18 +935,26 @@ mod test { acknowledgements } + fn decoded_sequence_numbers(decoded: &[(LogstashEventFrame, usize)]) -> Vec { + decoded + .iter() + .map(|(frame, _)| frame.sequence_number) + .collect::>() + } + + fn assert_decoded_sequences( + decoded: &[(LogstashEventFrame, usize)], + expected_sequences: &[u32], + ) { + assert_eq!(decoded_sequence_numbers(decoded), expected_sequences); + } + async fn assert_acknowledgements_for_ready_frames( decoded: Vec<(LogstashEventFrame, usize)>, expected_sequences: &[u32], expected_acknowledgements: &[u32], ) { - assert_eq!( - decoded - .iter() - .map(|(frame, _)| frame.sequence_number) - .collect::>(), - expected_sequences - ); + assert_decoded_sequences(&decoded, expected_sequences); let stream = stream::iter(decoded.into_iter().map(Ok::<_, DecodeError>)); let mut ready = ReadyFrames::with_capacity(stream, 16); @@ -961,6 +969,15 @@ mod test { assert_eq!(acknowledgements, expected_acknowledgements); } + fn decode_frames_and_assert_sequences( + src: BytesMut, + expected_sequences: &[u32], + ) -> Vec<(LogstashEventFrame, usize)> { + let decoded = decode_frames(src); + assert_decoded_sequences(&decoded, expected_sequences); + decoded + } + fn decode_frames_with_decoder( decoder: &mut LogstashDecoder, mut src: BytesMut, @@ -975,6 +992,16 @@ mod test { frames } + fn decode_frames_with_decoder_and_assert_sequences( + decoder: &mut LogstashDecoder, + src: BytesMut, + expected_sequences: &[u32], + ) -> Vec<(LogstashEventFrame, usize)> { + let decoded = decode_frames_with_decoder(decoder, src); + assert_decoded_sequences(&decoded, expected_sequences); + decoded + } + #[test] fn v1_decoder_does_not_panic() { let seq = rng().random_range(1..u32::MAX); @@ -996,7 +1023,7 @@ mod test { push_req(&mut req, 1, &[("message", "second window first")]); push_req(&mut req, 2, &[("message", "second window second")]); - let decoded = decode_frames(req); + let decoded = decode_frames_and_assert_sequences(req, &[1, 1, 2]); assert_acknowledgements_for_ready_frames(decoded, &[1, 1, 2], &[1, 2]).await; } @@ -1010,7 +1037,7 @@ mod test { push_req(&mut req, 3, &[("message", "second window first")]); push_req(&mut req, 4, &[("message", "second window second")]); - let decoded = decode_frames(req); + let decoded = decode_frames_and_assert_sequences(req, &[1, 2, 3, 4]); assert_acknowledgements_for_ready_frames(decoded, &[1, 2, 3, 4], &[2, 4]).await; } @@ -1020,7 +1047,7 @@ mod test { push_window_size(&mut req, 4); push_req(&mut req, 1, &[("message", "only event in partial window")]); - let decoded = decode_frames(req); + let decoded = decode_frames_and_assert_sequences(req, &[1]); assert_acknowledgements_for_ready_frames(decoded, &[1], &[1]).await; } @@ -1034,10 +1061,36 @@ mod test { let mut req = BytesMut::new(); push_compressed(&mut req, &inner); - let decoded = decode_frames(req); + let decoded = decode_frames_and_assert_sequences(req, &[1, 2]); assert_acknowledgements_for_ready_frames(decoded, &[1, 2], &[2]).await; } + #[tokio::test] + async fn single_window_split_across_ready_frames_keeps_progressive_acks() { + let mut req = BytesMut::new(); + push_window_size(&mut req, 4); + push_req(&mut req, 1, &[("message", "first")]); + push_req(&mut req, 2, &[("message", "second")]); + push_req(&mut req, 3, &[("message", "third")]); + push_req(&mut req, 4, &[("message", "fourth")]); + + let decoded = decode_frames_and_assert_sequences(req, &[1, 2, 3, 4]); + + let stream = stream::iter(decoded.into_iter().map(Ok::<_, DecodeError>)); + let mut ready = ReadyFrames::with_capacity(stream, 2); + let mut acknowledgements = Vec::new(); + + while let Some(result) = ready.next().await { + let (frames, _byte_size) = result.unwrap(); + let ack = LogstashAcker::new(&frames) + .build_ack(TcpSourceAck::Ack) + .unwrap(); + acknowledgements.push(decode_acknowledgements(ack)); + } + + assert_eq!(acknowledgements, vec![vec![2], vec![4]]); + } + #[tokio::test] async fn fresh_window_after_acked_partial_tail_is_accepted() { let mut decoder = LogstashDecoder::new(); @@ -1045,7 +1098,8 @@ mod test { let mut first_batch = BytesMut::new(); push_window_size(&mut first_batch, 2); push_req(&mut first_batch, 1, &[("message", "first partial tail")]); - let decoded = decode_frames_with_decoder(&mut decoder, first_batch); + let decoded = + decode_frames_with_decoder_and_assert_sequences(&mut decoder, first_batch, &[1]); assert_acknowledgements_for_ready_frames(decoded, &[1], &[1]).await; let mut second_batch = BytesMut::new(); @@ -1055,7 +1109,8 @@ mod test { 1, &[("message", "fresh window after ack")], ); - let decoded = decode_frames_with_decoder(&mut decoder, second_batch); + let decoded = + decode_frames_with_decoder_and_assert_sequences(&mut decoder, second_batch, &[1]); assert_acknowledgements_for_ready_frames(decoded, &[1], &[1]).await; } From 67ff5d4e80a5fb1ef2a0e568453dfc14773bc6a4 Mon Sep 17 00:00:00 2001 From: Bruce Guenter Date: Mon, 1 Jun 2026 11:20:03 -0600 Subject: [PATCH 5/5] Use a SmallVec for the acknowledgements --- src/sources/logstash.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/sources/logstash.rs b/src/sources/logstash.rs index 6df25baeb47c0..a8b6c6b389e28 100644 --- a/src/sources/logstash.rs +++ b/src/sources/logstash.rs @@ -274,7 +274,10 @@ struct LogstashAcker { // legitimately advertise a fresh window after a previously ACKed partial // tail. Within a single ReadyFrames batch, the only incomplete ACK domain // we can represent independently is the final tail we have actually seen. - acknowledgements: Vec<(LogstashProtocolVersion, u32)>, + // We expect most batches to need only one ACK point, either for a single + // completed window or for one partial tail. Multiple ACKs are only needed + // when ReadyFrames coalesces multiple logical windows into one batch. + acknowledgements: SmallVec<[(LogstashProtocolVersion, u32); 1]>, } impl LogstashAcker {