Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 64 additions & 0 deletions lib/vector-buffers/src/buffer_usage_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -505,4 +505,68 @@ mod tests {
assert_eq!(current.event_count, 10);
assert_eq!(current.event_byte_size, 1000);
}

/// Demonstration of Vector issue #24606.
///
/// When a disk buffer drops events because of `when_full = drop_newest`, the
/// buffer-level `buffer_discarded_events_total` counter is incremented, but
/// the component-level `component_discarded_events_total` (the metric
/// operators monitor for data loss) is NEVER emitted by the buffer drop
/// path — so the loss is silent on standard dashboards.
///
/// This exercises the exact reporter code path that runs for real
/// drop_newest drops: `BufferUsageData::report` -> `emit(BufferEventsDropped
/// { intentional: true, reason: "drop_newest", .. })`. A local metrics
/// recorder captures everything emitted.
///
/// CORRECT INVARIANT (asserted here): dropped events must also be counted at
/// the component level so operators see the loss. This test currently FAILS
/// against Vector because #24606 is unfixed — `component_discarded_events_total`
/// stays at 0 for disk-buffer drop_newest drops. The failure IS the bug
/// demonstration; it will pass once #24606 is fixed.
#[test]
fn drop_newest_drops_should_increment_component_discarded_metric_issue_24606() {
use metrics_util::debugging::{DebugValue, DebuggingRecorder};

let recorder = DebuggingRecorder::new();
let snapshotter = recorder.snapshotter();

metrics::with_local_recorder(&recorder, || {
let data = BufferUsageData::new(0);
let mut metrics = ReporterCurrentMetrics::default();
// 5 events dropped by `when_full = drop_newest` (intentional drops),
// exactly as `BufferSender` records them on the disk-buffer drop path.
data.dropped_intentional.increment(5, 500);
data.report(&mut metrics, "demo_disk_buffer");
});

let mut buffer_discarded = 0u64;
let mut component_discarded = 0u64;
for (ckey, _unit, _desc, value) in snapshotter.snapshot().into_vec() {
let name = ckey.key().name().to_string();
if let DebugValue::Counter(c) = value {
if name == "buffer_discarded_events_total" {
buffer_discarded += c;
} else if name == "component_discarded_events_total" {
component_discarded += c;
}
}
}

// The buffer accounted for the drops at the buffer level:
assert_eq!(
buffer_discarded, 5,
"buffer_discarded_events_total should reflect the 5 drop_newest drops"
);
// CORRECT INVARIANT: the component-level counter operators watch for
// data loss MUST also reflect the 5 drops. This currently FAILS (actual
// 0) — that failure is the #24606 demonstration: disk-buffer drop_newest
// drops are invisible on standard dashboards.
assert_eq!(
component_discarded, 5,
"#24606: component_discarded_events_total must reflect drop_newest \
drops (got {component_discarded}; the buffer never emits the \
component-level metric, so the loss is silent on dashboards)"
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,3 +83,53 @@ async fn ack_wakes_reader() {
})
.await;
}

#[tokio::test]
async fn rejected_delivery_should_not_advance_acks_finalizer_status_discard() {
// Failing demonstration: `spawn_finalizer` does
// `while let Some((_status, amount)) = stream.next().await { increment_pending_acks(amount) }`
// — the BatchStatus is discarded.
//
// CORRECT INVARIANT (asserted here): a REJECTED delivery (the sink gave up /
// could not deliver) must NOT advance the buffer's pending acks — the events
// were not acknowledged and must be retained. Because the finalizer ignores
// BatchStatus, a rejection still advances acks, so the buffer forgets the
// events as if delivered. This test FAILS (acks advance by 7, not 0),
// demonstrating the silent within-process data loss.
with_temp_dir(|dir| {
let data_dir = dir.to_path_buf();

async move {
let usage_handle = BufferUsageHandle::noop();
let config = DiskBufferConfigBuilder::from_path(data_dir)
.build()
.expect("creating buffer should not fail");
let ledger = Ledger::load_or_create(config, usage_handle)
.await
.expect("ledger should not fail to load/create");
let ledger = Arc::new(ledger);
let finalizer = Arc::clone(&ledger).spawn_finalizer();
assert_eq!(ledger.consume_pending_acks(), 0);

// A batch of 7 events whose delivery is REJECTED (not Delivered).
let (batch, receiver) = BatchNotifier::new_with_receiver();
finalizer.add(7, receiver);
let efin = EventFinalizer::new(batch);
efin.update_status(EventStatus::Rejected);
drop(efin); // sends the Rejected status update
tokio::task::yield_now().await;

// Finalizer BatchStatus discard: a rejected (failed) delivery must
// leave pending acks at 0. It instead advances by the full count, so
// the buffer forgets the events as if delivered — this assertion FAILS.
assert_eq!(
ledger.consume_pending_acks(),
0,
"rejected delivery must not advance acks, but advanced by 7 — the \
finalizer ignores BatchStatus, so failed deliveries are silently \
treated as acknowledged"
);
}
})
.await;
}
233 changes: 233 additions & 0 deletions lib/vector-buffers/src/variants/disk_v2/tests/invariants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -927,3 +927,236 @@ async fn reader_writer_positions_aligned_through_multiple_files_and_records() {
let parent = trace_span!("reader_writer_positions_aligned_through_multiple_files_and_records");
fut.instrument(parent.or_current()).await;
}

