diff --git a/changelog.d/25531_logstash_ack_windows.fix.md b/changelog.d/25531_logstash_ack_windows.fix.md new file mode 100644 index 0000000000000..786406ab211a4 --- /dev/null +++ b/changelog.d/25531_logstash_ack_windows.fix.md @@ -0,0 +1,3 @@ +Fixed the `logstash` source to preserve writer window boundaries when generating ACKs. This prevents batched reads from producing ACK sequences that advance past the current window, which could lead to "invalid sequence number received" errors and duplicate retransmits under load. + +authors: bruceg diff --git a/src/sources/logstash.rs b/src/sources/logstash.rs index a643ce0af73ba..a8b6c6b389e28 100644 --- a/src/sources/logstash.rs +++ b/src/sources/logstash.rs @@ -3,6 +3,7 @@ use std::{ convert::TryFrom, io::{self, Read}, net::SocketAddr, + num::NonZeroUsize, time::Duration, }; @@ -264,39 +265,47 @@ impl TcpSource for LogstashSource { } struct LogstashAcker { - sequence_number: u32, - protocol_version: Option, + // Batched reads can contain multiple writer windows. Preserve a separate + // ACK point for each completed window so Filebeat never sees an ACK that + // advances past the current window it is waiting on. If the batch ends in + // the middle of a window, ACK the last received event in that final ACK + // domain so clients are not forced to wait for the advertised window size. + // Lumberjack defines WindowSize as a maximum unacked count, so a sender can + // legitimately advertise a fresh window after a previously ACKed partial + // tail. Within a single ReadyFrames batch, the only incomplete ACK domain + // we can represent independently is the final tail we have actually seen. + // We expect most batches to need only one ACK point, either for a single + // completed window or for one partial tail. Multiple ACKs are only needed + // when ReadyFrames coalesces multiple logical windows into one batch. + acknowledgements: SmallVec<[(LogstashProtocolVersion, u32); 1]>, } impl LogstashAcker { fn new(frames: &[LogstashEventFrame]) -> Self { - let mut sequence_number = 0; - let mut protocol_version = None; - - for frame in frames { - sequence_number = std::cmp::max(sequence_number, frame.sequence_number); - // We assume that it's valid to ack via any of the protocol versions that we've seen in - // a set of frames from a single stream, so here we just take the last. In reality, we - // do not expect stream with multiple protocol versions to occur. - protocol_version = Some(frame.protocol); - } - - Self { - sequence_number, - protocol_version, - } + let acknowledgements = frames + .iter() + .enumerate() + // ACK each completed writer window and the last frame in a partial batch if ReadyFrames + // flushes before the current window is complete. + .filter(|(index, frame)| frame.window_end || index + 1 == frames.len()) + .map(|(_, frame)| (frame.protocol, frame.sequence_number)) + .collect(); + + Self { acknowledgements } } } impl TcpSourceAcker for LogstashAcker { // https://github.com/logstash-plugins/logstash-input-beats/blob/master/PROTOCOL.md#ack-frame-type fn build_ack(self, ack: TcpSourceAck) -> Option { - match (ack, self.protocol_version) { - (TcpSourceAck::Ack, Some(protocol_version)) => { - let mut bytes: Vec = Vec::with_capacity(6); - bytes.push(protocol_version.into()); - bytes.push(LogstashFrameType::Ack.into()); - bytes.extend(self.sequence_number.to_be_bytes().iter()); + match ack { + TcpSourceAck::Ack if !self.acknowledgements.is_empty() => { + let mut bytes: Vec = Vec::with_capacity(self.acknowledgements.len() * 6); + for (protocol_version, sequence_number) in self.acknowledgements { + bytes.push(protocol_version.into()); + bytes.push(LogstashFrameType::Ack.into()); + bytes.extend(sequence_number.to_be_bytes().iter()); + } Some(Bytes::from(bytes)) } _ => None, @@ -315,12 +324,51 @@ enum LogstashDecoderReadState { #[derive(Debug)] struct LogstashDecoder { state: LogstashDecoderReadState, + // Tracks how many events remain in the current writer window. This lets us + // preserve sender window boundaries even if ReadyFrames later batches + // multiple decoded windows together before ACKing. + window_events_remaining: Option, } impl LogstashDecoder { const fn new() -> Self { + Self::new_with_window_events_remaining(None) + } + + const fn new_with_window_events_remaining( + window_events_remaining: Option, + ) -> Self { Self { state: LogstashDecoderReadState::ReadProtocol, + window_events_remaining, + } + } + + /// Marks whether a decoded frame closes the current writer window. + /// + /// Filebeat expects ACKs to stay within the current window announced by the + /// most recent `WindowSize` frame. The generic TCP batching layer can merge + /// frames from multiple windows before we build an ACK, so we record the + /// per-frame window boundary here and let the acker emit one ACK frame per + /// completed window later. + /// + /// If a sender omits `WindowSize`, we keep the previous behavior and treat + /// each standalone frame as ACKable on its own. + const fn annotate_frame(&mut self, frame: &mut LogstashEventFrame) { + match self.window_events_remaining { + Some(remaining) if remaining.get() == 1 => { + frame.window_end = true; + self.window_events_remaining = None; + } + Some(remaining) => { + frame.window_end = false; + self.window_events_remaining = NonZeroUsize::new(remaining.get() - 1); // safe because we know remaining is greater than 1 + } + None => { + // Preserve existing behavior for inputs that send standalone data frames + // without an explicit WindowSize frame. + frame.window_end = true; + } } } } @@ -440,6 +488,12 @@ struct LogstashEventFrame { protocol: LogstashProtocolVersion, sequence_number: u32, fields: BTreeMap, + window_end: bool, +} + +struct DecodedCompressedFrames { + frames: VecDeque<(LogstashEventFrame, usize)>, + window_events_remaining: Option, } // Based on spec at: https://github.com/logstash-plugins/logstash-input-beats/blob/master/PROTOCOL.md @@ -492,8 +546,15 @@ impl Decoder for LogstashDecoder { } } // The window size indicates how many events the writer will send before waiting - // for acks. As we forward events as we get them, and ack as they are received, we - // do not need to keep track of this. + // for acks. We preserve this boundary so the acker can emit one ACK per + // completed window, even if multiple windows are batched together later. + // Filebeat accepts cumulative ACKs, but not ACKs that advance past the + // current writer window it is waiting on. WindowSize is a maximum unacked + // count, not necessarily an exact count of immediately following frames, so a + // sender can legitimately advertise a new window after a previously ACKed + // partial tail. If a malformed sender does this before that earlier tail has + // actually been ACKed, we tolerate the reset here even though it can collapse + // the older incomplete domain into the new one. // // https://github.com/logstash-plugins/logstash-input-beats/blob/master/PROTOCOL.md#window-size-frame-type LogstashDecoderReadState::ReadFrame(_protocol, LogstashFrameType::WindowSize) => { @@ -501,7 +562,8 @@ impl Decoder for LogstashDecoder { return Ok(None); } - let _window_size = src.get_u32(); + let window_size = src.get_u32() as usize; + self.window_events_remaining = NonZeroUsize::new(window_size); LogstashDecoderReadState::ReadProtocol } @@ -519,27 +581,37 @@ impl Decoder for LogstashDecoder { } // https://github.com/logstash-plugins/logstash-input-beats/blob/master/PROTOCOL.md#data-frame-type LogstashDecoderReadState::ReadFrame(protocol, LogstashFrameType::Data) => { - let Some(frame) = decode_data_frame(protocol, src) else { + let Some((mut frame, byte_size)) = decode_data_frame(protocol, src) else { return Ok(None); }; + self.annotate_frame(&mut frame); - LogstashDecoderReadState::PendingFrames([frame].into()) + LogstashDecoderReadState::PendingFrames([(frame, byte_size)].into()) } // https://github.com/logstash-plugins/logstash-input-beats/blob/master/PROTOCOL.md#json-frame-type LogstashDecoderReadState::ReadFrame(protocol, LogstashFrameType::Json) => { - let Some(frame) = decode_json_frame(protocol, src)? else { + let Some((mut frame, byte_size)) = decode_json_frame(protocol, src)? else { return Ok(None); }; + self.annotate_frame(&mut frame); - LogstashDecoderReadState::PendingFrames([frame].into()) + LogstashDecoderReadState::PendingFrames([(frame, byte_size)].into()) } // https://github.com/logstash-plugins/logstash-input-beats/blob/master/PROTOCOL.md#compressed-frame-type + // + // The compressed payload is still part of the same logical Lumberjack stream, so + // the nested decoder must inherit the current window state and return the updated + // state after expanding the payload. Re-annotating the emitted frames here would + // overwrite any WindowSize boundaries that were established inside the compressed + // payload and can also lose progress from a partially consumed outer window. LogstashDecoderReadState::ReadFrame(_protocol, LogstashFrameType::Compressed) => { - let Some(frames) = decode_compressed_frame(src)? else { + let Some(decoded) = decode_compressed_frame(src, self.window_events_remaining)? + else { return Ok(None); }; + self.window_events_remaining = decoded.window_events_remaining; - LogstashDecoderReadState::PendingFrames(frames) + LogstashDecoderReadState::PendingFrames(decoded.frames) } }; } @@ -581,6 +653,7 @@ fn decode_data_frame( protocol, sequence_number, fields, + window_end: false, }, byte_size, )) @@ -639,6 +712,7 @@ fn decode_json_frame( protocol, sequence_number, fields, + window_end: false, }, byte_size, ))) @@ -646,7 +720,8 @@ fn decode_json_frame( fn decode_compressed_frame( src: &mut BytesMut, -) -> Result>, DecodeError> { + window_events_remaining: Option, +) -> Result, DecodeError> { let mut rest = src.as_ref(); if rest.remaining() < 4 { @@ -674,14 +749,17 @@ fn decode_compressed_frame( let mut buf = res?; - let mut decoder = LogstashDecoder::new(); + let mut decoder = LogstashDecoder::new_with_window_events_remaining(window_events_remaining); let mut frames = VecDeque::new(); while let Some(s) = decoder.decode(&mut buf)? { frames.push_back(s); } - Ok(Some(frames)) + Ok(Some(DecodedCompressedFrames { + frames, + window_events_remaining: decoder.window_events_remaining, + })) } fn bytes_remaining(src: &BytesMut, rest: &[u8]) -> usize { @@ -709,10 +787,14 @@ impl From for SmallVec<[Event; 1]> { #[cfg(test)] mod test { + use std::io::Write; + use bytes::BufMut; - use futures::Stream; + use flate2::{Compression, write::ZlibEncoder}; + use futures::{Stream, StreamExt, stream}; use rand::{Rng, rng}; use tokio::io::{AsyncReadExt, AsyncWriteExt}; + use vector_lib::codecs::ReadyFrames; use vector_lib::lookup::OwnedTargetPath; use vrl::value::kind::Collection; @@ -791,8 +873,7 @@ mod test { assert!(log.get("timestamp").is_some()); } - fn encode_req(seq: u32, pairs: &[(&str, &str)]) -> Bytes { - let mut req = BytesMut::new(); + fn push_req(req: &mut BytesMut, seq: u32, pairs: &[(&str, &str)]) { req.put_u8(b'2'); req.put_u8(b'D'); req.put_u32(seq); @@ -803,9 +884,127 @@ mod test { req.put_u32(value.len() as u32); req.put(value.as_bytes()); } + } + + fn encode_req(seq: u32, pairs: &[(&str, &str)]) -> Bytes { + let mut req = BytesMut::new(); + push_req(&mut req, seq, pairs); req.into() } + fn push_window_size(req: &mut BytesMut, size: u32) { + req.put_u8(b'2'); + req.put_u8(b'W'); + req.put_u32(size); + } + + fn push_compressed(req: &mut BytesMut, inner: &[u8]) { + let mut encoder = ZlibEncoder::new(Vec::new(), Compression::default()); + encoder.write_all(inner).unwrap(); + let compressed = encoder.finish().unwrap(); + + req.put_u8(b'2'); + req.put_u8(b'C'); + req.put_u32(compressed.len() as u32); + req.put(compressed.as_slice()); + } + + fn decode_frames(mut src: BytesMut) -> Vec<(LogstashEventFrame, usize)> { + let mut decoder = LogstashDecoder::new(); + let mut frames = Vec::new(); + + while let Some(frame) = decoder.decode(&mut src).unwrap() { + frames.push(frame); + } + + assert_eq!(src.len(), 0); + frames + } + + fn decode_acknowledgements(mut ack: Bytes) -> Vec { + let mut acknowledgements = Vec::new(); + + while !ack.is_empty() { + assert!( + ack.len() >= 6, + "ack stream ended with {} trailing bytes", + ack.len() + ); + assert_eq!(ack.get_u8(), b'2'); + assert_eq!(ack.get_u8(), b'A'); + acknowledgements.push(ack.get_u32()); + } + + acknowledgements + } + + fn decoded_sequence_numbers(decoded: &[(LogstashEventFrame, usize)]) -> Vec { + decoded + .iter() + .map(|(frame, _)| frame.sequence_number) + .collect::>() + } + + fn assert_decoded_sequences( + decoded: &[(LogstashEventFrame, usize)], + expected_sequences: &[u32], + ) { + assert_eq!(decoded_sequence_numbers(decoded), expected_sequences); + } + + async fn assert_acknowledgements_for_ready_frames( + decoded: Vec<(LogstashEventFrame, usize)>, + expected_sequences: &[u32], + expected_acknowledgements: &[u32], + ) { + assert_decoded_sequences(&decoded, expected_sequences); + + let stream = stream::iter(decoded.into_iter().map(Ok::<_, DecodeError>)); + let mut ready = ReadyFrames::with_capacity(stream, 16); + let (frames, _) = ready.next().await.unwrap().unwrap(); + + let ack = LogstashAcker::new(&frames) + .build_ack(TcpSourceAck::Ack) + .unwrap(); + let acknowledgements = decode_acknowledgements(ack); + + assert!(ready.next().await.is_none()); + assert_eq!(acknowledgements, expected_acknowledgements); + } + + fn decode_frames_and_assert_sequences( + src: BytesMut, + expected_sequences: &[u32], + ) -> Vec<(LogstashEventFrame, usize)> { + let decoded = decode_frames(src); + assert_decoded_sequences(&decoded, expected_sequences); + decoded + } + + fn decode_frames_with_decoder( + decoder: &mut LogstashDecoder, + mut src: BytesMut, + ) -> Vec<(LogstashEventFrame, usize)> { + let mut frames = Vec::new(); + + while let Some(frame) = decoder.decode(&mut src).unwrap() { + frames.push(frame); + } + + assert_eq!(src.len(), 0); + frames + } + + fn decode_frames_with_decoder_and_assert_sequences( + decoder: &mut LogstashDecoder, + src: BytesMut, + expected_sequences: &[u32], + ) -> Vec<(LogstashEventFrame, usize)> { + let decoded = decode_frames_with_decoder(decoder, src); + assert_decoded_sequences(&decoded, expected_sequences); + decoded + } + #[test] fn v1_decoder_does_not_panic() { let seq = rng().random_range(1..u32::MAX); @@ -818,6 +1017,106 @@ mod test { } } + #[tokio::test] + async fn distinct_windows_do_not_share_an_ack_domain() { + let mut req = BytesMut::new(); + push_window_size(&mut req, 1); + push_req(&mut req, 1, &[("message", "first window")]); + push_window_size(&mut req, 2); + push_req(&mut req, 1, &[("message", "second window first")]); + push_req(&mut req, 2, &[("message", "second window second")]); + + let decoded = decode_frames_and_assert_sequences(req, &[1, 1, 2]); + assert_acknowledgements_for_ready_frames(decoded, &[1, 1, 2], &[1, 2]).await; + } + + #[tokio::test] + async fn distinct_windows_with_monotonic_sequences_ack_the_first_window() { + let mut req = BytesMut::new(); + push_window_size(&mut req, 2); + push_req(&mut req, 1, &[("message", "first window first")]); + push_req(&mut req, 2, &[("message", "first window second")]); + push_window_size(&mut req, 2); + push_req(&mut req, 3, &[("message", "second window first")]); + push_req(&mut req, 4, &[("message", "second window second")]); + + let decoded = decode_frames_and_assert_sequences(req, &[1, 2, 3, 4]); + assert_acknowledgements_for_ready_frames(decoded, &[1, 2, 3, 4], &[2, 4]).await; + } + + #[tokio::test] + async fn incomplete_final_window_is_acked_to_the_last_received_event() { + let mut req = BytesMut::new(); + push_window_size(&mut req, 4); + push_req(&mut req, 1, &[("message", "only event in partial window")]); + + let decoded = decode_frames_and_assert_sequences(req, &[1]); + assert_acknowledgements_for_ready_frames(decoded, &[1], &[1]).await; + } + + #[tokio::test] + async fn compressed_frames_preserve_inner_window_boundaries() { + let mut inner = BytesMut::new(); + push_window_size(&mut inner, 2); + push_req(&mut inner, 1, &[("message", "compressed first")]); + push_req(&mut inner, 2, &[("message", "compressed second")]); + + let mut req = BytesMut::new(); + push_compressed(&mut req, &inner); + + let decoded = decode_frames_and_assert_sequences(req, &[1, 2]); + assert_acknowledgements_for_ready_frames(decoded, &[1, 2], &[2]).await; + } + + #[tokio::test] + async fn single_window_split_across_ready_frames_keeps_progressive_acks() { + let mut req = BytesMut::new(); + push_window_size(&mut req, 4); + push_req(&mut req, 1, &[("message", "first")]); + push_req(&mut req, 2, &[("message", "second")]); + push_req(&mut req, 3, &[("message", "third")]); + push_req(&mut req, 4, &[("message", "fourth")]); + + let decoded = decode_frames_and_assert_sequences(req, &[1, 2, 3, 4]); + + let stream = stream::iter(decoded.into_iter().map(Ok::<_, DecodeError>)); + let mut ready = ReadyFrames::with_capacity(stream, 2); + let mut acknowledgements = Vec::new(); + + while let Some(result) = ready.next().await { + let (frames, _byte_size) = result.unwrap(); + let ack = LogstashAcker::new(&frames) + .build_ack(TcpSourceAck::Ack) + .unwrap(); + acknowledgements.push(decode_acknowledgements(ack)); + } + + assert_eq!(acknowledgements, vec![vec![2], vec![4]]); + } + + #[tokio::test] + async fn fresh_window_after_acked_partial_tail_is_accepted() { + let mut decoder = LogstashDecoder::new(); + + let mut first_batch = BytesMut::new(); + push_window_size(&mut first_batch, 2); + push_req(&mut first_batch, 1, &[("message", "first partial tail")]); + let decoded = + decode_frames_with_decoder_and_assert_sequences(&mut decoder, first_batch, &[1]); + assert_acknowledgements_for_ready_frames(decoded, &[1], &[1]).await; + + let mut second_batch = BytesMut::new(); + push_window_size(&mut second_batch, 1); + push_req( + &mut second_batch, + 1, + &[("message", "fresh window after ack")], + ); + let decoded = + decode_frames_with_decoder_and_assert_sequences(&mut decoder, second_batch, &[1]); + assert_acknowledgements_for_ready_frames(decoded, &[1], &[1]).await; + } + async fn send_req(address: SocketAddr, pairs: &[(&str, &str)], sends_ack: bool) { let seq = rng().random_range(1..u32::MAX); let mut socket = tokio::net::TcpStream::connect(address).await.unwrap();