From a48295cf247845d3c5d457f8be42560147c8a2ee Mon Sep 17 00:00:00 2001 From: "Brian L. Troutwine" Date: Fri, 29 May 2026 17:18:35 +0000 Subject: [PATCH] chore(tests): disk_v2 antithesis test harness + exercisers Self-driving exercisers (disk_v2_antithesis: #21683 accounting cluster; disk_v2_lossfinder: 7-scenario data-loss fault menu with a no-silent-loss oracle) plus the full Antithesis harness: Dockerfiles, docker-compose configs (block / drop variants / direct / lossfinder), the Vector-SUT workload, and the test templates under tests/antithesis/test/v1. Registers the examples + serde_json dev-dep. --- lib/vector-buffers/Cargo.toml | 11 +- .../examples/disk_v2_antithesis.rs | 361 +++++++++ .../examples/disk_v2_lossfinder.rs | 734 ++++++++++++++++++ tests/antithesis/AGENTS.md | 56 ++ tests/antithesis/Dockerfile | 97 +++ tests/antithesis/Dockerfile.direct | 66 ++ tests/antithesis/Dockerfile.lossfinder | 64 ++ .../config-direct/docker-compose.yaml | 39 + .../config-drop/docker-compose.yaml | 49 ++ .../config-dropfail/docker-compose.yaml | 52 ++ .../config-dropfull/docker-compose.yaml | 50 ++ .../config-lossfinder/docker-compose.yaml | 42 + tests/antithesis/config/docker-compose.yaml | 48 ++ tests/antithesis/config/vector.yaml | 52 ++ tests/antithesis/setup-complete.sh | 22 + .../eventually_durability_and_progress.sh | 6 + .../v1/diskbuf/parallel_driver_produce.sh | 5 + .../v1/diskbuf_direct/eventually_progress.sh | 18 + .../v1/diskbuf_direct/first_wait_ready.sh | 18 + .../parallel_driver_safety_monitor.sh | 29 + .../diskbuf_loss/eventually_no_silent_loss.sh | 24 + .../test/v1/diskbuf_loss/first_wait_ready.sh | 18 + .../parallel_driver_loss_monitor.sh | 24 + tests/antithesis/workload/.gitignore | 2 + tests/antithesis/workload/Cargo.toml | 29 + tests/antithesis/workload/src/main.rs | 439 +++++++++++ 26 files changed, 2353 insertions(+), 2 deletions(-) create mode 100644 lib/vector-buffers/examples/disk_v2_antithesis.rs create mode 100644 
lib/vector-buffers/examples/disk_v2_lossfinder.rs create mode 100644 tests/antithesis/AGENTS.md create mode 100644 tests/antithesis/Dockerfile create mode 100644 tests/antithesis/Dockerfile.direct create mode 100644 tests/antithesis/Dockerfile.lossfinder create mode 100644 tests/antithesis/config-direct/docker-compose.yaml create mode 100644 tests/antithesis/config-drop/docker-compose.yaml create mode 100644 tests/antithesis/config-dropfail/docker-compose.yaml create mode 100644 tests/antithesis/config-dropfull/docker-compose.yaml create mode 100644 tests/antithesis/config-lossfinder/docker-compose.yaml create mode 100644 tests/antithesis/config/docker-compose.yaml create mode 100644 tests/antithesis/config/vector.yaml create mode 100755 tests/antithesis/setup-complete.sh create mode 100755 tests/antithesis/test/v1/diskbuf/eventually_durability_and_progress.sh create mode 100755 tests/antithesis/test/v1/diskbuf/parallel_driver_produce.sh create mode 100755 tests/antithesis/test/v1/diskbuf_direct/eventually_progress.sh create mode 100755 tests/antithesis/test/v1/diskbuf_direct/first_wait_ready.sh create mode 100755 tests/antithesis/test/v1/diskbuf_direct/parallel_driver_safety_monitor.sh create mode 100755 tests/antithesis/test/v1/diskbuf_loss/eventually_no_silent_loss.sh create mode 100755 tests/antithesis/test/v1/diskbuf_loss/first_wait_ready.sh create mode 100755 tests/antithesis/test/v1/diskbuf_loss/parallel_driver_loss_monitor.sh create mode 100644 tests/antithesis/workload/.gitignore create mode 100644 tests/antithesis/workload/Cargo.toml create mode 100644 tests/antithesis/workload/src/main.rs diff --git a/lib/vector-buffers/Cargo.toml b/lib/vector-buffers/Cargo.toml index 7770d9b8eb301..bcf2fc62be32e 100644 --- a/lib/vector-buffers/Cargo.toml +++ b/lib/vector-buffers/Cargo.toml @@ -34,8 +34,8 @@ vector-config = { path = "../vector-config", default-features = false } vector-common = { path = "../vector-common", default-features = false, features = 
["byte_size_of"] } dashmap.workspace = true ordered-float.workspace = true -# Antithesis instrumentation (SUT-side): assert_* macros for the underflow -# guards below + the LLVM coverage runtime shim. No-ops outside Antithesis. +# Antithesis instrumentation (SUT-side): assert_* macros for property +# demonstration + the LLVM coverage runtime shim. No-ops outside Antithesis. antithesis_sdk = "0.2.8" antithesis-instrumentation = "0.1" @@ -49,6 +49,7 @@ metrics-util = { workspace = true, features = ["debugging"] } proptest.workspace = true quickcheck.workspace = true rand.workspace = true +serde_json.workspace = true serde_yaml.workspace = true temp-dir = "0.1.16" tokio-test.workspace = true @@ -61,3 +62,9 @@ harness = false [[example]] name = "buffer_perf" + +[[example]] +name = "disk_v2_antithesis" + +[[example]] +name = "disk_v2_lossfinder" diff --git a/lib/vector-buffers/examples/disk_v2_antithesis.rs b/lib/vector-buffers/examples/disk_v2_antithesis.rs new file mode 100644 index 0000000000000..a4a56ad984d5f --- /dev/null +++ b/lib/vector-buffers/examples/disk_v2_antithesis.rs @@ -0,0 +1,361 @@ +//! Self-driving Antithesis exerciser for the disk buffer v2. +//! +//! This binary IS the system under test: because a disk_v2 buffer takes an +//! advisory lock (one process per buffer directory), the workload that drives +//! the buffer must live in the same process that owns it. It opens a real +//! disk_v2 buffer through the public topology API (exactly as Vector's sink +//! layer does) and then runs randomized writer/reader activity forever, using +//! the Antithesis SDK's randomness so Antithesis can branch the search. +//! +//! The dangerous *internal* invariants (the `total_buffer_size` / +//! `get_total_records` / data-file size-delta underflows behind Vector #21683, +//! and the file-id rollover) are checked by surgical `assert_always!` calls +//! placed SUT-side inside `vector-buffers` itself — those fire no matter how +//! the buffer reaches the bad state. 
The assertions here are the workload-level +//! safety/liveness oracle: never deliver more than produced (at-most-once +//! sanity), the drained-buffer boundary is actually reached (reachability of +//! the #21683 precondition), and flushed records do get delivered (progress). +//! +//! All `antithesis_sdk` calls are no-ops outside the Antithesis environment, so +//! this binary also runs fine locally for smoke testing. + +use std::{ + error, fmt, + path::PathBuf, + sync::{ + Arc, + atomic::{AtomicBool, AtomicU64, Ordering}, + }, + time::Duration, +}; + +use antithesis_sdk::{assert_always, assert_reachable, assert_sometimes, lifecycle, random}; +use bytes::{Buf, BufMut}; +use tokio::{task, time}; +use tracing::{Span, info}; +use tracing_subscriber::EnvFilter; +use vector_buffers::{ + BufferType, EventCount, WhenFull, + encoding::FixedEncodable, + topology::{ + builder::TopologyBuilder, + channel::{BufferReceiver, BufferSender}, + }, +}; +use vector_common::{ + byte_size_of::ByteSizeOf, + finalization::{ + AddBatchNotifier, BatchNotifier, EventFinalizer, EventFinalizers, EventStatus, Finalizable, + }, +}; + +/// A uniquely-id'd, variable-size record. Identical in spirit to the one used by +/// `buffer_perf`, so it round-trips through the disk_v2 encoder/decoder unchanged. 
+#[derive(Clone, Debug)] +struct VariableMessage { + id: u64, + payload: Vec, + finalizers: EventFinalizers, +} + +impl VariableMessage { + fn new(id: u64, payload: Vec) -> Self { + VariableMessage { + id, + payload, + finalizers: EventFinalizers::default(), + } + } +} + +impl AddBatchNotifier for VariableMessage { + fn add_batch_notifier(&mut self, batch: BatchNotifier) { + self.finalizers.add(EventFinalizer::new(batch)); + } +} + +impl ByteSizeOf for VariableMessage { + fn allocated_bytes(&self) -> usize { + self.payload.len() + } +} + +impl EventCount for VariableMessage { + fn event_count(&self) -> usize { + 1 + } +} + +impl Finalizable for VariableMessage { + fn take_finalizers(&mut self) -> EventFinalizers { + std::mem::take(&mut self.finalizers) + } +} + +impl FixedEncodable for VariableMessage { + type EncodeError = EncodeError; + type DecodeError = DecodeError; + + fn encode(self, buffer: &mut B) -> Result<(), Self::EncodeError> + where + B: BufMut, + Self: Sized, + { + buffer.put_u64(self.id); + buffer.put_u64(self.payload.len() as u64); + buffer.put_slice(&self.payload); + Ok(()) + } + + fn encoded_size(&self) -> Option { + Some(8 + 8 + self.payload.len()) + } + + fn decode(mut buffer: B) -> Result + where + B: Buf, + Self: Sized, + { + let id = buffer.get_u64(); + let payload_len = buffer.get_u64() as usize; + let payload = buffer.copy_to_bytes(payload_len).to_vec(); + Ok(VariableMessage::new(id, payload)) + } +} + +#[derive(Debug)] +struct EncodeError; +impl fmt::Display for EncodeError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{self:?}") + } +} +impl error::Error for EncodeError {} + +#[derive(Debug)] +struct DecodeError; +impl fmt::Display for DecodeError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{self:?}") + } +} +impl error::Error for DecodeError {} + +/// Shared progress counters between the writer, reader, and status reporter. 
+#[derive(Default)] +struct Progress { + produced: AtomicU64, + /// Records that were `flush`ed by the writer (durably committed) — the set + /// the at-least-once oracle holds the buffer accountable for. + produced_flushed: AtomicU64, + delivered: AtomicU64, + rejected: AtomicU64, + /// Set once the reader has observed the buffer fully drained at least once + /// (reader id caught up to the writer) — the #21683 underflow precondition. + drained_seen: AtomicBool, +} + +/// Draw a `u64` in `[lo, hi)` from the Antithesis-controlled RNG. +#[inline] +fn rand_in(lo: u64, hi: u64) -> u64 { + if hi <= lo { + return lo; + } + lo + (random::get_random() % (hi - lo)) +} + +/// Build a disk_v2 buffer through the public topology API, the same path the +/// Vector sink layer uses. +async fn build_buffer( + data_dir: PathBuf, + max_size: u64, +) -> ( + BufferSender, + BufferReceiver, +) { + let mut builder = TopologyBuilder::default(); + let variant = BufferType::DiskV2 { + max_size: std::num::NonZeroU64::new(max_size).expect("max_size must be non-zero"), + when_full: WhenFull::Block, + }; + variant + .add_to_builder(&mut builder, Some(data_dir), "vdbuf-antithesis".to_string()) + .expect("adding disk_v2 variant to builder should not fail"); + builder + .build(String::from("vdbuf-antithesis"), Span::none()) + .await + .expect("building the disk_v2 buffer should not fail") +} + +#[tokio::main(flavor = "multi_thread", worker_threads = 4)] +async fn main() { + tracing_subscriber::fmt() + .with_env_filter(EnvFilter::from_default_env()) + .init(); + antithesis_sdk::antithesis_init(); + + let data_dir = + PathBuf::from(std::env::var("VDBUF_DIR").unwrap_or_else(|_| "/var/lib/vdbuf".to_string())); + let status_path = PathBuf::from( + std::env::var("VDBUF_STATUS").unwrap_or_else(|_| "/tmp/vdbuf-status".to_string()), + ); + // Keep the buffer small so fill/drain cycles are cheap and the reader + // repeatedly catches up to the writer — the get_total_records underflow + // boundary. 
256MB is the enforced minimum. + let max_size: u64 = std::env::var("VDBUF_MAX_SIZE") + .ok() + .and_then(|s| s.parse().ok()) + .unwrap_or(268_435_488); + // Cap the payload well under the per-record/data-file limits. + let max_payload: u64 = std::env::var("VDBUF_MAX_PAYLOAD") + .ok() + .and_then(|s| s.parse().ok()) + .unwrap_or(4096); + + std::fs::create_dir_all(&data_dir).expect("creating buffer dir should not fail"); + + info!( + ?data_dir, + max_size, max_payload, "[vdbuf] opening disk_v2 buffer" + ); + let (mut writer, mut reader) = build_buffer(data_dir, max_size).await; + + let progress = Arc::new(Progress::default()); + + // ---- writer task --------------------------------------------------- + let wp = Arc::clone(&progress); + let writer_task = task::spawn(async move { + let mut next_id: u64 = 1; + let mut iters: u64 = 0; + loop { + iters += 1; + // Produce a small randomly-sized batch, then flush only some of the + // time. Small batches keep the writer from sprinting far ahead of the + // reader, so the two ride close together and the reader frequently + // catches up to the writer head — the get_total_records / #21683 + // underflow boundary that Antithesis's thread-pausing then races. + let batch = rand_in(1, 16); + for _ in 0..batch { + let payload_len = rand_in(0, max_payload + 1) as usize; + let msg = VariableMessage::new(next_id, vec![0xab; payload_len]); + next_id += 1; + if writer.send(msg, None).await.is_ok() { + wp.produced.fetch_add(1, Ordering::Relaxed); + } + } + + // Flush ~75% of the time; the rest of the time let records linger + // (exercising the not-yet-durable path). + if rand_in(0, 4) != 0 && writer.flush().await.is_ok() { + // Everything produced so far is now durably committed. 
+ let produced = wp.produced.load(Ordering::Relaxed); + wp.produced_flushed.store(produced, Ordering::Relaxed); + } + + // Periodic "drain phase": idle long enough for the reader to fully + // catch up to the writer, parking the buffer right at the drained + // boundary where the accounting underflows manifest. + if iters % 16 == 0 { + let _ = writer.flush().await; + wp.produced_flushed + .store(wp.produced.load(Ordering::Relaxed), Ordering::Relaxed); + time::sleep(Duration::from_millis(rand_in(100, 400))).await; + } else if rand_in(0, 4) == 0 { + time::sleep(Duration::from_millis(rand_in(2, 20))).await; + } + } + }); + + // ---- reader task --------------------------------------------------- + let rp = Arc::clone(&progress); + let reader_task = task::spawn(async move { + let mut setup_done = false; + loop { + // Read aggressively so the reader stays close behind the writer. + // Only rarely pause (letting the buffer grow toward full). + if rand_in(0, 20) == 0 { + time::sleep(Duration::from_millis(rand_in(5, 80))).await; + } + + match reader.next().await { + Some(mut record) => { + let finalizers = record.take_finalizers(); + // Mostly acknowledge delivery; occasionally Reject to + // exercise the finalizer status path. Either way the read + // advances the reader, driving it toward the writer. + if rand_in(0, 16) == 0 { + finalizers.update_status(EventStatus::Rejected); + rp.rejected.fetch_add(1, Ordering::Relaxed); + } else { + finalizers.update_status(EventStatus::Delivered); + rp.delivered.fetch_add(1, Ordering::Relaxed); + } + drop(record); + + if !setup_done { + // First successful end-to-end round-trip through the + // disk buffer: the harness is live. 
+ assert_reachable!("record delivered end-to-end through disk_v2 buffer"); + lifecycle::setup_complete(&serde_json::json!({"stage": "first_delivery"})); + setup_done = true; + } + } + None => { + // Buffer reports end-of-stream: reader has caught up to the + // writer — the drained boundary that drives get_total_records + // toward its `0 - 1` underflow. + rp.drained_seen.store(true, Ordering::Relaxed); + assert_reachable!("disk_v2 buffer fully drained (reader caught up to writer)"); + time::sleep(Duration::from_millis(rand_in(1, 20))).await; + } + } + } + }); + + // ---- status + oracle reporter ------------------------------------- + let mut tick = time::interval(Duration::from_millis(500)); + loop { + tick.tick().await; + let produced = progress.produced.load(Ordering::Relaxed); + let flushed = progress.produced_flushed.load(Ordering::Relaxed); + let delivered = progress.delivered.load(Ordering::Relaxed); + let rejected = progress.rejected.load(Ordering::Relaxed); + let drained = progress.drained_seen.load(Ordering::Relaxed); + let handled = delivered + rejected; + + // Safety: the buffer can never hand the reader more records than were + // ever produced. A violation means duplicated/phantom records — exactly + // what a get_total_records / accounting underflow would manifest as. + assert_always!( + handled <= produced, + "disk_v2 never delivers more records than were produced" + ); + + // Liveness: once we have flushed records, the reader keeps making + // progress toward delivering them. + if flushed > 0 { + assert_sometimes!( + delivered > 0, + "flushed records are eventually delivered (writer/reader make progress)" + ); + } + + // Reachability of the dangerous precondition. 
+ if drained { + assert_reachable!("reached drained-buffer state at least once"); + } + + let _ = std::fs::write( + &status_path, + format!( + "produced={produced} flushed={flushed} delivered={delivered} \ + rejected={rejected} handled={handled} drained={drained}\n" + ), + ); + + if writer_task.is_finished() || reader_task.is_finished() { + info!("[vdbuf] a worker task ended; stopping"); + break; + } + } +} diff --git a/lib/vector-buffers/examples/disk_v2_lossfinder.rs b/lib/vector-buffers/examples/disk_v2_lossfinder.rs new file mode 100644 index 0000000000000..722647377be09 --- /dev/null +++ b/lib/vector-buffers/examples/disk_v2_lossfinder.rs @@ -0,0 +1,734 @@ +//! Self-driving Antithesis **data-loss finder** for the disk buffer v2. +//! +//! Where `disk_v2_antithesis` checks the internal accounting invariants +//! (#21683-class underflows), this binary is a phased scenario loop whose sole +//! job is to detect *silent data loss*: a record that the buffer accepted and +//! durably flushed (an at-least-once promise) but then never hands back to the +//! reader and never explicitly refuses. +//! +//! It is a single-process, single-task harness so that record ids are strictly +//! monotonic (1, 2, 3, ...) and we can run a precise oracle: +//! +//! * `outstanding` — ids produced AND flushed but not yet resolved. +//! * `allowed_loss` — ids whose loss is permitted (unflushed at crash/reopen). +//! +//! On each round we pick one of seven scenarios, run an ACTIVE phase that +//! injects the scenario's fault, a QUIESCE phase that drains the reader, and a +//! CHECK phase that asserts `outstanding - allowed_loss` is empty. A non-empty +//! leftover is silent data loss and (per owner ruling) a BUG, so the +//! `assert_always!` calls are *meant* to fire if the buffer loses data. +//! +//! All `antithesis_sdk` calls are no-ops outside the Antithesis environment, so +//! this binary also runs fine locally for smoke testing. 
+ +use std::{ + collections::BTreeSet, + error, fmt, + fs::OpenOptions, + io::{Seek, SeekFrom, Write as _}, + path::{Path, PathBuf}, + time::Duration, +}; + +use antithesis_sdk::{assert_always, assert_reachable, lifecycle, random}; +use bytes::{Buf, BufMut}; +use tokio::time; +use tracing::{Span, info, warn}; +use tracing_subscriber::EnvFilter; +use vector_buffers::{ + BufferType, EventCount, WhenFull, + encoding::FixedEncodable, + topology::{ + builder::TopologyBuilder, + channel::{BufferReceiver, BufferSender}, + }, +}; +use vector_common::{ + byte_size_of::ByteSizeOf, + finalization::{ + AddBatchNotifier, BatchNotifier, EventFinalizer, EventFinalizers, EventStatus, Finalizable, + }, +}; + +/// A uniquely-id'd, variable-size record. Identical in spirit to the one used by +/// `buffer_perf` / `disk_v2_antithesis`, so it round-trips through the disk_v2 +/// encoder/decoder unchanged. +#[derive(Clone, Debug)] +struct VariableMessage { + id: u64, + payload: Vec, + finalizers: EventFinalizers, +} + +impl VariableMessage { + fn new(id: u64, payload: Vec) -> Self { + VariableMessage { + id, + payload, + finalizers: EventFinalizers::default(), + } + } +} + +impl AddBatchNotifier for VariableMessage { + fn add_batch_notifier(&mut self, batch: BatchNotifier) { + self.finalizers.add(EventFinalizer::new(batch)); + } +} + +impl ByteSizeOf for VariableMessage { + fn allocated_bytes(&self) -> usize { + self.payload.len() + } +} + +impl EventCount for VariableMessage { + fn event_count(&self) -> usize { + 1 + } +} + +impl Finalizable for VariableMessage { + fn take_finalizers(&mut self) -> EventFinalizers { + std::mem::take(&mut self.finalizers) + } +} + +impl FixedEncodable for VariableMessage { + type EncodeError = EncodeError; + type DecodeError = DecodeError; + + fn encode(self, buffer: &mut B) -> Result<(), Self::EncodeError> + where + B: BufMut, + Self: Sized, + { + buffer.put_u64(self.id); + buffer.put_u64(self.payload.len() as u64); + buffer.put_slice(&self.payload); + 
Ok(()) + } + + fn encoded_size(&self) -> Option { + Some(8 + 8 + self.payload.len()) + } + + fn decode(mut buffer: B) -> Result + where + B: Buf, + Self: Sized, + { + let id = buffer.get_u64(); + let payload_len = buffer.get_u64() as usize; + let payload = buffer.copy_to_bytes(payload_len).to_vec(); + Ok(VariableMessage::new(id, payload)) + } +} + +#[derive(Debug)] +struct EncodeError; +impl fmt::Display for EncodeError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{self:?}") + } +} +impl error::Error for EncodeError {} + +#[derive(Debug)] +struct DecodeError; +impl fmt::Display for DecodeError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{self:?}") + } +} +impl error::Error for DecodeError {} + +/// Draw a `u64` in `[lo, hi)` from the Antithesis-controlled RNG. +#[inline] +fn rand_in(lo: u64, hi: u64) -> u64 { + if hi <= lo { + return lo; + } + lo + (random::get_random() % (hi - lo)) +} + +/// Build a disk_v2 buffer through the public topology API, the same path the +/// Vector sink layer uses. `when_full` and `max_size` are scenario-controlled. +async fn build_buffer( + data_dir: PathBuf, + max_size: u64, + when_full: WhenFull, +) -> ( + BufferSender, + BufferReceiver, +) { + let mut builder = TopologyBuilder::default(); + let variant = BufferType::DiskV2 { + max_size: std::num::NonZeroU64::new(max_size).expect("max_size must be non-zero"), + when_full, + }; + variant + .add_to_builder(&mut builder, Some(data_dir), "vdbuf-lossfinder".to_string()) + .expect("adding disk_v2 variant to builder should not fail"); + builder + .build(String::from("vdbuf-lossfinder"), Span::none()) + .await + .expect("building the disk_v2 buffer should not fail") +} + +/// Recursively find any existing `*.dat` data file under `dir`. 
+fn find_dat_files(dir: &Path, out: &mut Vec) { + let Ok(entries) = std::fs::read_dir(dir) else { + return; + }; + for entry in entries.flatten() { + let path = entry.path(); + if path.is_dir() { + find_dat_files(&path, out); + } else if path.extension().and_then(|e| e.to_str()) == Some("dat") { + out.push(path); + } + } +} + +/// Scenario tags. `scenario = get_random() % 7`. +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +enum Scenario { + Baseline, + WriterDropNoFlush, + RejectDeliveries, + CrashReopen, + DropNewestOverfill, + Corruption, + TruncateTail, +} + +impl Scenario { + fn from_u64(n: u64) -> Self { + match n % 7 { + 0 => Scenario::Baseline, + 1 => Scenario::WriterDropNoFlush, + 2 => Scenario::RejectDeliveries, + 3 => Scenario::CrashReopen, + 4 => Scenario::DropNewestOverfill, + 5 => Scenario::Corruption, + _ => Scenario::TruncateTail, + } + } +} + +/// Running oracle / counters shared across rounds. +#[derive(Default)] +struct Oracle { + /// ids produced AND flushed but not yet resolved (delivered/dropped). + outstanding: BTreeSet, + /// ids whose loss is permitted (unflushed at writer-drop / crash). + allowed_loss: BTreeSet, + produced: u64, + produced_flushed: u64, + delivered: u64, + rejected: u64, + /// ids the buffer refused at send-time (drop_newest accounting). + dropped_counted: u64, + silent_loss_detected: u64, + /// Largest id ever delivered, for the monotonic sanity assertion. + max_delivered_id: u64, +} + +/// State for a single buffer instance (sender/receiver + its config). +struct BufInstance { + writer: BufferSender, + reader: BufferReceiver, +} + +/// Read a handful of records and ack them. In `RejectDeliveries`, ack a fraction +/// as Rejected and intentionally KEEP them in `outstanding` — a rejected event +/// must be retained/redelivered, not silently freed. Returns whether a delivery +/// happened (for first-delivery lifecycle signalling). 
+async fn read_some( + buf: &mut BufInstance, + oracle: &mut Oracle, + count: u64, + reject_fraction: bool, +) -> bool { + let mut any_delivered = false; + for _ in 0..count { + match buf.reader.next().await { + Some(mut record) => { + let id = record.id; + let finalizers = record.take_finalizers(); + if reject_fraction && rand_in(0, 3) == 0 { + // Rejected: the buffer must NOT consider this resolved. + finalizers.update_status(EventStatus::Rejected); + oracle.rejected += 1; + // Deliberately do NOT remove `id` from `outstanding`. + } else { + finalizers.update_status(EventStatus::Delivered); + oracle.delivered += 1; + oracle.max_delivered_id = oracle.max_delivered_id.max(id); + oracle.outstanding.remove(&id); + any_delivered = true; + } + drop(record); + } + None => break, + } + } + any_delivered +} + +#[tokio::main(flavor = "multi_thread", worker_threads = 4)] +async fn main() { + tracing_subscriber::fmt() + .with_env_filter(EnvFilter::from_default_env()) + .init(); + antithesis_sdk::antithesis_init(); + + let base_dir = + PathBuf::from(std::env::var("VDBUF_DIR").unwrap_or_else(|_| "/var/lib/vdbuf".to_string())); + let status_path = PathBuf::from( + std::env::var("VDBUF_STATUS").unwrap_or_else(|_| "/tmp/vdbuf-status".to_string()), + ); + let max_size: u64 = std::env::var("VDBUF_MAX_SIZE") + .ok() + .and_then(|s| s.parse().ok()) + .unwrap_or(268_435_488); + let max_payload: u64 = std::env::var("VDBUF_MAX_PAYLOAD") + .ok() + .and_then(|s| s.parse().ok()) + .unwrap_or(4096); + + std::fs::create_dir_all(&base_dir).expect("creating base buffer dir should not fail"); + + let mut oracle = Oracle::default(); + let mut next_id: u64 = 1; + let mut setup_done = false; + let mut round: u64 = 0; + // Rotate the on-disk subdir periodically to avoid unbounded disk growth. 
+ let mut subdir_seq: u64 = 0; + + let mut data_dir = base_dir.join(format!("run-{subdir_seq}")); + std::fs::create_dir_all(&data_dir).expect("creating buffer subdir should not fail"); + info!( + ?data_dir, + max_size, max_payload, "[lossfinder] opening disk_v2 buffer" + ); + let (writer, reader) = build_buffer(data_dir.clone(), max_size, WhenFull::Block).await; + let mut buf = BufInstance { writer, reader }; + + loop { + round += 1; + let scenario = Scenario::from_u64(random::get_random()); + + // ----- scenario coverage markers ---------------------------------- + match scenario { + Scenario::Baseline => assert_reachable!("scenario: Baseline entered"), + Scenario::WriterDropNoFlush => { + assert_reachable!("scenario: WriterDropNoFlush entered") + } + Scenario::RejectDeliveries => { + assert_reachable!("scenario: RejectDeliveries entered") + } + Scenario::CrashReopen => assert_reachable!("scenario: CrashReopen entered"), + Scenario::DropNewestOverfill => { + assert_reachable!("scenario: DropNewestOverfill entered") + } + Scenario::Corruption => assert_reachable!("scenario: Corruption entered"), + Scenario::TruncateTail => assert_reachable!("scenario: TruncateTail entered"), + } + + // DropNewestOverfill needs its own buffer built with DropNewest and a + // smaller max_size; rebuild on a fresh subdir for this round only. + if scenario == Scenario::DropNewestOverfill { + subdir_seq += 1; + let dn_dir = base_dir.join(format!("run-{subdir_seq}")); + std::fs::create_dir_all(&dn_dir).expect("creating drop_newest subdir should not fail"); + drop(buf); + // Minimum enforced size is 256MB; use it as the "smaller" size. + let (w, r) = build_buffer(dn_dir.clone(), 268_435_488, WhenFull::DropNewest).await; + buf = BufInstance { + writer: w, + reader: r, + }; + data_dir = dn_dir; + // Reset the oracle's outstanding set: drop_newest is best-effort and + // we do not strict-track its ids (see SIMPLIFICATION below). 
+ oracle.outstanding.clear(); + oracle.allowed_loss.clear(); + } + + let active_rounds = rand_in(20, 80); + // Pick a random point in the active phase to inject the fault. + let fault_at = rand_in(0, active_rounds.max(1)); + // Track ids produced+flushed strictly after a corruption/truncation, to + // verify the fault doesn't cause collateral loss of *later* records. + let mut post_fault_ids: BTreeSet = BTreeSet::new(); + let mut fault_applied = false; + // For drop_newest we treat everything best-effort and skip strict oracle. + let drop_newest = scenario == Scenario::DropNewestOverfill; + + // ===================== ACTIVE PHASE =============================== + for active in 0..active_rounds { + // Produce a batch of monotonic-id records. + let batch = rand_in(1, 16); + let mut newly_produced: Vec = Vec::new(); + for _ in 0..batch { + let payload_len = rand_in(0, max_payload + 1) as usize; + let msg = VariableMessage::new(next_id, vec![0xab; payload_len]); + let id = next_id; + next_id += 1; + match buf.writer.send(msg, None).await { + Ok(()) => { + oracle.produced += 1; + newly_produced.push(id); + } + Err(e) => { + // Under fault injection a send may fail; expected. + warn!(error = %e, "[lossfinder] send failed (expected under fault)"); + } + } + } + + // WriterDropNoFlush special-case: when we hit the fault point, + // produce several records WITHOUT flushing, then drop+reopen the + // writer. The unflushed ids are allowed loss. 
+ if scenario == Scenario::WriterDropNoFlush && active == fault_at && !fault_applied { + fault_applied = true; + let mut unflushed: Vec = Vec::new(); + for _ in 0..rand_in(1, 12) { + let payload_len = rand_in(0, max_payload + 1) as usize; + let msg = VariableMessage::new(next_id, vec![0xab; payload_len]); + let id = next_id; + next_id += 1; + if buf.writer.send(msg, None).await.is_ok() { + oracle.produced += 1; + unflushed.push(id); + } + } + for id in &unflushed { + oracle.allowed_loss.insert(*id); + } + // Also any records produced this iteration but not yet flushed + // are at risk; conservatively allow their loss too. + for id in &newly_produced { + oracle.allowed_loss.insert(*id); + } + newly_produced.clear(); + info!("[lossfinder] WriterDropNoFlush: dropping+reopening writer"); + drop(buf); + let (w, r) = build_buffer(data_dir.clone(), max_size, WhenFull::Block).await; + buf = BufInstance { + writer: w, + reader: r, + }; + continue; + } + + // Flush ~75% of the time. On flush, the newly-produced ids become + // durable and the buffer is accountable for them. + let do_flush = rand_in(0, 4) != 0; + if do_flush { + match buf.writer.flush().await { + Ok(()) => { + if !drop_newest { + for id in &newly_produced { + oracle.outstanding.insert(*id); + // If a corruption/truncation already happened this + // round, these are post-fault and must survive. + if fault_applied + && matches!( + scenario, + Scenario::Corruption | Scenario::TruncateTail + ) + { + post_fault_ids.insert(*id); + } + } + } + oracle.produced_flushed += newly_produced.len() as u64; + } + Err(e) => { + warn!(error = %e, "[lossfinder] flush failed (expected under fault)"); + } + } + } + + // Read a few records and ack them. 
+ let to_read = rand_in(0, 5); + let reject = scenario == Scenario::RejectDeliveries; + let delivered_now = read_some(&mut buf, &mut oracle, to_read, reject).await; + if delivered_now && !setup_done { + assert_reachable!("first record delivered end-to-end through disk_v2 lossfinder"); + lifecycle::setup_complete(&serde_json::json!({"stage": "first_delivery"})); + setup_done = true; + } + + // ----- inject the remaining faults at the fault point ---------- + if active == fault_at && !fault_applied { + match scenario { + Scenario::CrashReopen => { + fault_applied = true; + // Anything produced this iter but not flushed is at risk. + if !do_flush { + for id in &newly_produced { + oracle.allowed_loss.insert(*id); + } + } + info!("[lossfinder] CrashReopen: dropping sender+receiver and rebuilding"); + drop(buf); + let (w, r) = + build_buffer(data_dir.clone(), max_size, WhenFull::Block).await; + buf = BufInstance { + writer: w, + reader: r, + }; + } + Scenario::Corruption => { + fault_applied = true; + // Make sure prior records are durable before corrupting. + let _ = buf.writer.flush().await; + let mut dats = Vec::new(); + find_dat_files(&data_dir, &mut dats); + if let Some(target) = dats.first() { + if let Err(e) = corrupt_file(target) { + warn!(error = %e, "[lossfinder] corruption write failed"); + } else { + info!(?target, "[lossfinder] Corruption: flipped one byte"); + } + } else { + warn!("[lossfinder] Corruption: no .dat file found yet"); + } + } + Scenario::TruncateTail => { + fault_applied = true; + let _ = buf.writer.flush().await; + let mut dats = Vec::new(); + find_dat_files(&data_dir, &mut dats); + if let Some(target) = dats.first() { + if let Err(e) = truncate_file(target) { + warn!(error = %e, "[lossfinder] truncate failed"); + } else { + info!(?target, "[lossfinder] TruncateTail: truncated tail"); + } + } else { + warn!("[lossfinder] TruncateTail: no .dat file found yet"); + } + } + _ => {} + } + } + + // Per-round sanity asserts. 
+ assert_always!( + oracle.delivered <= oracle.produced, + "lossfinder: never deliver more than produced" + ); + assert_always!( + oracle.max_delivered_id <= oracle.produced, + "lossfinder: every delivered id was previously produced" + ); + + write_status(&status_path, &oracle, scenario); + } + + // ===================== QUIESCE PHASE ============================== + // Stop producing, flush, and drain the reader. + let _ = buf.writer.flush().await; + let reject = scenario == Scenario::RejectDeliveries; + let mut empty_streak = 0u32; + let mut iters = 0u32; + while empty_streak < 5 && iters < 2000 { + iters += 1; + match time::timeout(Duration::from_millis(50), buf.reader.next()).await { + Ok(Some(mut record)) => { + empty_streak = 0; + let id = record.id; + let finalizers = record.take_finalizers(); + if reject && rand_in(0, 3) == 0 { + finalizers.update_status(EventStatus::Rejected); + oracle.rejected += 1; + } else { + finalizers.update_status(EventStatus::Delivered); + oracle.delivered += 1; + oracle.max_delivered_id = oracle.max_delivered_id.max(id); + oracle.outstanding.remove(&id); + } + drop(record); + } + Ok(None) => { + empty_streak += 1; + } + Err(_) => { + // Timed out waiting for next(): treat as a quiet tick. + empty_streak += 1; + } + } + } + + // ===================== CHECK PHASE ================================ + let leftover: BTreeSet = oracle + .outstanding + .difference(&oracle.allowed_loss) + .copied() + .collect(); + + let scenario_checks = match scenario { + // drop_newest is best-effort; we do not run a strict oracle here. + Scenario::DropNewestOverfill => false, + _ => true, + }; + + if scenario_checks { + // `assert_always!` requires a static-literal message, so we branch + // per scenario rather than passing a computed string. 
+ let ok = leftover.is_empty(); + match scenario { + Scenario::Baseline => { + assert_always!(ok, "lossfinder Baseline: no silent data loss") + } + Scenario::WriterDropNoFlush => assert_always!( + ok, + "lossfinder WriterDropNoFlush: flushed records survive (no silent loss)" + ), + Scenario::RejectDeliveries => assert_always!( + ok, + "lossfinder RejectDeliveries: rejected records retained (no silent loss)" + ), + Scenario::CrashReopen => assert_always!( + ok, + "lossfinder CrashReopen: flushed records survive crash (no silent loss)" + ), + Scenario::Corruption => assert_always!( + ok, + "lossfinder Corruption: no collateral loss of later records (no silent loss)" + ), + Scenario::TruncateTail => assert_always!( + ok, + "lossfinder TruncateTail: no collateral loss of later records (no silent loss)" + ), + Scenario::DropNewestOverfill => unreachable!(), + } + if !ok { + oracle.silent_loss_detected += 1; + let ids: Vec = leftover.iter().take(64).copied().collect(); + warn!( + ?scenario, + leftover = leftover.len(), + ?ids, + "[lossfinder] SILENT DATA LOSS detected" + ); + + // For corruption/truncation, surface the collateral-loss subset + // explicitly (post-fault ids that vanished). + if matches!(scenario, Scenario::Corruption | Scenario::TruncateTail) { + let collateral: Vec = + leftover.intersection(&post_fault_ids).copied().collect(); + if !collateral.is_empty() { + warn!( + count = collateral.len(), + "[lossfinder] COLLATERAL loss of post-fault records" + ); + } + } + } + } + + // ----- reset for next scenario ------------------------------------ + // Clear allowed_loss; the outstanding set carries over (it should be + // empty after a clean check, and any leftover loss is permanent so we + // don't want to re-flag it forever — drop those ids now). + oracle.allowed_loss.clear(); + for id in &leftover { + oracle.outstanding.remove(id); + } + + // Periodically rotate to a fresh subdir to bound disk usage. 
Drain first + // so we don't strand outstanding ids on the abandoned directory. + if round.is_multiple_of(8) { + let _ = buf.writer.flush().await; + // Best-effort final drain of the old buffer. + let mut empties = 0u32; + while empties < 5 { + match time::timeout(Duration::from_millis(30), buf.reader.next()).await { + Ok(Some(mut record)) => { + empties = 0; + let id = record.id; + let f = record.take_finalizers(); + f.update_status(EventStatus::Delivered); + oracle.delivered += 1; + oracle.max_delivered_id = oracle.max_delivered_id.max(id); + oracle.outstanding.remove(&id); + } + _ => empties += 1, + } + } + subdir_seq += 1; + let fresh = base_dir.join(format!("run-{subdir_seq}")); + std::fs::create_dir_all(&fresh).expect("creating fresh subdir should not fail"); + drop(buf); + let (w, r) = build_buffer(fresh.clone(), max_size, WhenFull::Block).await; + buf = BufInstance { + writer: w, + reader: r, + }; + data_dir = fresh; + // Old directory is abandoned; clear any residual oracle state since + // those ids can never be delivered from the new (empty) buffer. + oracle.outstanding.clear(); + } + + write_status(&status_path, &oracle, scenario); + } +} + +/// Invert all bits of one byte inside a `.dat` file, leaving its length unchanged. +/// +/// Files shorter than 4 bytes are left untouched and still reported as `Ok(())`. +/// Any I/O error from opening, seeking, reading, or writing is returned to the caller. +fn corrupt_file(path: &Path) -> std::io::Result<()> { + use std::io::Read as _; + + let mut file = OpenOptions::new().read(true).write(true).open(path)?; + let size = file.metadata()?.len(); + if size < 4 { + return Ok(()); + } + // Aim for the middle third of the file to avoid headers/footers where possible; + // fall back to the midpoint if that third is empty. + let (lower, upper) = (size / 3, (2 * size) / 3); + let offset = if upper > lower { rand_in(lower, upper) } else { size / 2 }; + let mut cell = [0u8; 1]; + file.seek(SeekFrom::Start(offset))?; + file.read_exact(&mut cell)?; + cell[0] = !cell[0]; + // The read advanced the cursor by one byte; seek back so the write lands on the same byte. + file.seek(SeekFrom::Start(offset))?; + file.write_all(&cell)?; + file.flush() +} + +/// Truncate the tail of a `.dat` file to a smaller size.
+fn truncate_file(path: &Path) -> std::io::Result<()> { + let f = OpenOptions::new().read(true).write(true).open(path)?; + let len = f.metadata()?.len(); + if len < 4 { + return Ok(()); + } + // Cut off somewhere in the back half. + let new_len = rand_in(len / 2, len.max(1)); + f.set_len(new_len)?; + Ok(()) +} + +/// Write the per-round status line consumed by the Antithesis observer commands. +/// +/// The line is first written to a sibling file (`path` with `.tmp` appended) and then +/// renamed over `path`, so a concurrent reader sees either the previous complete line +/// or the new one, never the empty/partial file that an in-place truncate-and-write +/// exposes. Still best-effort: I/O errors are ignored so a missed update never aborts +/// the run. +fn write_status(path: &Path, oracle: &Oracle, scenario: Scenario) { + let line = format!( + "produced={} flushed={} delivered={} rejected={} dropped={} outstanding={} \ + silent_loss={} scenario={:?}\n", + oracle.produced, + oracle.produced_flushed, + oracle.delivered, + oracle.rejected, + oracle.dropped_counted, + oracle.outstanding.len(), + oracle.silent_loss_detected, + scenario, + ); + // Append `.tmp` to the full path (not `with_extension`, which would replace an + // existing extension); same directory, so the rename stays on one filesystem. + let mut tmp = path.as_os_str().to_owned(); + tmp.push(".tmp"); + if std::fs::write(&tmp, line).is_ok() { + let _ = std::fs::rename(&tmp, path); + } +} diff --git a/tests/antithesis/AGENTS.md b/tests/antithesis/AGENTS.md new file mode 100644 index 0000000000000..904b2b9f3440d --- /dev/null +++ b/tests/antithesis/AGENTS.md @@ -0,0 +1,56 @@ +This directory contains files for running Antithesis tests against Vector's +**disk buffer v2** (`lib/vector-buffers/src/variants/disk_v2/`). + +Use the `antithesis-research` skill to analyze the system and build the property +catalog (see `scratchbook/`). Use the `antithesis-setup` skill to scaffold this +directory. Use the `antithesis-workload` skill to implement assertions and test +commands. Use the `antithesis-launch` skill to build, validate, and submit runs +— do not run `snouty launch` directly. + +## Topology (see scratchbook/deployment-topology.md) + +Two containers, single network: + +- `vdbuf-vector` (SUT): minimal-feature Vector. `http_server` source (e2e acks) + -> `http` sink with a **disk buffer** -> workload collector; + `internal_metrics` -> `prometheus_exporter` on :9598. Disk buffer `data_dir` + is on the persistent named volume `vdbuf-buffer` so it survives node-kill.
+- `vdbuf-workload` (client): instrumented Rust driver (`tests/antithesis/workload/`). + Entrypoint `serve` runs the sink collector, waits for Vector, emits + `setup_complete`, and produces uniquely-IDed events. Test template at + `/opt/antithesis/test/v1/`. + +## Instrumentation status + +- **Workload**: instrumented (LLVM sancov via `antithesis-instrumentation`) + + `antithesis_sdk` assertions; unstripped binary symlinked into `/symbols`. +- **Vector**: built minimal-feature, release, **instrumented** with sancov (see + the `vector-build` stage in `Dockerfile`). Phase 1 shipped Vector + **uninstrumented** (no sancov, no `antithesis_sdk`) to keep the first build + fast/low-risk, with workload-observable first failure demos; SUT-side + instrumentation was added in the grind phase for the deadlock/underflow + properties (`total-buffer-size-never-underflows`, + `writer-eventually-makes-progress`). + +## snouty + +- **launch**: `snouty launch --json --webhook basic_test --config tests/antithesis/config` + (only the `basic_test` webhook is available for now). Run `docker compose build` + first. Requires `ANTITHESIS_API_KEY` (sourced from `~/.config/snouty/secrets.env`). +- **validate**: `snouty validate tests/antithesis/config` + +## Local smoke test + +```sh +docker compose -f tests/antithesis/config/docker-compose.yaml build +docker compose -f tests/antithesis/config/docker-compose.yaml up +# expect: vector healthy, workload emits setup_complete + reachable assertions +``` + +## Subdirectories + +- `config/` — `docker-compose.yaml` + `vector.yaml` +- `config-direct/`, `config-drop/`, `config-dropfail/`, `config-dropfull/`, + `config-lossfinder/` — `docker-compose.yaml` variants (direct exerciser, + drop_newest fills, lossfinder) +- `workload/` — Rust workload driver crate +- `test/v1/` — Antithesis test template (test command executables) +- `scratchbook/` — research artifacts (property catalog, SUT analysis, etc.)
+- `Dockerfile` — multi-stage build (`vector`, `workload` targets) +- `setup-complete.sh` — emits the `setup_complete` lifecycle event (also emitted + by the workload binary) diff --git a/tests/antithesis/Dockerfile b/tests/antithesis/Dockerfile new file mode 100644 index 0000000000000..5a3d7e3e526cb --- /dev/null +++ b/tests/antithesis/Dockerfile @@ -0,0 +1,97 @@ +# syntax=docker/dockerfile:1 +# +# Multi-stage build for the Antithesis disk-buffer-v2 harness. +# target `vector` : minimal-feature release Vector (the SUT) +# target `workload` : instrumented Rust workload driver (the client) +# +# Build context is the repo root (see tests/antithesis/config/docker-compose.yaml). + +############################ +# Vector SUT — build stage # +############################ +FROM rust:1.92-bookworm AS vector-build +# System deps Vector's build commonly needs (protoc for prometheus/prost, cmake, +# perl/ssl for vendored crates). +RUN apt-get update && apt-get install -y --no-install-recommends \ + protobuf-compiler cmake perl build-essential pkg-config libssl-dev clang \ + && rm -rf /var/lib/apt/lists/* +WORKDIR /src +# Copy only the workspace inputs Vector's build needs. Mirrors the .dockerignore +# allowlist. NOTE: `COPY tests` below also brings in tests/antithesis/, so +# harness/config edits currently DO invalidate this (expensive) compile layer; +# narrow that COPY to restore the intended caching. +COPY Cargo.toml Cargo.lock rust-toolchain.toml build.rs ./ +COPY lib ./lib +COPY src ./src +COPY proto ./proto +COPY benches ./benches +COPY tests ./tests +COPY vdev ./vdev +COPY scripts ./scripts +# Minimal feature set: just the sources/sinks the harness needs. Disk buffer +# support is core (always compiled). Cache the registry + target dirs across +# rebuilds; copy the binary out of the cache mount within the same RUN. +# Build with Antithesis LLVM coverage instrumentation (sancov). The flags are +# applied via --config to the TARGET only, so host build-scripts/proc-macros are +# not instrumented.
The antithesis-instrumentation crate (dep of vector-buffers) +# provides the libvoidstar loader + sancov callbacks; build-id is required for +# symbolization. +RUN --mount=type=cache,target=/usr/local/cargo/registry \ + --mount=type=cache,target=/src/target \ + cargo build --release --no-default-features \ + --features "sources-http_server,sinks-http,sources-internal_metrics,sinks-prometheus,sources-demo_logs" \ + --bin vector \ + --config 'build.target = "x86_64-unknown-linux-gnu"' \ + --config 'target.x86_64-unknown-linux-gnu.rustflags = ["-Cpasses=sancov-module","-Cllvm-args=-sanitizer-coverage-level=3","-Cllvm-args=-sanitizer-coverage-trace-pc-guard","-Clink-args=-Wl,--build-id"]' \ + && cp target/x86_64-unknown-linux-gnu/release/vector /usr/local/bin/vector + +############################# +# Vector SUT — runtime stage # +############################# +FROM debian:stable-slim AS vector +RUN apt-get update && apt-get install -y --no-install-recommends \ + curl ca-certificates \ + && rm -rf /var/lib/apt/lists/* +COPY --from=vector-build /usr/local/bin/vector /usr/bin/vector +COPY tests/antithesis/config/vector.yaml /etc/vector/vector.yaml +RUN mkdir -p /var/lib/vector +# Symbolization: expose the instrumented (unstripped, build-id'd) binary. +RUN mkdir -p /symbols && ln -s /usr/bin/vector /symbols/vector +ENV NO_COLOR=1 +EXPOSE 8080 9598 +# Vector is now built WITH sancov coverage instrumentation (antithesis- +# instrumentation crate + sancov RUSTFLAGS) and a SUT-side assert_always at the +# ledger total_buffer_size underflow site (lib/vector-buffers) — the precise +# #21683 signal. Assertions/instrumentation are no-ops outside Antithesis. 
+ENTRYPOINT ["/usr/bin/vector", "--config", "/etc/vector/vector.yaml"] + +############################## +# Workload — build stage # +############################## +FROM rust:1.92-bookworm AS workload-build +WORKDIR /w +COPY tests/antithesis/workload/Cargo.toml ./Cargo.toml +COPY tests/antithesis/workload/src ./src +# LLVM coverage instrumentation per the Antithesis Rust guide. build-id is +# required for symbolization. +ENV RUSTFLAGS="-Ccodegen-units=1 -Cpasses=sancov-module -Cllvm-args=-sanitizer-coverage-level=3 -Cllvm-args=-sanitizer-coverage-trace-pc-guard -Clink-args=-Wl,--build-id" +RUN --mount=type=cache,target=/usr/local/cargo/registry \ + cargo build --release --target x86_64-unknown-linux-gnu \ + && cp target/x86_64-unknown-linux-gnu/release/vdbuf-workload /usr/local/bin/vdbuf-workload + +############################## +# Workload — runtime stage # +############################## +FROM debian:stable-slim AS workload +RUN apt-get update && apt-get install -y --no-install-recommends \ + curl ca-certificates \ + && rm -rf /var/lib/apt/lists/* +COPY --from=workload-build /usr/local/bin/vdbuf-workload /usr/bin/vdbuf-workload +# Symbolization: expose the unstripped (DWARF) binary under /symbols. +RUN mkdir -p /symbols && ln -s /usr/bin/vdbuf-workload /symbols/vdbuf-workload +COPY tests/antithesis/test/v1 /opt/antithesis/test/v1 +COPY tests/antithesis/setup-complete.sh /usr/bin/setup-complete.sh +ENV NO_COLOR=1 +ENV VECTOR_SOURCE_URL=http://vdbuf-vector:8080/ \ + VECTOR_METRICS_URL=http://vdbuf-vector:9598/metrics \ + COLLECTOR_ADDR=0.0.0.0:8686 +ENTRYPOINT ["/usr/bin/vdbuf-workload", "serve"] diff --git a/tests/antithesis/Dockerfile.direct b/tests/antithesis/Dockerfile.direct new file mode 100644 index 0000000000000..2b9a07461c582 --- /dev/null +++ b/tests/antithesis/Dockerfile.direct @@ -0,0 +1,66 @@ +# syntax=docker/dockerfile:1 +# +# Antithesis harness for testing the disk buffer v2 DIRECTLY. 
+# +# Unlike tests/antithesis/Dockerfile (which runs full Vector as the SUT), this +# image's SUT is a single self-driving exerciser binary that links vector-buffers +# and drives a real disk_v2 buffer through the public topology API. Because a +# disk_v2 buffer holds an advisory lock (one process per buffer directory), the +# workload that drives the buffer must live in the same process that owns it — so +# the exerciser is both SUT and workload. The dangerous internal invariants are +# checked by surgical assert_always! calls SUT-side inside vector-buffers. +# +# Build context is the repo root (see tests/antithesis/config-direct/docker-compose.yaml). + +################################## +# Exerciser — build stage # +################################## +FROM rust:1.92-bookworm AS exerciser-build +RUN apt-get update && apt-get install -y --no-install-recommends \ + protobuf-compiler cmake perl build-essential pkg-config libssl-dev clang \ + && rm -rf /var/lib/apt/lists/* +WORKDIR /src +# Copy the same workspace inputs the main Vector build uses, so the workspace +# resolves; we only compile the vector-buffers example + its dep subtree. +COPY Cargo.toml Cargo.lock rust-toolchain.toml build.rs ./ +COPY lib ./lib +COPY src ./src +COPY proto ./proto +COPY benches ./benches +COPY tests ./tests +COPY vdev ./vdev +COPY scripts ./scripts +# Build the exerciser example with Antithesis LLVM coverage instrumentation +# (sancov) applied to the target only. The antithesis-instrumentation crate (dep +# of vector-buffers) provides the libvoidstar loader + sancov callbacks; build-id +# is required for symbolization. 
+RUN --mount=type=cache,target=/usr/local/cargo/registry \ + --mount=type=cache,target=/src/target \ + cargo build --release -p vector-buffers --example disk_v2_antithesis \ + --config 'build.target = "x86_64-unknown-linux-gnu"' \ + --config 'target.x86_64-unknown-linux-gnu.rustflags = ["-Cpasses=sancov-module","-Cllvm-args=-sanitizer-coverage-level=3","-Cllvm-args=-sanitizer-coverage-trace-pc-guard","-Clink-args=-Wl,--build-id"]' \ + && cp target/x86_64-unknown-linux-gnu/release/examples/disk_v2_antithesis /usr/local/bin/vdbuf-exerciser + +################################## +# Exerciser — runtime stage # +################################## +FROM debian:stable-slim AS exerciser +RUN apt-get update && apt-get install -y --no-install-recommends \ + ca-certificates \ + && rm -rf /var/lib/apt/lists/* +COPY --from=exerciser-build /usr/local/bin/vdbuf-exerciser /usr/bin/vdbuf-exerciser +RUN mkdir -p /var/lib/vdbuf +# Symbolization: expose the instrumented (unstripped, build-id'd) binary. +RUN mkdir -p /symbols && ln -s /usr/bin/vdbuf-exerciser /symbols/vdbuf-exerciser +# Observer test commands run in this same container and read the status file. +# Only the direct template — the full-Vector workload commands under +# tests/antithesis/test/v1/diskbuf/ are not applicable to this single-process SUT. +COPY tests/antithesis/test/v1/diskbuf_direct /opt/antithesis/test/v1/diskbuf_direct +ENV NO_COLOR=1 \ + RUST_LOG=warn \ + VDBUF_DIR=/var/lib/vdbuf \ + VDBUF_STATUS=/tmp/vdbuf-status +# The exerciser self-drives the buffer and emits setup_complete after the first +# end-to-end delivery. SUT-side assert_always! calls inside vector-buffers detect +# the #21683-class accounting underflows. No-ops outside Antithesis. 
+ENTRYPOINT ["/usr/bin/vdbuf-exerciser"] diff --git a/tests/antithesis/Dockerfile.lossfinder b/tests/antithesis/Dockerfile.lossfinder new file mode 100644 index 0000000000000..b87a205c1b408 --- /dev/null +++ b/tests/antithesis/Dockerfile.lossfinder @@ -0,0 +1,64 @@ +# syntax=docker/dockerfile:1 +# +# Antithesis harness for finding DATA LOSS in the disk buffer v2. +# +# Like Dockerfile.direct, the SUT is a single self-driving exerciser binary that +# links vector-buffers and drives a real disk_v2 buffer through the public +# topology API. This image's exerciser (disk_v2_lossfinder) runs a phased +# scenario loop whose job is to detect *silent data loss*: a record that was +# accepted and durably flushed (an at-least-once promise) but then never handed +# back to the reader and never explicitly refused. A non-empty oracle leftover +# trips an assert_always! (loss is a BUG per owner ruling). +# +# Build context is the repo root (see tests/antithesis/config-lossfinder/docker-compose.yaml). + +################################## +# Exerciser — build stage # +################################## +FROM rust:1.92-bookworm AS exerciser-build +RUN apt-get update && apt-get install -y --no-install-recommends \ + protobuf-compiler cmake perl build-essential pkg-config libssl-dev clang \ + && rm -rf /var/lib/apt/lists/* +WORKDIR /src +# Copy the same workspace inputs the main Vector build uses, so the workspace +# resolves; we only compile the vector-buffers example + its dep subtree. +COPY Cargo.toml Cargo.lock rust-toolchain.toml build.rs ./ +COPY lib ./lib +COPY src ./src +COPY proto ./proto +COPY benches ./benches +COPY tests ./tests +COPY vdev ./vdev +COPY scripts ./scripts +# Build the exerciser example with Antithesis LLVM coverage instrumentation +# (sancov) applied to the target only. The antithesis-instrumentation crate (dep +# of vector-buffers) provides the libvoidstar loader + sancov callbacks; build-id +# is required for symbolization. 
+RUN --mount=type=cache,target=/usr/local/cargo/registry \ + --mount=type=cache,target=/src/target \ + cargo build --release -p vector-buffers --example disk_v2_lossfinder \ + --config 'build.target = "x86_64-unknown-linux-gnu"' \ + --config 'target.x86_64-unknown-linux-gnu.rustflags = ["-Cpasses=sancov-module","-Cllvm-args=-sanitizer-coverage-level=3","-Cllvm-args=-sanitizer-coverage-trace-pc-guard","-Clink-args=-Wl,--build-id"]' \ + && cp target/x86_64-unknown-linux-gnu/release/examples/disk_v2_lossfinder /usr/local/bin/vdbuf-lossfinder + +################################## +# Exerciser — runtime stage # +################################## +FROM debian:stable-slim AS lossfinder +RUN apt-get update && apt-get install -y --no-install-recommends \ + ca-certificates \ + && rm -rf /var/lib/apt/lists/* +COPY --from=exerciser-build /usr/local/bin/vdbuf-lossfinder /usr/bin/vdbuf-lossfinder +RUN mkdir -p /var/lib/vdbuf +# Symbolization: expose the instrumented (unstripped, build-id'd) binary. +RUN mkdir -p /symbols && ln -s /usr/bin/vdbuf-lossfinder /symbols/vdbuf-lossfinder +# Observer test commands run in this same container and read the status file. +COPY tests/antithesis/test/v1/diskbuf_loss /opt/antithesis/test/v1/diskbuf_loss +ENV NO_COLOR=1 \ + RUST_LOG=warn \ + VDBUF_DIR=/var/lib/vdbuf \ + VDBUF_STATUS=/tmp/vdbuf-status +# The exerciser self-drives the buffer and emits setup_complete after the first +# end-to-end delivery. SUT-side assert_always! calls inside the exerciser detect +# silent data loss. No-ops outside Antithesis. +ENTRYPOINT ["/usr/bin/vdbuf-lossfinder"] diff --git a/tests/antithesis/config-direct/docker-compose.yaml b/tests/antithesis/config-direct/docker-compose.yaml new file mode 100644 index 0000000000000..f924ba41f1116 --- /dev/null +++ b/tests/antithesis/config-direct/docker-compose.yaml @@ -0,0 +1,39 @@ +name: vdbufdirect + +# Single-service harness that tests the disk buffer v2 DIRECTLY. 
The exerciser +# is both SUT and workload: it owns a real disk_v2 buffer (advisory-locked, one +# process per directory) and self-drives randomized writer/reader activity using +# the Antithesis SDK's RNG. The #21683-class accounting underflows are detected +# by assert_always! calls SUT-side inside vector-buffers. +services: + exerciser: + container_name: vdbuf-exerciser + hostname: vdbuf-exerciser + platform: linux/amd64 + init: true + build: + # Build context is the repo root (three levels up from this file). + context: ../../.. + dockerfile: tests/antithesis/Dockerfile.direct + target: exerciser + image: vdbuf-exerciser:latest + environment: + NO_COLOR: "1" + RUST_LOG: "warn" + VDBUF_DIR: "/var/lib/vdbuf" + VDBUF_STATUS: "/tmp/vdbuf-status" + volumes: + # Persistent named volume so the disk buffer survives container + # kill/restart (node-termination faults, when the webhook injects them) — + # required for the crash-durability properties to be meaningful. + - vdbuf-direct-buffer:/var/lib/vdbuf + healthcheck: + # Live once the exerciser has delivered at least one record end-to-end. + test: ["CMD-SHELL", "grep -q 'delivered=[1-9]' /tmp/vdbuf-status"] + interval: 5s + timeout: 3s + retries: 30 + start_period: 10s + +volumes: + vdbuf-direct-buffer: diff --git a/tests/antithesis/config-drop/docker-compose.yaml b/tests/antithesis/config-drop/docker-compose.yaml new file mode 100644 index 0000000000000..3564dbb888b7c --- /dev/null +++ b/tests/antithesis/config-drop/docker-compose.yaml @@ -0,0 +1,49 @@ +name: vdbuf-drop + +services: + vector: + container_name: vdbuf-vector + hostname: vdbuf-vector + platform: linux/amd64 + init: true + build: + context: ../../.. + dockerfile: tests/antithesis/Dockerfile + target: vector + image: vdbuf-vector:latest + environment: + NO_COLOR: "1" + VDBUF_WHEN_FULL: "drop_newest" + volumes: + # Persistent named volume so the disk buffer survives container + # kill/restart (node-termination faults). 
This is REQUIRED for the + # crash-durability properties to be meaningful. + - vdbuf-buffer:/var/lib/vector + healthcheck: + test: ["CMD", "curl", "-fsS", "http://localhost:9598/metrics"] + interval: 5s + timeout: 3s + retries: 30 + start_period: 10s + + workload: + container_name: vdbuf-workload + hostname: vdbuf-workload + platform: linux/amd64 + init: true + build: + context: ../../.. + dockerfile: tests/antithesis/Dockerfile + target: workload + image: vdbuf-workload:latest + environment: + NO_COLOR: "1" + VECTOR_SOURCE_URL: "http://vdbuf-vector:8080/" + VECTOR_METRICS_URL: "http://vdbuf-vector:9598/metrics" + COLLECTOR_ADDR: "0.0.0.0:8686" + depends_on: + vector: + condition: service_healthy + +volumes: + vdbuf-buffer: diff --git a/tests/antithesis/config-dropfail/docker-compose.yaml b/tests/antithesis/config-dropfail/docker-compose.yaml new file mode 100644 index 0000000000000..35afa8522e550 --- /dev/null +++ b/tests/antithesis/config-dropfail/docker-compose.yaml @@ -0,0 +1,52 @@ +name: vdbuf-dropfail + +services: + vector: + container_name: vdbuf-vector + hostname: vdbuf-vector + platform: linux/amd64 + init: true + build: + context: ../../.. + dockerfile: tests/antithesis/Dockerfile + target: vector + image: vdbuf-vector:latest + environment: + NO_COLOR: "1" + VDBUF_WHEN_FULL: "drop_newest" + VDBUF_SRC_ACKS: "false" + VDBUF_SINK_CONCURRENCY: "1" + volumes: + # Persistent named volume so the disk buffer survives container + # kill/restart (node-termination faults). This is REQUIRED for the + # crash-durability properties to be meaningful. + - vdbuf-buffer:/var/lib/vector + healthcheck: + test: ["CMD", "curl", "-fsS", "http://localhost:9598/metrics"] + interval: 5s + timeout: 3s + retries: 30 + start_period: 10s + + workload: + container_name: vdbuf-workload + hostname: vdbuf-workload + platform: linux/amd64 + init: true + build: + context: ../../.. 
+ dockerfile: tests/antithesis/Dockerfile + target: workload + image: vdbuf-workload:latest + environment: + NO_COLOR: "1" + VECTOR_SOURCE_URL: "http://vdbuf-vector:8080/" + VECTOR_METRICS_URL: "http://vdbuf-vector:9598/metrics" + COLLECTOR_ADDR: "0.0.0.0:8686" + COLLECTOR_DELAY_MS: "120000" + depends_on: + vector: + condition: service_healthy + +volumes: + vdbuf-buffer: diff --git a/tests/antithesis/config-dropfull/docker-compose.yaml b/tests/antithesis/config-dropfull/docker-compose.yaml new file mode 100644 index 0000000000000..7477ff1fe6f05 --- /dev/null +++ b/tests/antithesis/config-dropfull/docker-compose.yaml @@ -0,0 +1,50 @@ +name: vdbuf-dropfull + +services: + vector: + container_name: vdbuf-vector + hostname: vdbuf-vector + platform: linux/amd64 + init: true + build: + context: ../../.. + dockerfile: tests/antithesis/Dockerfile + target: vector + image: vdbuf-vector:latest + environment: + NO_COLOR: "1" + VDBUF_WHEN_FULL: "drop_newest" + volumes: + # Persistent named volume so the disk buffer survives container + # kill/restart (node-termination faults). This is REQUIRED for the + # crash-durability properties to be meaningful. + - vdbuf-buffer:/var/lib/vector + healthcheck: + test: ["CMD", "curl", "-fsS", "http://localhost:9598/metrics"] + interval: 5s + timeout: 3s + retries: 30 + start_period: 10s + + workload: + container_name: vdbuf-workload + hostname: vdbuf-workload + platform: linux/amd64 + init: true + build: + context: ../../.. 
+ dockerfile: tests/antithesis/Dockerfile + target: workload + image: vdbuf-workload:latest + environment: + NO_COLOR: "1" + VECTOR_SOURCE_URL: "http://vdbuf-vector:8080/" + VECTOR_METRICS_URL: "http://vdbuf-vector:9598/metrics" + COLLECTOR_ADDR: "0.0.0.0:8686" + COLLECTOR_DELAY_MS: "3000" + depends_on: + vector: + condition: service_healthy + +volumes: + vdbuf-buffer: diff --git a/tests/antithesis/config-lossfinder/docker-compose.yaml b/tests/antithesis/config-lossfinder/docker-compose.yaml new file mode 100644 index 0000000000000..040032f1c3b37 --- /dev/null +++ b/tests/antithesis/config-lossfinder/docker-compose.yaml @@ -0,0 +1,42 @@ +name: vdbufloss + +# Single-service harness that finds DATA LOSS in the disk buffer v2. The +# exerciser is both SUT and workload: it owns a real disk_v2 buffer +# (advisory-locked, one process per directory) and self-drives a phased scenario +# loop using the Antithesis SDK's RNG. Silent data loss (a flushed record that is +# never delivered and never explicitly refused) is detected by assert_always! +# calls SUT-side inside the exerciser. +services: + lossfinder: + container_name: vdbuf-lossfinder + hostname: vdbuf-lossfinder + platform: linux/amd64 + init: true + build: + # Build context is the repo root (three levels up from this file). + context: ../../.. + dockerfile: tests/antithesis/Dockerfile.lossfinder + target: lossfinder + image: vdbuf-lossfinder:latest + environment: + NO_COLOR: "1" + RUST_LOG: "warn" + VDBUF_DIR: "/var/lib/vdbuf" + VDBUF_STATUS: "/tmp/vdbuf-status" + VDBUF_MAX_SIZE: "268435488" + VDBUF_MAX_PAYLOAD: "4096" + volumes: + # Persistent named volume so the disk buffer survives container + # kill/restart (node-termination faults, when the webhook injects them) — + # required for the crash-durability properties to be meaningful. + - vdbuf-loss-buffer:/var/lib/vdbuf + healthcheck: + # Live once the exerciser has delivered at least one record end-to-end. 
+ test: ["CMD-SHELL", "grep -q 'delivered=[1-9]' /tmp/vdbuf-status"] + interval: 5s + timeout: 3s + retries: 30 + start_period: 10s + +volumes: + vdbuf-loss-buffer: diff --git a/tests/antithesis/config/docker-compose.yaml b/tests/antithesis/config/docker-compose.yaml new file mode 100644 index 0000000000000..f99b0421615f2 --- /dev/null +++ b/tests/antithesis/config/docker-compose.yaml @@ -0,0 +1,48 @@ +name: vdbuf + +services: + vector: + container_name: vdbuf-vector + hostname: vdbuf-vector + platform: linux/amd64 + init: true + build: + context: ../../.. + dockerfile: tests/antithesis/Dockerfile + target: vector + image: vdbuf-vector:latest + environment: + NO_COLOR: "1" + volumes: + # Persistent named volume so the disk buffer survives container + # kill/restart (node-termination faults). This is REQUIRED for the + # crash-durability properties to be meaningful. + - vdbuf-buffer:/var/lib/vector + healthcheck: + test: ["CMD", "curl", "-fsS", "http://localhost:9598/metrics"] + interval: 5s + timeout: 3s + retries: 30 + start_period: 10s + + workload: + container_name: vdbuf-workload + hostname: vdbuf-workload + platform: linux/amd64 + init: true + build: + context: ../../.. + dockerfile: tests/antithesis/Dockerfile + target: workload + image: vdbuf-workload:latest + environment: + NO_COLOR: "1" + VECTOR_SOURCE_URL: "http://vdbuf-vector:8080/" + VECTOR_METRICS_URL: "http://vdbuf-vector:9598/metrics" + COLLECTOR_ADDR: "0.0.0.0:8686" + depends_on: + vector: + condition: service_healthy + +volumes: + vdbuf-buffer: diff --git a/tests/antithesis/config/vector.yaml b/tests/antithesis/config/vector.yaml new file mode 100644 index 0000000000000..0f3959250e5d7 --- /dev/null +++ b/tests/antithesis/config/vector.yaml @@ -0,0 +1,52 @@ +# Vector config for the Antithesis disk-buffer-v2 harness. +# Path: http_server source (e2e acks) -> http sink with a DISK buffer -> +# workload collector. internal_metrics -> prometheus_exporter for +# observation + container healthcheck. 
+data_dir: /var/lib/vector + +sources: + in: + type: http_server + address: 0.0.0.0:8080 + decoding: + codec: json + # End-to-end acknowledgements (env-driven). Default true. A fill config sets + # VDBUF_SRC_ACKS=false so the source 200s on receipt (no per-event drain + # wait) and a fast producer can overfill the buffer to trigger drop_newest. + acknowledgements: + enabled: ${VDBUF_SRC_ACKS:-true} + + metrics: + type: internal_metrics + # Scrape often so buffer_* gauges are fresh for the workload's assertions. + scrape_interval_secs: 1 + +sinks: + out: + type: http + inputs: [in] + uri: http://vdbuf-workload:8686/ingest + method: post + encoding: + codec: json + # Env-driven sink request concurrency (default adaptive). A fill config sets + # it to 1 so that when the single in-flight request hangs on a blocked + # collector, the sink stops pulling from the disk buffer — letting the buffer + # fill to max_size and trigger drop_newest (the precondition for #24606). + request: + concurrency: ${VDBUF_SINK_CONCURRENCY:-adaptive} + # The component under test: disk buffer v2. + buffer: + type: disk + max_size: 268435488 # ~256MB, the enforced minimum + # env-driven so a config variant can select drop_newest without rebuilding + # the image (default block). + when_full: "${VDBUF_WHEN_FULL:-block}" + # Honor source acks only after the sink delivers. + acknowledgements: + enabled: true + + prom: + type: prometheus_exporter + inputs: [metrics] + address: 0.0.0.0:9598 diff --git a/tests/antithesis/setup-complete.sh b/tests/antithesis/setup-complete.sh new file mode 100755 index 0000000000000..80d71782252c4 --- /dev/null +++ b/tests/antithesis/setup-complete.sh @@ -0,0 +1,22 @@ +#!/usr/bin/env bash +set -euo pipefail + +# Run this script to inform Antithesis that it can start running test commands. +# You can also use the Antithesis SDK to emit setup-complete from your system if +# that is easier. 
+# +# Antithesis sets the `ANTITHESIS_OUTPUT_DIR` environment variable +# automatically. This script is set up to emit `setup_complete` to the +# `sdk.jsonl` file in that directory. + +OUTPUT_PATH="/tmp/antithesis_sdk.jsonl" +if [[ -n "${ANTITHESIS_OUTPUT_DIR:-}" ]]; then + OUTPUT_PATH="${ANTITHESIS_OUTPUT_DIR}/sdk.jsonl" + echo "Running in Antithesis, emitting setup_complete to ${OUTPUT_PATH}" +elif [[ -n "${ANTITHESIS_SDK_LOCAL_OUTPUT:-}" ]]; then + OUTPUT_PATH="${ANTITHESIS_SDK_LOCAL_OUTPUT}" + echo "Antithesis SDK local output override detected, emitting setup_complete to ${OUTPUT_PATH}" +fi + +mkdir -p "$(dirname "$OUTPUT_PATH")" +echo '{"antithesis_setup":{"status":"complete","details":{"message":"ready to go"}}}' >> "${OUTPUT_PATH}" diff --git a/tests/antithesis/test/v1/diskbuf/eventually_durability_and_progress.sh b/tests/antithesis/test/v1/diskbuf/eventually_durability_and_progress.sh new file mode 100755 index 0000000000000..d400f196c2811 --- /dev/null +++ b/tests/antithesis/test/v1/diskbuf/eventually_durability_and_progress.sh @@ -0,0 +1,6 @@ +#!/usr/bin/env bash +# Faults paused: verify durability (every acked event reached the collector) +# and writer progress (a fresh post-recovery write is delivered -> no #21683 +# permanent deadlock). +set -euo pipefail +exec /usr/bin/vdbuf-workload check diff --git a/tests/antithesis/test/v1/diskbuf/parallel_driver_produce.sh b/tests/antithesis/test/v1/diskbuf/parallel_driver_produce.sh new file mode 100755 index 0000000000000..d3e1ed1a34f28 --- /dev/null +++ b/tests/antithesis/test/v1/diskbuf/parallel_driver_produce.sh @@ -0,0 +1,5 @@ +#!/usr/bin/env bash +# Continuously produce uniquely-IDed events into Vector (e2e acks) under fault +# injection, exercising the disk buffer's rotation / partial-write paths.
+set -euo pipefail +exec /usr/bin/vdbuf-workload produce diff --git a/tests/antithesis/test/v1/diskbuf_direct/eventually_progress.sh b/tests/antithesis/test/v1/diskbuf_direct/eventually_progress.sh new file mode 100755 index 0000000000000..7fe48b3552f05 --- /dev/null +++ b/tests/antithesis/test/v1/diskbuf_direct/eventually_progress.sh @@ -0,0 +1,18 @@ +#!/usr/bin/env bash +# Liveness: the writer/reader keep making progress. Antithesis runs this +# repeatedly and the property holds once delivery has advanced well past the +# initial round-trip. A buffer that has deadlocked (e.g. the #21683 wrap making +# is_buffer_full() true forever) would stall delivery and fail this property. +set -euo pipefail +STATUS="${VDBUF_STATUS:-/tmp/vdbuf-status}" + +read_field() { grep -oE "$1=[0-9]+" "$STATUS" 2>/dev/null | cut -d= -f2; } + +delivered="$(read_field delivered)" +if [[ -n "${delivered:-}" && "${delivered}" -gt 100 ]]; then + echo "[eventually] sustained progress: delivered=${delivered}" + exit 0 +fi + +echo "[eventually] not enough progress yet: delivered=${delivered:-none}" >&2 +exit 1 diff --git a/tests/antithesis/test/v1/diskbuf_direct/first_wait_ready.sh b/tests/antithesis/test/v1/diskbuf_direct/first_wait_ready.sh new file mode 100755 index 0000000000000..3ffdc09bda47c --- /dev/null +++ b/tests/antithesis/test/v1/diskbuf_direct/first_wait_ready.sh @@ -0,0 +1,18 @@ +#!/usr/bin/env bash +# Runs once at the start of a timeline. Block until the self-driving exerciser +# has delivered at least one record end-to-end through the disk_v2 buffer, i.e. +# the SUT is live. The exerciser itself emits setup_complete; this command does +# NOT do any lifecycle signaling. 
+set -euo pipefail +STATUS="${VDBUF_STATUS:-/tmp/vdbuf-status}" + +for _ in $(seq 1 120); do + if grep -qE 'delivered=[1-9][0-9]*' "$STATUS" 2>/dev/null; then + echo "[first] exerciser live: $(cat "$STATUS")" + exit 0 + fi + sleep 1 +done + +echo "[first] exerciser never reported a delivery" >&2 +exit 1 diff --git a/tests/antithesis/test/v1/diskbuf_direct/parallel_driver_safety_monitor.sh b/tests/antithesis/test/v1/diskbuf_direct/parallel_driver_safety_monitor.sh new file mode 100755 index 0000000000000..8be320a290a49 --- /dev/null +++ b/tests/antithesis/test/v1/diskbuf_direct/parallel_driver_safety_monitor.sh @@ -0,0 +1,29 @@ +#!/usr/bin/env bash +# Active workload presence + a coarse safety mirror of the SUT-side assertions. +# The exerciser self-drives the buffer; this command runs in parallel for the +# timeline, continuously checking the cheap externally-visible safety invariant: +# the buffer can never hand the reader more records than were ever produced. +# A get_total_records / accounting underflow would surface as handled > produced. +# The authoritative detector is the SUT-side assert_always! inside vector-buffers; +# this is belt-and-suspenders and gives Antithesis an active command to schedule. +set -uo pipefail +STATUS="${VDBUF_STATUS:-/tmp/vdbuf-status}" + +read_field() { grep -oE "$1=[0-9]+" "$STATUS" 2>/dev/null | cut -d= -f2; } + +# Bounded loop so the command terminates within a timeline rather than running +# truly forever. 
+for _ in $(seq 1 600); do + produced="$(read_field produced)" + handled="$(read_field handled)" + if [[ -n "${produced:-}" && -n "${handled:-}" ]]; then + if (( handled > produced )); then + echo "[safety] VIOLATION: handled=${handled} > produced=${produced}" >&2 + exit 1 + fi + fi + sleep 1 +done + +echo "[safety] no violation observed over the window" +exit 0 diff --git a/tests/antithesis/test/v1/diskbuf_loss/eventually_no_silent_loss.sh b/tests/antithesis/test/v1/diskbuf_loss/eventually_no_silent_loss.sh new file mode 100755 index 0000000000000..8fbe471a0ce03 --- /dev/null +++ b/tests/antithesis/test/v1/diskbuf_loss/eventually_no_silent_loss.sh @@ -0,0 +1,24 @@ +#!/usr/bin/env bash +# Safety property: the disk_v2 buffer never silently loses a record that was +# accepted and durably flushed. The lossfinder maintains an oracle and increments +# `silent_loss` whenever a flushed-but-unresolved record is detected after a full +# drain. This command surfaces any detected loss as an Antithesis property +# failure: exit 0 only if silent_loss=0, else exit 1. +set -uo pipefail +STATUS="${VDBUF_STATUS:-/tmp/vdbuf-status}" + +read_field() { grep -oE "$1=[0-9]+" "$STATUS" 2>/dev/null | cut -d= -f2; } + +silent_loss="$(read_field silent_loss)" +if [[ -z "${silent_loss:-}" ]]; then + echo "[eventually] status not ready yet" >&2 + exit 1 +fi + +if (( silent_loss > 0 )); then + echo "[eventually] SILENT DATA LOSS detected: silent_loss=${silent_loss} ($(cat "$STATUS"))" >&2 + exit 1 +fi + +echo "[eventually] no silent data loss observed: $(cat "$STATUS")" +exit 0 diff --git a/tests/antithesis/test/v1/diskbuf_loss/first_wait_ready.sh b/tests/antithesis/test/v1/diskbuf_loss/first_wait_ready.sh new file mode 100755 index 0000000000000..f3baf0b9f215d --- /dev/null +++ b/tests/antithesis/test/v1/diskbuf_loss/first_wait_ready.sh @@ -0,0 +1,18 @@ +#!/usr/bin/env bash +# Runs once at the start of a timeline. 
Block until the self-driving lossfinder +# has delivered at least one record end-to-end through the disk_v2 buffer, i.e. +# the SUT is live. The exerciser itself emits setup_complete; this command does +# NOT do any lifecycle signaling. +set -euo pipefail +STATUS="${VDBUF_STATUS:-/tmp/vdbuf-status}" + +for _ in $(seq 1 120); do + if grep -qE 'delivered=[1-9][0-9]*' "$STATUS" 2>/dev/null; then + echo "[first] lossfinder live: $(cat "$STATUS")" + exit 0 + fi + sleep 1 +done + +echo "[first] lossfinder never reported a delivery" >&2 +exit 1 diff --git a/tests/antithesis/test/v1/diskbuf_loss/parallel_driver_loss_monitor.sh b/tests/antithesis/test/v1/diskbuf_loss/parallel_driver_loss_monitor.sh new file mode 100755 index 0000000000000..9d8ea816fe3c1 --- /dev/null +++ b/tests/antithesis/test/v1/diskbuf_loss/parallel_driver_loss_monitor.sh @@ -0,0 +1,24 @@ +#!/usr/bin/env bash +# Active workload presence + a continuous mirror of the SUT-side loss assertion. +# The lossfinder self-drives the buffer; this command runs in parallel for the +# timeline, continuously checking the externally-visible loss counter. The +# authoritative detector is the SUT-side assert_always! inside the exerciser; +# this is belt-and-suspenders and gives Antithesis an active command to schedule. +set -uo pipefail +STATUS="${VDBUF_STATUS:-/tmp/vdbuf-status}" + +read_field() { grep -oE "$1=[0-9]+" "$STATUS" 2>/dev/null | cut -d= -f2; } + +# Bounded loop so the command terminates within a timeline rather than running +# truly forever. 
+for _ in $(seq 1 600); do + silent_loss="$(read_field silent_loss)" + if [[ -n "${silent_loss:-}" ]] && (( silent_loss > 0 )); then + echo "[loss] VIOLATION: silent_loss=${silent_loss} ($(cat "$STATUS"))" >&2 + exit 1 + fi + sleep 1 +done + +echo "[loss] no silent data loss observed over the window" +exit 0 diff --git a/tests/antithesis/workload/.gitignore b/tests/antithesis/workload/.gitignore new file mode 100644 index 0000000000000..96ef6c0b944e2 --- /dev/null +++ b/tests/antithesis/workload/.gitignore @@ -0,0 +1,2 @@ +/target +Cargo.lock diff --git a/tests/antithesis/workload/Cargo.toml b/tests/antithesis/workload/Cargo.toml new file mode 100644 index 0000000000000..c47cd83caad30 --- /dev/null +++ b/tests/antithesis/workload/Cargo.toml @@ -0,0 +1,29 @@ +# Standalone package: keep it out of Vector's parent workspace (this Cargo.toml +# lives under the repo root next to Vector's workspace manifest). +[workspace] + +[package] +name = "vdbuf-workload" +version = "0.1.0" +edition = "2021" +publish = false + +[[bin]] +name = "vdbuf-workload" +path = "src/main.rs" + +[dependencies] +# Antithesis assertion + lifecycle SDK (assertions, setup_complete). +antithesis_sdk = "0.2.8" +# Antithesis LLVM coverage-instrumentation runtime shim (sancov + libvoidstar loader). +antithesis-instrumentation = "0.1" +serde_json = "1" +# Tiny synchronous HTTP server for the sink collector endpoint. +tiny_http = "0.12" +# Tiny synchronous HTTP client for producing events + scraping metrics. +# default-features = false drops TLS (we only speak plaintext HTTP intra-cluster) +# and keeps the build small/fast. +ureq = { version = "2", default-features = false } + +[profile.release] +debug = true diff --git a/tests/antithesis/workload/src/main.rs b/tests/antithesis/workload/src/main.rs new file mode 100644 index 0000000000000..21f766748d820 --- /dev/null +++ b/tests/antithesis/workload/src/main.rs @@ -0,0 +1,439 @@ +//! vdbuf-workload: Antithesis workload for Vector disk buffer v2. +//! +//! 
Modes (selected by argv[1]): +//! - `serve` (container entrypoint, long-lived): HTTP collector for the +//! Vector `http` sink. Records every delivered event id to a +//! shared log, waits for Vector, emits `setup_complete`, then idles +//! so Antithesis can run test commands. Emits bootstrap reachables. +//! - `produce` (parallel_driver): continuously POST uniquely-IDed events into +//! Vector's `http_server` source (e2e acks). Payload sizes are +//! drawn from a disk-buffer boundary menu (256KB write-buffer +//! threshold, large-record bypass, file-rotation filler) to drive +//! the rotation / partial-write paths where the #21683 underflow +//! triggers. Records attempted + acked ids. Fault-tolerant. +//! - `check` (eventually_): runs with faults paused. Waits for Vector to +//! recover, then asserts (a) every acked event reached the +//! collector [durability / at-least-once], and (b) a fresh +//! post-recovery write is delivered within a bound [no permanent +//! writer deadlock — the #21683 demonstration]. +//! +//! With e2e acknowledgements ON, a 200 from the source means the event was +//! delivered all the way to the collector. So "acked" ⟹ "should be in the +//! collector's delivered set", and the durability oracle needs no fsync-window +//! timestamp logic (flush_interval is not user-configurable in Vector). + +use std::collections::HashSet; +use std::fs::{self, OpenOptions}; +use std::io::Write; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::Arc; +use std::thread; +use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; + +use serde_json::json; + +// Keep the Antithesis coverage-instrumentation runtime shim linked in. 
+use antithesis_instrumentation as _; + +const SHARED_DIR: &str = "/tmp/vdbuf"; +const ATTEMPTED_LOG: &str = "/tmp/vdbuf/attempted.log"; +const ACKED_LOG: &str = "/tmp/vdbuf/acked.log"; +const DELIVERED_LOG: &str = "/tmp/vdbuf/delivered.log"; + +// Disk-buffer boundary menu for event payload sizes (bytes of the "pad" field). +// DEFAULT_WRITE_BUFFER_SIZE = 256KB (internal-buffer flush threshold + the +// large-record-bypass boundary); 1MB fills 128MB data files quickly to force +// rotation. These hit the rotation / partial-write code paths the deadlock lives in. +const W: usize = 256 * 1024; +const SIZE_MENU: [usize; 6] = [0, 1, W - 1, W, W + 1, 1024 * 1024]; + +fn env_or(key: &str, default: &str) -> String { + std::env::var(key).unwrap_or_else(|_| default.to_string()) +} + +fn now_ms() -> u128 { + SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis() +} + +fn append_line(path: &str, line: &str) { + if let Ok(mut f) = OpenOptions::new().create(true).append(true).open(path) { + let _ = writeln!(f, "{line}"); + } +} + +fn read_ids(path: &str) -> HashSet { + let mut set = HashSet::new(); + if let Ok(s) = fs::read_to_string(path) { + for line in s.lines() { + let t = line.trim(); + if !t.is_empty() { + set.insert(t.to_string()); + } + } + } + set +} + +/// Record every event id found in a decoded JSON value (array / object). +fn record_value(v: &serde_json::Value, delivered: &AtomicU64) { + match v { + serde_json::Value::Array(a) => { + for e in a { + record_value(e, delivered); + } + } + serde_json::Value::Object(_) => { + if let Some(id) = v.get("id").and_then(|x| x.as_str()) { + append_line(DELIVERED_LOG, id); + delivered.fetch_add(1, Ordering::Relaxed); + } + } + _ => {} + } +} + +/// Record delivered ids across whatever framing the http sink uses. Returns true +/// iff the body was understood as JSON (array / object / NDJSON). False for an +/// unparseable body, so the caller can refuse to ack it (no false delivery). 
+fn record_delivered(body: &str, delivered: &AtomicU64) -> bool { + if let Ok(v) = serde_json::from_str::(body) { + record_value(&v, delivered); + return true; + } + let mut any = false; + for line in body.lines() { + let t = line.trim(); + if t.is_empty() { + continue; + } + if let Ok(v) = serde_json::from_str::(t) { + any = true; + record_value(&v, delivered); + } + } + any +} + +/// Deterministic-ish per-run unique prefix so ids never collide across driver +/// instances / restarts within a timeline. +fn run_prefix() -> String { + let nanos = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_nanos(); + format!("p{:x}{:x}", std::process::id(), nanos & 0xffffff) +} + +fn wait_for_vector(metrics_url: &str, timeout: Duration) -> bool { + let start = Instant::now(); + while start.elapsed() < timeout { + if let Ok(resp) = ureq::get(metrics_url).timeout(Duration::from_secs(2)).call() { + if resp.status() == 200 { + return true; + } + } + thread::sleep(Duration::from_millis(500)); + } + false +} + +/// POST one event with id `id` and a pad of `pad` bytes. Returns true on 2xx +/// (which, with e2e acks, means delivered end-to-end). 
+fn post_event(source_url: &str, id: &str, pad: usize, timeout: Duration) -> bool { + let event = json!([{ "id": id, "ts_ms": now_ms() as u64, "pad": "x".repeat(pad) }]); + let body = serde_json::to_string(&event).unwrap(); + matches!( + ureq::post(source_url) + .set("content-type", "application/json") + .timeout(timeout) + .send_string(&body), + Ok(r) if (200..300).contains(&r.status()) + ) +} + +fn main() { + antithesis_sdk::antithesis_init(); + let _ = fs::create_dir_all(SHARED_DIR); + + let mode = std::env::args().nth(1).unwrap_or_else(|| "serve".into()); + let source_url = env_or("VECTOR_SOURCE_URL", "http://vdbuf-vector:8080/"); + let metrics_url = env_or("VECTOR_METRICS_URL", "http://vdbuf-vector:9598/metrics"); + let collector_addr = env_or("COLLECTOR_ADDR", "0.0.0.0:8686"); + + match mode.as_str() { + "serve" => serve(&metrics_url, &collector_addr), + "produce" => produce(&source_url), + "check" => check(&source_url, &metrics_url), + "metrics_check" => metrics_check(&metrics_url), + "drop_check" => drop_check(&metrics_url), + "fill" => fill(&source_url), + other => { + eprintln!("unknown mode: {other} (expected serve|produce|check|metrics_check|drop_check|fill)"); + std::process::exit(2); + } + } +} + +/// Collector + lifecycle. Long-lived (container entrypoint). +fn serve(metrics_url: &str, collector_addr: &str) { + let delivered = Arc::new(AtomicU64::new(0)); + // Optional artificial collector latency. A throttled collector makes the + // http sink drain slowly so the 256MB disk buffer actually fills — the + // precondition for exercising when_full=drop_newest (#24606). 
+ let delay_ms: u64 = env_or("COLLECTOR_DELAY_MS", "0").parse().unwrap_or(0); + { + let delivered = Arc::clone(&delivered); + let addr = collector_addr.to_string(); + thread::spawn(move || { + let server = + tiny_http::Server::http(addr.as_str()).expect("failed to bind collector"); + for mut req in server.incoming_requests() { + let mut body = String::new(); + let _ = std::io::Read::read_to_string(req.as_reader(), &mut body); + if delay_ms > 0 { + thread::sleep(Duration::from_millis(delay_ms)); + } + // Robustly record delivered ids across whatever framing the http + // sink uses (JSON array, single object, or NDJSON). Return 200 + // ONLY if we actually understood the body — otherwise the sink + // must NOT treat it as delivered (no false acks). This makes + // "acked ⟹ recorded" airtight, so any residual acked-but-not- + // delivered gap is a genuine Vector ack-without-delivery. + let understood = record_delivered(&body, &delivered); + let status = if understood { 200 } else { 500 }; + let _ = req.respond(tiny_http::Response::empty(status)); + } + }); + } + + eprintln!("[workload] serve: waiting for vector at {metrics_url}"); + if !wait_for_vector(metrics_url, Duration::from_secs(180)) { + eprintln!("[workload] WARNING: vector not ready within timeout"); + } + antithesis_sdk::lifecycle::setup_complete(&json!({ "component": "vdbuf-workload" })); + antithesis_sdk::assert_reachable!("workload serve started"); + + // Idle forever so Antithesis can run test commands; periodically note that + // delivery is happening end-to-end through the disk buffer. + let mut seen = false; + loop { + if !seen && delivered.load(Ordering::Relaxed) > 0 { + seen = true; + antithesis_sdk::assert_reachable!("event delivered end-to-end through disk buffer"); + } + thread::sleep(Duration::from_secs(2)); + } +} + +/// parallel_driver: produce uniquely-IDed events under fault injection. +fn produce(source_url: &str) { + // Per-timeline shape axis: bias payload sizes and produce duration. 
Drawn + // from the SDK random module so it replays deterministically and swarms + // across timelines. + let size_bias = (antithesis_sdk::random::get_random() % 3) as usize; // 0=tiny,1=mixed,2=large + let iters = 200 + (antithesis_sdk::random::get_random() % 400) as u64; // 200..600 events + + let prefix = run_prefix(); + let mut acked_any = false; + for i in 0..iters { + // menu axis: draw a payload size from the disk-buffer boundary menu, + // weighted by the per-timeline size bias. + let pad = match size_bias { + 0 => *antithesis_sdk::random::random_choice(&SIZE_MENU[0..3]).unwrap_or(&0), + 2 => *antithesis_sdk::random::random_choice(&SIZE_MENU[3..6]).unwrap_or(&W), + _ => *antithesis_sdk::random::random_choice(&SIZE_MENU).unwrap_or(&0), + }; + let id = format!("{prefix}-{i}"); + append_line(ATTEMPTED_LOG, &id); + // Generous timeout: under the #21683 deadlock the source blocks forever + // on backpressure, so we time out and keep going (fault-tolerant). + if post_event(source_url, &id, pad, Duration::from_secs(10)) { + append_line(ACKED_LOG, &id); + if !acked_any { + acked_any = true; + antithesis_sdk::assert_reachable!("produce driver got an end-to-end ack"); + } + } + // Small pacing jitter so timelines interleave with faults differently. + if antithesis_sdk::random::get_random() % 8 == 0 { + thread::sleep(Duration::from_millis(50)); + } + } +} + +/// Parse the numeric value out of a prometheus exposition line: +/// `name{labels} VALUE TIMESTAMP` -> VALUE as f64. +fn parse_metric_value(line: &str) -> Option { + line.split_whitespace().nth(1).and_then(|v| v.parse::().ok()) +} + +/// anytime_ invariant: the disk buffer's size gauges must stay within a sane +/// bound. A 256MB buffer cannot hold ~1e18 events/bytes — a gauge that large is +/// the `total_buffer_size` / `get_total_records` u64 underflow surfacing +/// (empty-buffer `0 - 1` => ~1.8e19, or a decrement underflow). 
Runs alongside +/// the produce driver under fault injection, so it observes gauges right after +/// node-kill restarts (where the drained-buffer underflow fires). +fn metrics_check(metrics_url: &str) { + // ~60s of continuous sampling, then exit (Antithesis reruns anytime_ cmds). + // SANE_MAX is cleanly below 2^64 (~1.8e19) yet far above any real value a + // 256MB buffer could hold. + const SANE_MAX: f64 = 1e15; + for _ in 0..30 { + if let Ok(resp) = ureq::get(metrics_url).timeout(Duration::from_secs(3)).call() { + if resp.status() == 200 { + if let Ok(body) = resp.into_string() { + for line in body.lines() { + let is_disk_gauge = line.contains("buffer_type=\"disk\"") + && (line.starts_with("vector_buffer_events{") + || line.starts_with("vector_buffer_byte_size{")); + if is_disk_gauge { + if let Some(v) = parse_metric_value(line) { + antithesis_sdk::assert_always!( + v >= 0.0 && v < SANE_MAX, + "disk buffer size gauge stays within a sane bound (no u64 underflow)", + &json!({ "value": v, "line": line }) + ); + } + } + } + } + } + } + thread::sleep(Duration::from_secs(2)); + } +} + +/// Sum the values of all prometheus lines whose metric name is `metric`. +fn sum_metric(body: &str, metric: &str) -> f64 { + body.lines() + .filter(|l| l.starts_with(metric)) + .filter_map(parse_metric_value) + .sum() +} + +/// anytime_ invariant for #24606: when `when_full=drop_newest` drops events, the +/// buffer-level `buffer_discarded_events_total` increments, but the +/// component-level `component_discarded_events_total` (what operators monitor +/// for data loss) must reflect it too. The bug: the component counter stays 0, +/// so silent data loss goes undetected. Needs `VDBUF_WHEN_FULL=drop_newest` and +/// a full buffer (network faults stall the sink -> the 256MB buffer fills). 
+fn drop_check(metrics_url: &str) { + let mut buf_seen = false; + for _ in 0..30 { + if let Ok(resp) = ureq::get(metrics_url).timeout(Duration::from_secs(3)).call() { + if resp.status() == 200 { + if let Ok(body) = resp.into_string() { + let buf_drop = sum_metric(&body, "vector_buffer_discarded_events_total"); + let comp_drop = sum_metric(&body, "vector_component_discarded_events_total"); + if buf_drop > 0.0 { + buf_seen = true; + } + antithesis_sdk::assert_always!( + buf_drop == 0.0 || comp_drop >= buf_drop, + "buffer drops are reflected in component_discarded_events_total (#24606)", + &json!({ "buffer_discarded": buf_drop, "component_discarded": comp_drop }) + ); + } + } + } + thread::sleep(Duration::from_secs(2)); + } + antithesis_sdk::assert_sometimes!( + buf_seen, + "drop_newest actually dropped events from the disk buffer this timeline", + &json!({}) + ); +} + +/// parallel_driver: fire-and-forget high-volume writer to rapidly fill the disk +/// buffer. Unlike `produce` (which waits up to 10s for each e2e ack), `fill` +/// uses a short timeout and ignores the response — the source still buffers each +/// event before the (abandoned) ack wait. With a blocked sink + drop_newest, the +/// 256MB buffer fills and drop_newest drops, exercising #24606. +fn fill(source_url: &str) { + antithesis_sdk::assert_reachable!("fill driver started"); + let prefix = run_prefix(); + // 64KB events: small enough to send within the short timeout, big enough to + // fill 256MB in a few thousand requests across parallel fillers. + for i in 0..50_000u64 { + let id = format!("{prefix}-f{i}"); + let _ = post_event(source_url, &id, 64 * 1024, Duration::from_millis(800)); + } + antithesis_sdk::assert_reachable!("fill driver finished a burst"); +} + +/// eventually_: faults are paused. Verify durability + writer progress. +fn check(source_url: &str, metrics_url: &str) { + // 1. Let the system recover from whatever faults happened. 
+ eprintln!("[workload] check: waiting for vector recovery"); + let recovered = wait_for_vector(metrics_url, Duration::from_secs(120)); + + // 2. Drain rigorously: wait until Vector reports the disk buffer EMPTY + // (buffer_events == 0) AND delivered.log stops growing. Without the + // buffer-empty gate, in-flight events (accepted/acked but not yet drained + // to the collector) are falsely counted as "lost" — an oracle artifact. + let mut last = read_ids(DELIVERED_LOG).len(); + let drain_deadline = Instant::now() + Duration::from_secs(150); + loop { + thread::sleep(Duration::from_secs(3)); + let buf_events = match ureq::get(metrics_url).timeout(Duration::from_secs(3)).call() { + Ok(resp) if resp.status() == 200 => resp + .into_string() + .ok() + .map(|b| sum_metric(&b, "vector_buffer_events")) + .unwrap_or(-1.0), + _ => -1.0, + }; + let cur = read_ids(DELIVERED_LOG).len(); + let drained = buf_events == 0.0 && cur == last; + if drained || Instant::now() > drain_deadline { + break; + } + last = cur; + } + + let acked = read_ids(ACKED_LOG); + let delivered = read_ids(DELIVERED_LOG); + let missing: Vec<&String> = acked.difference(&delivered).take(20).collect(); + + // Confirm the buffer was actually exercised this timeline. + antithesis_sdk::assert_sometimes!( + !delivered.is_empty(), + "disk buffer delivered events end-to-end this timeline", + &json!({ "delivered": delivered.len(), "acked": acked.len() }) + ); + + // (a) Durability / at-least-once: with e2e acks, every event the source + // acked (200) was reported delivered downstream, so the collector must hold + // it after recovery. A miss = acknowledged-then-lost. 
+ antithesis_sdk::assert_always!( + missing.is_empty(), + "every end-to-end-acked event survives faults and reaches the collector", + &json!({ "acked": acked.len(), "delivered": delivered.len(), + "missing_count": acked.difference(&delivered).count(), + "missing_sample": missing }) + ); + + // (b) Writer progress (the #21683 deadlock demonstration): after faults + // stop and Vector recovers, a brand-new write must be deliverable within a + // generous bound. If the ledger total_buffer_size underflowed, is_buffer_full + // stays true forever and this never succeeds. + let probe_id = format!("probe-{}", now_ms()); + let mut probe_delivered = false; + if recovered { + let deadline = Instant::now() + Duration::from_secs(45); + while Instant::now() < deadline { + if post_event(source_url, &probe_id, 1, Duration::from_secs(10)) { + // e2e ack ⟹ delivered; double-check it reached the collector. + thread::sleep(Duration::from_secs(1)); + if read_ids(DELIVERED_LOG).contains(&probe_id) { + probe_delivered = true; + break; + } + } + thread::sleep(Duration::from_secs(2)); + } + } + antithesis_sdk::assert_always!( + probe_delivered, + "post-recovery write makes progress (no permanent writer deadlock)", + &json!({ "recovered": recovered, "probe_id": probe_id }) + ); +}