From 150e4f052f1435a368f077de0947a25524e1c575 Mon Sep 17 00:00:00 2001
From: "Brian L. Troutwine"
Date: Fri, 29 May 2026 17:17:13 +0000
Subject: [PATCH] chore(unit tests): failing disk_v2 bug demonstrations

Seven tests that assert the CORRECT invariant and therefore FAIL against
current Vector, each reproducing a real disk_v2 data-loss / accounting
bug: #24606 (component drop metric), #21683 (total_buffer_size decrement
underflow), get_total_records 0-1 underflow, #24948 (writer drop loses
buffered events), finalizer status-discard (rejected delivery acked),
reader.rs:932 file-id rollover, reader.rs:524 size-delta underflow.
Runnable with plain cargo test -p vector-buffers.
---
 lib/vector-buffers/src/buffer_usage_data.rs   |  64 +++++
 .../disk_v2/tests/acknowledgements.rs         |  50 ++++
 .../src/variants/disk_v2/tests/invariants.rs  | 233 ++++++++++++++++++
 3 files changed, 347 insertions(+)

diff --git a/lib/vector-buffers/src/buffer_usage_data.rs b/lib/vector-buffers/src/buffer_usage_data.rs
index b418023657bb5..80e1df130a485 100644
--- a/lib/vector-buffers/src/buffer_usage_data.rs
+++ b/lib/vector-buffers/src/buffer_usage_data.rs
@@ -505,4 +505,68 @@ mod tests {
         assert_eq!(current.event_count, 10);
         assert_eq!(current.event_byte_size, 1000);
     }
+
+    /// Demonstration of Vector issue #24606.
+    ///
+    /// When a disk buffer drops events because of `when_full = drop_newest`, the
+    /// buffer-level `buffer_discarded_events_total` counter is incremented, but
+    /// the component-level `component_discarded_events_total` (the metric
+    /// operators monitor for data loss) is NEVER emitted by the buffer drop
+    /// path — so the loss is silent on standard dashboards.
+    ///
+    /// This exercises the exact reporter code path that runs for real
+    /// drop_newest drops: `BufferUsageData::report` -> `emit(BufferEventsDropped
+    /// { intentional: true, reason: "drop_newest", .. })`. A local metrics
+    /// recorder captures everything emitted.
+    ///
+    /// CORRECT INVARIANT (asserted here): dropped events must also be counted at
+    /// the component level so operators see the loss. This test currently FAILS
+    /// against Vector because #24606 is unfixed — `component_discarded_events_total`
+    /// stays at 0 for disk-buffer drop_newest drops. The failure IS the bug
+    /// demonstration; it will pass once #24606 is fixed.
+    #[test]
+    fn drop_newest_drops_should_increment_component_discarded_metric_issue_24606() {
+        use metrics_util::debugging::{DebugValue, DebuggingRecorder};
+
+        let recorder = DebuggingRecorder::new();
+        let snapshotter = recorder.snapshotter();
+
+        metrics::with_local_recorder(&recorder, || {
+            let data = BufferUsageData::new(0);
+            let mut metrics = ReporterCurrentMetrics::default();
+            // 5 events dropped by `when_full = drop_newest` (intentional drops),
+            // exactly as `BufferSender` records them on the disk-buffer drop path.
+            data.dropped_intentional.increment(5, 500);
+            data.report(&mut metrics, "demo_disk_buffer");
+        });
+
+        let mut buffer_discarded = 0u64;
+        let mut component_discarded = 0u64;
+        for (ckey, _unit, _desc, value) in snapshotter.snapshot().into_vec() {
+            let name = ckey.key().name().to_string();
+            if let DebugValue::Counter(c) = value {
+                if name == "buffer_discarded_events_total" {
+                    buffer_discarded += c;
+                } else if name == "component_discarded_events_total" {
+                    component_discarded += c;
+                }
+            }
+        }
+
+        // The buffer accounted for the drops at the buffer level:
+        assert_eq!(
+            buffer_discarded, 5,
+            "buffer_discarded_events_total should reflect the 5 drop_newest drops"
+        );
+        // CORRECT INVARIANT: the component-level counter operators watch for
+        // data loss MUST also reflect the 5 drops. This currently FAILS (actual
+        // 0) — that failure is the #24606 demonstration: disk-buffer drop_newest
+        // drops are invisible on standard dashboards.
+        assert_eq!(
+            component_discarded, 5,
+            "#24606: component_discarded_events_total must reflect drop_newest \
+             drops (got {component_discarded}; the buffer never emits the \
+             component-level metric, so the loss is silent on dashboards)"
+        );
+    }
 }