#[tokio::test]
async fn ledger_total_buffer_size_decrement_should_saturate_not_underflow_issue_21683() {
// Failing demonstration of Vector #21683.
//
// CORRECT INVARIANT (asserted here): decrementing `total_buffer_size` by more
// than its current value must saturate at 0. `Ledger::decrement_total_buffer_size`
// instead uses an unsaturated `fetch_sub`, so the in-memory atomic wraps
// toward 2^64 — after which `is_buffer_full()` returns true forever and the
// writer deadlocks permanently. PR #23561 only fixed the metrics reporter,
// not this control-path atomic.
//
// This test FAILS against current Vector (the failure IS the bug): in release
// the atomic wraps so the saturation assert fails; in a debug build the
// `prev - amount` in decrement_total_buffer_size's own `trace!` panics on the
// same underflow. It will pass once the decrement saturates.
with_temp_dir(|dir| {
let data_dir = dir.to_path_buf();
async move {
let (_writer, _reader, ledger) =
create_default_buffer_v2::<_, SizedRecord>(data_dir).await;

ledger.increment_total_buffer_size(10);
assert_eq!(ledger.get_total_buffer_size(), 10);

// Decrement by MORE than the current size. Correct behavior is to
// saturate at 0.
ledger.decrement_total_buffer_size(11);

let after = ledger.get_total_buffer_size();
assert_eq!(
after, 0,
"#21683: total_buffer_size decremented below zero must saturate at \
0, but was {after} (~2^64 wrap) — is_buffer_full() then never \
returns false -> permanent writer deadlock"
);
}
})
.await;
}

#[tokio::test]
async fn get_total_records_should_be_zero_on_drained_buffer_issue_21683_metrics() {
// Failing demonstration (sibling of #21683): `Ledger::get_total_records`
// computes `next_writer_id.wrapping_sub(last_reader_id) - 1`.
//
// CORRECT INVARIANT (asserted here): a fully drained buffer (writer and reader
// at the same record ID) holds 0 records. The trailing `- 1` underflows
// instead: in release it wraps to ~u64::MAX (which `synchronize_buffer_usage`
// then feeds into the buffer event-count metric on the next restart, reporting
// ~1.8e19 events for an empty buffer); in a debug build it panics on the same
// subtraction. Either way this test FAILS until the count saturates.
with_temp_dir(|dir| {
let data_dir = dir.to_path_buf();
async move {
let (_w, _r, ledger) = create_default_buffer_v2::<_, SizedRecord>(data_dir).await;
// Simulate a drained buffer: the reader has caught up to the writer.
unsafe {
ledger.state().unsafe_set_writer_next_record_id(42);
}
unsafe {
ledger.state().unsafe_set_reader_last_record_id(42);
}
let total = ledger.get_total_records();
assert_eq!(
total, 0,
"get_total_records 0-1 underflow: a drained buffer (next==last) \
must report 0 records, but yields {total} (~2^64); \
synchronize_buffer_usage then reports ~1.8e19 buffer events on restart"
);
}
})
.await;
}

#[tokio::test]
async fn writer_drop_without_flush_should_not_lose_buffered_events_issue_24948() {
// Failing demonstration of Vector #24948 (config-reload silent data loss).
//
// CORRECT INVARIANT (asserted here): events accepted by the writer must
// survive a writer teardown and be recoverable when the buffer is reopened.
// Records sit in the writer's in-memory TrackingBufWriter (256KB) until
// flush(); `BufferWriter::Drop` calls close() (mark_writer_done + notify) but
// NOT flush(). During a config reload the old writer is dropped while events
// are still buffered, so they never reach the data file and the ledger's
// writer_next_record is never advanced. This test FAILS (reopened buffer has
// 0 records, not 3) — that loss is the bug.
with_temp_dir(|dir| {
let data_dir = dir.to_path_buf();
async move {
{
let (mut writer, _reader, _ledger) =
create_default_buffer_v2::<_, SizedRecord>(data_dir.clone()).await;
for _ in 0..3u8 {
writer
.write_record(SizedRecord::new(64))
.await
.expect("write should not fail");
}
// No flush(): Drop simulates config-reload teardown (close, not flush).
drop(writer);
}
// Let the finalizer task observe the dropped reader and release the lock.
tokio::time::sleep(std::time::Duration::from_millis(500)).await;

// Reopen the same buffer directory.
let (_writer2, _reader2, ledger2) =
create_default_buffer_v2::<_, SizedRecord>(data_dir).await;

// #24948: the 3 events written before the un-flushed Drop must
// survive. They don't — the reopened buffer is empty (0 records), so
// this assertion FAILS, demonstrating the silent data loss.
assert_buffer_records!(ledger2, 3);
}
})
.await;
}