diff --git a/lib/vector-buffers/src/variants/disk_v2/tests/acknowledgements.rs b/lib/vector-buffers/src/variants/disk_v2/tests/acknowledgements.rs
index a63ed70635a40..4b90de0585762 100644
--- a/lib/vector-buffers/src/variants/disk_v2/tests/acknowledgements.rs
+++ b/lib/vector-buffers/src/variants/disk_v2/tests/acknowledgements.rs
@@ -83,3 +83,53 @@ async fn ack_wakes_reader() {
     })
     .await;
 }
+
+#[tokio::test]
+async fn rejected_delivery_should_not_advance_acks_finalizer_status_discard() {
+    // Failing demonstration: `spawn_finalizer` does
+    // `while let Some((_status, amount)) = stream.next().await { increment_pending_acks(amount) }`
+    // — the BatchStatus is discarded.
+    //
+    // CORRECT INVARIANT (asserted here): a REJECTED delivery (the sink gave up /
+    // could not deliver) must NOT advance the buffer's pending acks — the events
+    // were not acknowledged and must be retained. Because the finalizer ignores
+    // BatchStatus, a rejection still advances acks, so the buffer forgets the
+    // events as if delivered. This test FAILS (acks advance by 7, not 0),
+    // demonstrating the silent within-process data loss.
+    with_temp_dir(|dir| {
+        let data_dir = dir.to_path_buf();
+
+        async move {
+            let usage_handle = BufferUsageHandle::noop();
+            let config = DiskBufferConfigBuilder::from_path(data_dir)
+                .build()
+                .expect("creating buffer should not fail");
+            let ledger = Ledger::load_or_create(config, usage_handle)
+                .await
+                .expect("ledger should not fail to load/create");
+            let ledger = Arc::new(ledger);
+            let finalizer = Arc::clone(&ledger).spawn_finalizer();
+            assert_eq!(ledger.consume_pending_acks(), 0);
+
+            // A batch of 7 events whose delivery is REJECTED (not Delivered).
+            let (batch, receiver) = BatchNotifier::new_with_receiver();
+            finalizer.add(7, receiver);
+            let efin = EventFinalizer::new(batch);
+            efin.update_status(EventStatus::Rejected);
+            drop(efin); // sends the Rejected status update
+            tokio::task::yield_now().await;
+
+            // Finalizer BatchStatus discard: a rejected (failed) delivery must
+            // leave pending acks at 0. It instead advances by the full count, so
+            // the buffer forgets the events as if delivered — this assertion FAILS.
+            assert_eq!(
+                ledger.consume_pending_acks(),
+                0,
+                "rejected delivery must not advance acks, but advanced by 7 — the \
+                 finalizer ignores BatchStatus, so failed deliveries are silently \
+                 treated as acknowledged"
+            );
+        }
+    })
+    .await;
+}
diff --git a/lib/vector-buffers/src/variants/disk_v2/tests/invariants.rs b/lib/vector-buffers/src/variants/disk_v2/tests/invariants.rs
index a37075fce4b1e..7efa8774789c9 100644
--- a/lib/vector-buffers/src/variants/disk_v2/tests/invariants.rs
+++ b/lib/vector-buffers/src/variants/disk_v2/tests/invariants.rs
@@ -927,3 +927,236 @@ async fn reader_writer_positions_aligned_through_multiple_files_and_records() {
     let parent = trace_span!("reader_writer_positions_aligned_through_multiple_files_and_records");
     fut.instrument(parent.or_current()).await;
 }
+
+#[tokio::test]
+async fn ledger_total_buffer_size_decrement_should_saturate_not_underflow_issue_21683() {
+    // Failing demonstration of Vector #21683.
+    //
+    // CORRECT INVARIANT (asserted here): decrementing `total_buffer_size` by more
+    // than its current value must saturate at 0. `Ledger::decrement_total_buffer_size`
+    // instead uses an unsaturated `fetch_sub`, so the in-memory atomic wraps
+    // toward 2^64 — after which `is_buffer_full()` returns true forever and the
+    // writer deadlocks permanently. PR #23561 only fixed the metrics reporter,
+    // not this control-path atomic.
+    //
+    // This test FAILS against current Vector (the failure IS the bug): in release
+    // the atomic wraps so the saturation assert fails; in a debug build the
+    // `prev - amount` in decrement_total_buffer_size's own `trace!` panics on the
+    // same underflow. It will pass once the decrement saturates.
+    with_temp_dir(|dir| {
+        let data_dir = dir.to_path_buf();
+        async move {
+            let (_writer, _reader, ledger) =
+                create_default_buffer_v2::<_, SizedRecord>(data_dir).await;
+
+            ledger.increment_total_buffer_size(10);
+            assert_eq!(ledger.get_total_buffer_size(), 10);
+
+            // Decrement by MORE than the current size. Correct behavior is to
+            // saturate at 0.
+            ledger.decrement_total_buffer_size(11);
+
+            let after = ledger.get_total_buffer_size();
+            assert_eq!(
+                after, 0,
+                "#21683: total_buffer_size decremented below zero must saturate at \
+                 0, but was {after} (~2^64 wrap) — is_buffer_full() then never \
+                 returns false -> permanent writer deadlock"
+            );
+        }
+    })
+    .await;
+}
+
+#[tokio::test]
+async fn get_total_records_should_be_zero_on_drained_buffer_issue_21683_metrics() {
+    // Failing demonstration (sibling of #21683): `Ledger::get_total_records`
+    // computes `next_writer_id.wrapping_sub(last_reader_id) - 1`.
+    //
+    // CORRECT INVARIANT (asserted here): a fully drained buffer (writer and reader
+    // at the same record ID) holds 0 records. The trailing `- 1` underflows
+    // instead: in release it wraps to ~u64::MAX (which `synchronize_buffer_usage`
+    // then feeds into the buffer event-count metric on the next restart, reporting
+    // ~1.8e19 events for an empty buffer); in a debug build it panics on the same
+    // subtraction. Either way this test FAILS until the count saturates.
+    with_temp_dir(|dir| {
+        let data_dir = dir.to_path_buf();
+        async move {
+            let (_w, _r, ledger) = create_default_buffer_v2::<_, SizedRecord>(data_dir).await;
+            // Simulate a drained buffer: the reader has caught up to the writer.
+            unsafe {
+                ledger.state().unsafe_set_writer_next_record_id(42);
+            }
+            unsafe {
+                ledger.state().unsafe_set_reader_last_record_id(42);
+            }
+            let total = ledger.get_total_records();
+            assert_eq!(
+                total, 0,
+                "get_total_records 0-1 underflow: a drained buffer (next==last) \
+                 must report 0 records, but yields {total} (~2^64); \
+                 synchronize_buffer_usage then reports ~1.8e19 buffer events on restart"
+            );
+        }
+    })
+    .await;
+}
+
+#[tokio::test]
+async fn writer_drop_without_flush_should_not_lose_buffered_events_issue_24948() {
+    // Failing demonstration of Vector #24948 (config-reload silent data loss).
+    //
+    // CORRECT INVARIANT (asserted here): events accepted by the writer must
+    // survive a writer teardown and be recoverable when the buffer is reopened.
+    // Records sit in the writer's in-memory TrackingBufWriter (256KB) until
+    // flush(); `BufferWriter::Drop` calls close() (mark_writer_done + notify) but
+    // NOT flush(). During a config reload the old writer is dropped while events
+    // are still buffered, so they never reach the data file and the ledger's
+    // writer_next_record is never advanced. This test FAILS (reopened buffer has
+    // 0 records, not 3) — that loss is the bug.
+    with_temp_dir(|dir| {
+        let data_dir = dir.to_path_buf();
+        async move {
+            {
+                let (mut writer, _reader, _ledger) =
+                    create_default_buffer_v2::<_, SizedRecord>(data_dir.clone()).await;
+                for _ in 0..3u8 {
+                    writer
+                        .write_record(SizedRecord::new(64))
+                        .await
+                        .expect("write should not fail");
+                }
+                // No flush(): Drop simulates config-reload teardown (close, not flush).
+                drop(writer);
+            }
+            // Let the finalizer task observe the dropped reader and release the lock.
+            tokio::time::sleep(std::time::Duration::from_millis(500)).await;
+
+            // Reopen the same buffer directory.
+            let (_writer2, _reader2, ledger2) =
+                create_default_buffer_v2::<_, SizedRecord>(data_dir).await;
+
+            // #24948: the 3 events written before the un-flushed Drop must
+            // survive. They don't — the reopened buffer is empty (0 records), so
+            // this assertion FAILS, demonstrating the silent data loss.
+            assert_buffer_records!(ledger2, 3);
+        }
+    })
+    .await;
+}
+
+#[tokio::test]
+async fn file_id_rollover_compare_should_be_wrap_aware_reader_932() {
+    // Failing demonstration: seek_to_next_record decides the reader is
+    // synchronized with the writer using a raw `reader_file_id > writer_file_id`
+    // (reader.rs ~932), which is NOT wrap-aware.
+    //
+    // CORRECT INVARIANT (asserted here): after a file-ID rollover where the writer
+    // has done MORE rotations than the reader, the comparison must NOT report the
+    // reader as having advanced past the writer. The writer wraps to a value
+    // SMALLER than the (behind) reader's ID, so the raw `>` wrongly concludes the
+    // reader is ahead and breaks the seek early. This test FAILS (raw compare says
+    // reader > writer) until the comparison becomes wrap-aware.
+    with_temp_dir(|dir| {
+        let data_dir = dir.to_path_buf();
+        async move {
+            let (_w, _r, ledger) = create_default_buffer_v2::<_, SizedRecord>(data_dir).await;
+            let n = u32::from(MAX_FILE_ID); // 6 in test builds; IDs cycle 0..n-1
+            // Writer does n+1 rotations -> wraps past the end back to file ID 1.
+            for _ in 0..(n + 1) {
+                ledger.state().increment_writer_file_id();
+            }
+            // Reader does n-1 rotations (unacked) -> file ID n-1 (still pre-wrap).
+            for _ in 0..(n - 1) {
+                ledger.increment_unacked_reader_file_id();
+            }
+            let (reader_file_id, writer_file_id) = ledger.get_current_reader_writer_file_id();
+            assert_eq!(
+                (reader_file_id, writer_file_id),
+                (MAX_FILE_ID - 1, 1),
+                "constructed post-rollover state: writer lapped to ID 1, reader behind at ID {}",
+                MAX_FILE_ID - 1
+            );
+            // The writer performed MORE rotations (n+1) than the reader (n-1), so it
+            // is genuinely AHEAD. A wrap-aware comparison must therefore NOT report
+            // the reader as past the writer:
+            assert!(
+                !(reader_file_id > writer_file_id),
+                "reader.rs:932 must use a wrap-aware comparison: after rollover the \
+                 writer lapped to ID {writer_file_id} and is AHEAD of the reader at \
+                 ID {reader_file_id}, so the reader is NOT past the writer. The raw \
+                 `reader_file_id > writer_file_id` ({reader_file_id} > {writer_file_id}) \
+                 is true, so seek_to_next_record wrongly concludes the reader advanced \
+                 past the writer and stops early."
+            );
+        }
+    })
+    .await;
+}
+
+#[tokio::test]
+async fn delete_completed_data_file_size_delta_should_saturate_reader_524() {
+    // Failing demonstration of the reader.rs:524 unguarded subtraction.
+    //
+    // `delete_completed_data_file` computes `size_delta = metadata.len() - bytes_read`
+    // (reader.rs ~524) and then `decrement_total_buffer_size(size_delta)`.
+    //
+    // CORRECT INVARIANT (asserted here): when a data file is truncated externally
+    // (crash / filesystem fault) so `bytes_read > metadata.len()`, the size-delta
+    // must saturate at 0 rather than underflow. The raw subtraction underflows: in
+    // release it wraps to ~2^64, which is fed straight into the unsaturated
+    // `decrement_total_buffer_size` -> the #21683 total_buffer_size wrap ->
+    // permanent writer deadlock; in a debug build the subtraction panics. Either
+    // way this test FAILS, demonstrating the bug.
+    //
+    // Here we reproduce the exact computation on a real file: write 100 bytes,
+    // record bytes_read=100, truncate the file to 40, then `metadata.len() - bytes_read`.
+    with_temp_dir(|dir| {
+        let data_dir = dir.to_path_buf();
+        async move {
+            let path = data_dir.join("buffer-data-0.dat");
+            {
+                let mut f = tokio::fs::OpenOptions::new()
+                    .create(true)
+                    .write(true)
+                    .open(&path)
+                    .await
+                    .expect("create should not fail");
+                f.write_all(&[0u8; 100])
+                    .await
+                    .expect("write should not fail");
+                f.flush().await.expect("flush should not fail");
+            }
+            // The reader accounted 100 bytes read from this data file.
+            let bytes_read: u64 = 100;
+            // External truncation (crash / FS fault) shrinks the file below bytes_read.
+            tokio::fs::OpenOptions::new()
+                .write(true)
+                .open(&path)
+                .await
+                .expect("open should not fail")
+                .set_len(40)
+                .await
+                .expect("truncate should not fail");
+
+            // Exactly what reader.rs:524 does: `metadata.len() - bytes_read`.
+            let metadata_len = tokio::fs::metadata(&path)
+                .await
+                .expect("metadata should not fail")
+                .len();
+            assert_eq!(metadata_len, 40);
+            // In a debug build this raw subtraction panics (the bug manifesting);
+            // in release it wraps to ~2^64.
+            let size_delta = metadata_len - bytes_read;
+
+            assert_eq!(
+                size_delta, 0,
+                "reader.rs:524: metadata.len()({metadata_len}) - bytes_read({bytes_read}) \
+                 on a truncated file must saturate at 0, but underflowed to {size_delta} \
+                 (~2^64); this is fed into the unsaturated decrement_total_buffer_size -> \
+                 the #21683 wrap -> permanent writer deadlock"
+            );
+        }
+    })
+    .await;
+}