#[tokio::test]
async fn file_id_rollover_compare_should_be_wrap_aware_reader_932() {
// Failing demonstration: seek_to_next_record decides the reader is
// synchronized with the writer using a raw `reader_file_id > writer_file_id`
// (reader.rs ~932), which is NOT wrap-aware.
//
// CORRECT INVARIANT (asserted here): after a file-ID rollover where the writer
// has done MORE rotations than the reader, the comparison must NOT report the
// reader as having advanced past the writer. The writer wraps to a value
// SMALLER than the (behind) reader's ID, so the raw `>` wrongly concludes the
// reader is ahead and breaks the seek early. This test FAILS (raw compare says
// reader > writer) until the comparison becomes wrap-aware.
with_temp_dir(|dir| {
let data_dir = dir.to_path_buf();
async move {
let (_w, _r, ledger) = create_default_buffer_v2::<_, SizedRecord>(data_dir).await;
let n = u32::from(MAX_FILE_ID); // 6 in test builds; IDs cycle 0..n-1
// Writer does n+1 rotations -> wraps past the end back to file ID 1.
for _ in 0..(n + 1) {
ledger.state().increment_writer_file_id();
}
// Reader does n-1 rotations (unacked) -> file ID n-1 (still pre-wrap).
for _ in 0..(n - 1) {
ledger.increment_unacked_reader_file_id();
}
let (reader_file_id, writer_file_id) = ledger.get_current_reader_writer_file_id();
assert_eq!(
(reader_file_id, writer_file_id),
(MAX_FILE_ID - 1, 1),
"constructed post-rollover state: writer lapped to ID 1, reader behind at ID {}",
MAX_FILE_ID - 1
);
// The writer performed MORE rotations (n+1) than the reader (n-1), so it
// is genuinely AHEAD. A wrap-aware comparison must therefore NOT report
// the reader as past the writer:
assert!(
!(reader_file_id > writer_file_id),
"reader.rs:932 must use a wrap-aware comparison: after rollover the \
writer lapped to ID {writer_file_id} and is AHEAD of the reader at \
ID {reader_file_id}, so the reader is NOT past the writer. The raw \
`reader_file_id > writer_file_id` ({reader_file_id} > {writer_file_id}) \
is true, so seek_to_next_record wrongly concludes the reader advanced \
past the writer and stops early."
);
}
})
.await;
}

#[tokio::test]
async fn delete_completed_data_file_size_delta_should_saturate_reader_524() {
// Failing demonstration of the reader.rs:524 unguarded subtraction.
//
// `delete_completed_data_file` computes `size_delta = metadata.len() - bytes_read`
// (reader.rs ~524) and then `decrement_total_buffer_size(size_delta)`.
//
// CORRECT INVARIANT (asserted here): when a data file is truncated externally
// (crash / filesystem fault) so `bytes_read > metadata.len()`, the size-delta
// must saturate at 0 rather than underflow. The raw subtraction underflows: in
// release it wraps to ~2^64, which is fed straight into the unsaturated
// `decrement_total_buffer_size` -> the #21683 total_buffer_size wrap ->
// permanent writer deadlock; in a debug build the subtraction panics. Either
// way this test FAILS, demonstrating the bug.
//
// Here we reproduce the exact computation on a real file: write 100 bytes,
// record bytes_read=100, truncate the file to 40, then `metadata.len() - bytes_read`.
with_temp_dir(|dir| {
let data_dir = dir.to_path_buf();
async move {
let path = data_dir.join("buffer-data-0.dat");
{
let mut f = tokio::fs::OpenOptions::new()
.create(true)
.write(true)
.open(&path)
.await
.expect("create should not fail");
f.write_all(&[0u8; 100])
.await
.expect("write should not fail");
f.flush().await.expect("flush should not fail");
}
// The reader accounted 100 bytes read from this data file.
let bytes_read: u64 = 100;
// External truncation (crash / FS fault) shrinks the file below bytes_read.
tokio::fs::OpenOptions::new()
.write(true)
.open(&path)
.await
.expect("open should not fail")
.set_len(40)
.await
.expect("truncate should not fail");

// Exactly what reader.rs:524 does: `metadata.len() - bytes_read`.
let metadata_len = tokio::fs::metadata(&path)
.await
.expect("metadata should not fail")
.len();
assert_eq!(metadata_len, 40);
// In a debug build this raw subtraction panics (the bug manifesting);
// in release it wraps to ~2^64.
let size_delta = metadata_len - bytes_read;

assert_eq!(
size_delta, 0,
"reader.rs:524: metadata.len()({metadata_len}) - bytes_read({bytes_read}) \
on a truncated file must saturate at 0, but underflowed to {size_delta} \
(~2^64); this is fed into the unsaturated decrement_total_buffer_size -> \
the #21683 wrap -> permanent writer deadlock"
);
}
})
.await;
}
Loading