From 890df17848c6d8702ea1aed38ef411c9f9b43996 Mon Sep 17 00:00:00 2001 From: Omkar Mehta Date: Thu, 28 May 2026 10:52:23 -0700 Subject: [PATCH] Add streaming ATOF exporter Signed-off-by: Omkar Mehta --- crates/core/src/observability/atof.rs | 355 +++++++++++++++++- .../tests/unit/observability/atof_tests.rs | 198 ++++++++++ docs/observability-plugin/atof.mdx | 49 +++ 3 files changed, 599 insertions(+), 3 deletions(-) diff --git a/crates/core/src/observability/atof.rs b/crates/core/src/observability/atof.rs index d1bca1d2..c574cd3f 100644 --- a/crates/core/src/observability/atof.rs +++ b/crates/core/src/observability/atof.rs @@ -10,8 +10,11 @@ use std::fs::{File, OpenOptions}; use std::io::{BufWriter, Write}; +use std::net::{Shutdown, TcpStream}; use std::path::{Path, PathBuf}; -use std::sync::{Arc, Mutex}; +use std::sync::{Arc, Mutex, mpsc}; +use std::thread::JoinHandle; +use std::time::Duration; use chrono::Utc; @@ -45,6 +48,28 @@ pub enum AtofExporterError { /// Underlying I/O error. source: std::io::Error, }, + /// Failed to connect to an ATOF stream receiver. + #[error("failed to connect to ATOF stream receiver {address}: {source}")] + ConnectStream { + /// Address that failed to connect. + address: String, + /// Underlying I/O error. + source: std::io::Error, + }, + /// Failed to configure the ATOF stream connection. + #[error( + "failed to configure ATOF stream receiver {address} with {operation} (ATOF_STREAM_WRITE_TIMEOUT={timeout:?}): {source}" + )] + ConfigureStream { + /// Address associated with the stream. + address: String, + /// Stream option that failed. + operation: &'static str, + /// Write timeout used when configuring the stream. + timeout: Option, + /// Underlying I/O error. + source: std::io::Error, + }, /// The exporter recorded an earlier write or serialization error. #[error("previous ATOF export failed for {path:?}: {message}")] StoredFailure { @@ -53,6 +78,14 @@ pub enum AtofExporterError { /// Stored failure message. message: String, }, + /// The streaming exporter recorded an earlier write or serialization error. + #[error("previous ATOF stream export failed for {address}: {message}")] + StoredStreamFailure { + /// Address associated with the stream. + address: String, + /// Stored failure message. + message: String, + }, /// The internal exporter state lock was poisoned. #[error("the ATOF exporter state lock was poisoned")] LockPoisoned, @@ -225,6 +258,302 @@ impl AtofExporter { } } +/// Configuration for [`AtofStreamingExporter`]. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct AtofStreamingExporterConfig { + /// TCP address for a separate local process that receives ATOF JSONL events. + pub address: String, +} + +impl AtofStreamingExporterConfig { + /// Create a streaming exporter config for the given TCP address. + pub fn new(address: impl Into) -> Self { + Self { + address: address.into(), + } + } +} + +const ATOF_STREAM_QUEUE_BOUND: usize = 1024; +const ATOF_STREAM_WRITE_TIMEOUT: Duration = Duration::from_secs(2); + +enum AtofStreamMessage { + Event(String), + Flush(mpsc::Sender>), + Shutdown(mpsc::Sender>), +} + +struct AtofStreamingExporterState { + sender: Option>, + writer_thread: Option>, + events_sent: u64, + last_error: Arc>>, +} + +/// Snapshot of [`AtofStreamingExporter`] delivery state. +#[derive(Debug, Clone, Default, PartialEq, Eq)] +pub struct AtofStreamingExporterStats { + /// Number of ATOF events observed by the streaming exporter. + pub events_sent: u64, + /// Most recent serialization or exporter state error, if one was recorded. + pub last_error: Option, +} + +/// TCP-backed Agent Trajectory Observability Format (ATOF) event stream exporter. +/// +/// The exporter exposes a regular NeMo Relay event subscriber and writes each +/// canonical ATOF JSON value as one JSONL line to a separate local process over +/// a TCP connection. A local UI, CLI, or bridge process can own the receiving +/// socket and fan events out over HTTP, SSE, WebSocket, stdout, or another +/// transport without redefining the ATOF event contract. +#[derive(Clone)] +pub struct AtofStreamingExporter { + address: String, + state: Arc>, +} + +impl AtofStreamingExporter { + /// Connect to a separate local ATOF stream receiver. + pub fn new(config: AtofStreamingExporterConfig) -> Result { + let address = config.address; + let stream = + TcpStream::connect(&address).map_err(|source| AtofExporterError::ConnectStream { + address: address.clone(), + source, + })?; + stream + .set_nodelay(true) + .map_err(|source| AtofExporterError::ConfigureStream { + address: address.clone(), + operation: "set_nodelay", + timeout: None, + source, + })?; + stream + .set_write_timeout(Some(ATOF_STREAM_WRITE_TIMEOUT)) + .map_err(|source| AtofExporterError::ConfigureStream { + address: address.clone(), + operation: "set_write_timeout", + timeout: Some(ATOF_STREAM_WRITE_TIMEOUT), + source, + })?; + let (sender, receiver) = mpsc::sync_channel(ATOF_STREAM_QUEUE_BOUND); + let last_error = Arc::new(Mutex::new(None)); + let writer_error = Arc::clone(&last_error); + let writer_thread = std::thread::spawn(move || { + let mut writer = BufWriter::new(stream); + while let Ok(message) = receiver.recv() { + match message { + AtofStreamMessage::Event(value) => { + if let Err(error) = write_serialized_event(&mut writer, &value) { + store_stream_error(&writer_error, error); + } + } + AtofStreamMessage::Flush(reply) => { + let result = writer.flush().map_err(|error| error.to_string()); + if let Err(error) = &result { + store_stream_error(&writer_error, error.clone()); + } + let _ = reply.send(result); + } + AtofStreamMessage::Shutdown(reply) => { + let result = writer.flush().map_err(|error| error.to_string()); + if let Err(error) = &result { + store_stream_error(&writer_error, error.clone()); + } + let _ = writer.get_ref().shutdown(Shutdown::Both); + let _ = reply.send(result); + break; + } + } + } + }); + Ok(Self { + address, + state: Arc::new(Mutex::new(AtofStreamingExporterState { + sender: Some(sender), + writer_thread: Some(writer_thread), + events_sent: 0, + last_error, + })), + }) + } + + /// Connect to a separate local ATOF stream receiver at the given TCP address. + pub fn connect(address: impl Into) -> Result { + Self::new(AtofStreamingExporterConfig::new(address)) + } + + /// Return the connected stream receiver address. + pub fn address(&self) -> &str { + &self.address + } + + /// Return an event subscriber that writes one canonical JSONL record per event. + pub fn subscriber(&self) -> EventSubscriberFn { + let state = Arc::clone(&self.state); + Arc::new(move |event: &Event| { + let value = match serialize_event(event) { + Ok(value) => value, + Err(error) => { + if let Ok(state) = state.lock() { + store_stream_error(&state.last_error, error); + } + return; + } + }; + let Ok(mut state) = state.lock() else { + return; + }; + if stream_last_error(&state.last_error).is_some() { + return; + } + let Some(sender) = state.sender.as_ref() else { + store_stream_error(&state.last_error, "stream receiver is closed".to_string()); + return; + }; + match sender.try_send(AtofStreamMessage::Event(value)) { + Ok(()) => { + state.events_sent += 1; + } + Err(mpsc::TrySendError::Full(_)) => { + store_stream_error(&state.last_error, "ATOF stream queue is full".to_string()); + } + Err(mpsc::TrySendError::Disconnected(_)) => { + store_stream_error( + &state.last_error, + "ATOF stream writer is disconnected".to_string(), + ); + } + } + }) + } + + /// Register this streaming exporter globally under the given subscriber name. + pub fn register(&self, name: &str) -> Result<()> { + register_subscriber(name, self.subscriber()).map_err(Into::into) + } + + /// Deregister a global subscriber by name. + pub fn deregister(&self, name: &str) -> Result { + deregister_subscriber(name).map_err(Into::into) + } + + /// Flush the stream and report any stored write error. + pub fn force_flush(&self) -> Result<()> { + let (sender, last_error) = { + let state = self + .state + .lock() + .map_err(|_| AtofExporterError::LockPoisoned)?; + if let Some(message) = stream_last_error(&state.last_error) { + return Err(AtofExporterError::StoredStreamFailure { + address: self.address.clone(), + message, + }); + } + (state.sender.clone(), Arc::clone(&state.last_error)) + }; + let Some(sender) = sender else { + return Ok(()); + }; + let (reply_sender, reply_receiver) = mpsc::channel(); + if sender.send(AtofStreamMessage::Flush(reply_sender)).is_err() { + return Err(AtofExporterError::StoredStreamFailure { + address: self.address.clone(), + message: "ATOF stream writer is disconnected".to_string(), + }); + } + match reply_receiver.recv() { + Ok(Ok(())) => { + if let Some(message) = stream_last_error(&last_error) { + return Err(AtofExporterError::StoredStreamFailure { + address: self.address.clone(), + message, + }); + } + Ok(()) + } + Ok(Err(message)) => Err(AtofExporterError::StoredStreamFailure { + address: self.address.clone(), + message, + }), + Err(error) => Err(AtofExporterError::StoredStreamFailure { + address: self.address.clone(), + message: error.to_string(), + }), + } + } + + /// Shut down the stream by flushing and closing the TCP connection. + pub fn shutdown(&self) -> Result<()> { + let flush_result = self.force_flush(); + let (sender, writer_thread, last_error) = { + let mut state = self + .state + .lock() + .map_err(|_| AtofExporterError::LockPoisoned)?; + ( + state.sender.take(), + state.writer_thread.take(), + Arc::clone(&state.last_error), + ) + }; + let shutdown_result = if let Some(sender) = sender { + let (reply_sender, reply_receiver) = mpsc::channel(); + let send_result = sender + .send(AtofStreamMessage::Shutdown(reply_sender)) + .map_err(|_| AtofExporterError::StoredStreamFailure { + address: self.address.clone(), + message: "ATOF stream writer is disconnected".to_string(), + }); + match send_result { + Ok(()) => match reply_receiver.recv() { + Ok(Ok(())) => Ok(()), + Ok(Err(message)) => Err(AtofExporterError::StoredStreamFailure { + address: self.address.clone(), + message, + }), + Err(error) => Err(AtofExporterError::StoredStreamFailure { + address: self.address.clone(), + message: error.to_string(), + }), + }, + Err(error) => Err(error), + } + } else { + Ok(()) + }; + if let Some(writer_thread) = writer_thread { + let _ = writer_thread.join(); + } + let stored_result = + stream_last_error(&last_error).map(|message| AtofExporterError::StoredStreamFailure { + address: self.address.clone(), + message, + }); + match (flush_result, shutdown_result) { + (Err(error), _) => Err(error), + (Ok(()), Err(error)) => Err(error), + (Ok(()), Ok(())) => stored_result.map_or(Ok(()), Err), + } + } + + /// Return a point-in-time delivery snapshot for diagnostics and tests. + pub fn stats(&self) -> AtofStreamingExporterStats { + let Ok(state) = self.state.lock() else { + return AtofStreamingExporterStats { + last_error: Some("the ATOF streaming exporter state lock was poisoned".to_string()), + ..AtofStreamingExporterStats::default() + }; + }; + AtofStreamingExporterStats { + events_sent: state.events_sent, + last_error: stream_last_error(&state.last_error), + } + } +} + fn default_filename() -> String { format!( "nemo-relay-events-{}.jsonl", @@ -251,15 +580,35 @@ fn open_file(path: &Path, mode: AtofExporterMode) -> Result { }) } -fn write_event(writer: &mut BufWriter, event: &Event) -> std::result::Result<(), String> { +fn write_event(writer: &mut impl Write, event: &Event) -> std::result::Result<(), String> { + write_serialized_event(writer, &serialize_event(event)?) +} + +fn serialize_event(event: &Event) -> std::result::Result { let value = event .try_to_json_value() .map_err(|error| error.to_string())?; - serde_json::to_writer(&mut *writer, &value).map_err(|error| error.to_string())?; + serde_json::to_string(&value).map_err(|error| error.to_string()) +} + +fn write_serialized_event(writer: &mut impl Write, value: &str) -> std::result::Result<(), String> { + writer + .write_all(value.as_bytes()) + .map_err(|error| error.to_string())?; writer.write_all(b"\n").map_err(|error| error.to_string())?; writer.flush().map_err(|error| error.to_string()) } +fn store_stream_error(last_error: &Arc>>, error: String) { + if let Ok(mut last_error) = last_error.lock() { + last_error.get_or_insert(error); + } +} + +fn stream_last_error(last_error: &Arc>>) -> Option { + last_error.lock().ok().and_then(|error| error.clone()) +} + // --------------------------------------------------------------------------- // Tests // --------------------------------------------------------------------------- diff --git a/crates/core/tests/unit/observability/atof_tests.rs b/crates/core/tests/unit/observability/atof_tests.rs index 51fe6af2..669d4bd3 100644 --- a/crates/core/tests/unit/observability/atof_tests.rs +++ b/crates/core/tests/unit/observability/atof_tests.rs @@ -13,10 +13,17 @@ use crate::api::scope::{EmitMarkEventParams, PopScopeParams, PushScopeParams, Sc use crate::codec::request::{AnnotatedLlmRequest, Message, MessageContent}; use serde_json::{Map, json}; use std::fs; +use std::io::{BufRead, BufReader, Read}; +use std::net::TcpListener; use std::sync::Arc; +use std::sync::mpsc; +use std::thread; +use std::time::Duration; use std::time::{SystemTime, UNIX_EPOCH}; use uuid::Uuid; +const TEST_RECV_TIMEOUT: Duration = Duration::from_secs(2); + fn temp_dir(prefix: &str) -> PathBuf { let id = SystemTime::now() .duration_since(UNIX_EPOCH) @@ -111,6 +118,40 @@ fn read_jsonl(path: &Path) -> Vec { .collect() } +fn start_atof_socket_sink( + expected_events: usize, +) -> (String, mpsc::Receiver>) { + let listener = TcpListener::bind("127.0.0.1:0").unwrap(); + let address = listener.local_addr().unwrap().to_string(); + let (sender, receiver) = mpsc::channel(); + thread::spawn(move || { + let (stream, _) = listener.accept().unwrap(); + let mut reader = BufReader::new(stream); + let mut events = Vec::with_capacity(expected_events); + for _ in 0..expected_events { + let mut line = String::new(); + reader.read_line(&mut line).unwrap(); + events.push(serde_json::from_str(line.trim_end()).unwrap()); + } + sender.send(events).unwrap(); + }); + (address, receiver) +} + +fn start_atof_eof_sink() -> (String, mpsc::Receiver<()>) { + let listener = TcpListener::bind("127.0.0.1:0").unwrap(); + let address = listener.local_addr().unwrap().to_string(); + let (sender, receiver) = mpsc::channel(); + thread::spawn(move || { + let (stream, _) = listener.accept().unwrap(); + let mut reader = BufReader::new(stream); + let mut buffer = String::new(); + reader.read_to_string(&mut buffer).unwrap(); + sender.send(()).unwrap(); + }); + (address, receiver) +} + #[test] fn default_config_uses_cwd_append_and_timestamped_filename() { let config = AtofExporterConfig::default(); @@ -218,6 +259,163 @@ fn subscriber_writes_canonical_event_jsonl() { ); } +#[test] +fn streaming_exporter_writes_canonical_event_json_values_to_socket() { + let (address, receiver) = start_atof_socket_sink(1); + let exporter = AtofStreamingExporter::connect(address).unwrap(); + let event = make_annotated_llm_event("streamed-llm-start"); + + (exporter.subscriber())(&event); + exporter.shutdown().unwrap(); + + let delivered = receiver.recv_timeout(TEST_RECV_TIMEOUT).unwrap(); + assert_eq!(delivered[0], event.try_to_json_value().unwrap()); + assert_eq!(exporter.stats().events_sent, 1); +} + +#[test] +fn streaming_exporter_reports_connection_failure() { + let listener = TcpListener::bind("127.0.0.1:0").unwrap(); + let address = listener.local_addr().unwrap().to_string(); + drop(listener); + + let error = match AtofStreamingExporter::connect(address) { + Ok(_) => panic!("expected streaming exporter connection to fail"), + Err(error) => error, + }; + + assert!(matches!(error, AtofExporterError::ConnectStream { .. })); +} + +#[test] +fn streaming_exporter_configuration_error_names_socket_options() { + let error = AtofExporterError::ConfigureStream { + address: "127.0.0.1:65535".to_string(), + operation: "set_write_timeout", + timeout: Some(ATOF_STREAM_WRITE_TIMEOUT), + source: std::io::Error::other("timeout option rejected"), + } + .to_string(); + + assert!(error.contains("set_write_timeout")); + assert!(error.contains("ATOF_STREAM_WRITE_TIMEOUT")); +} + +#[test] +fn streaming_exporter_shutdown_closes_stream_after_stored_error() { + let (address, receiver) = start_atof_eof_sink(); + let exporter = AtofStreamingExporter::connect(address).unwrap(); + + let last_error = Arc::clone(&exporter.state.lock().unwrap().last_error); + *last_error.lock().unwrap() = Some("forced failure".to_string()); + let error = exporter.shutdown().unwrap_err(); + + assert!(matches!( + error, + AtofExporterError::StoredStreamFailure { .. } + )); + receiver.recv_timeout(TEST_RECV_TIMEOUT).unwrap(); +} + +#[test] +fn streaming_exporter_preserves_first_stored_error() { + let last_error = Arc::new(Mutex::new(None)); + + store_stream_error(&last_error, "first failure".to_string()); + store_stream_error(&last_error, "later failure".to_string()); + + assert_eq!( + stream_last_error(&last_error), + Some("first failure".to_string()) + ); +} + +#[test] +fn streaming_exporter_registers_with_runtime_events() { + let _guard = crate::observability::test_mutex().lock().unwrap(); + reset_global(); + + let pre_handle = crate::api::scope::push_scope( + PushScopeParams::builder() + .name("pre_atof_streaming_scope") + .scope_type(ScopeType::Agent) + .input(json!({"before": true})) + .build(), + ) + .unwrap(); + crate::api::scope::event( + EmitMarkEventParams::builder() + .name("pre_atof_streaming_mark") + .parent(&pre_handle) + .data(json!({"before": true})) + .build(), + ) + .unwrap(); + crate::api::scope::pop_scope( + PopScopeParams::builder() + .handle_uuid(&pre_handle.uuid) + .output(json!({"before": true})) + .build(), + ) + .unwrap(); + + let (address, receiver) = start_atof_socket_sink(3); + let exporter = AtofStreamingExporter::connect(address).unwrap(); + let name = format!("atof_streaming_exporter_{}", Uuid::now_v7()); + + exporter.register(&name).unwrap(); + let handle = crate::api::scope::push_scope( + PushScopeParams::builder() + .name("atof_streaming_scope") + .scope_type(ScopeType::Agent) + .input(json!({"scope": true})) + .build(), + ) + .unwrap(); + crate::api::scope::event( + EmitMarkEventParams::builder() + .name("atof_streaming_mark") + .parent(&handle) + .data(json!({"mark": true})) + .build(), + ) + .unwrap(); + crate::api::scope::pop_scope( + PopScopeParams::builder() + .handle_uuid(&handle.uuid) + .output(json!({"done": true})) + .build(), + ) + .unwrap(); + + assert!(exporter.deregister(&name).unwrap()); + assert!(!exporter.deregister(&name).unwrap()); + exporter.shutdown().unwrap(); + + let events = receiver.recv_timeout(TEST_RECV_TIMEOUT).unwrap(); + let scope_start = &events[0]; + let mark = &events[1]; + let scope_end = &events[2]; + + assert_eq!(scope_start["name"], "atof_streaming_scope"); + assert_eq!(scope_start["scope_category"], "start"); + assert_eq!(mark["name"], "atof_streaming_mark"); + assert_eq!(mark["kind"], "mark"); + assert_eq!(scope_end["name"], "atof_streaming_scope"); + assert_eq!(scope_end["scope_category"], "end"); + assert!( + events + .iter() + .all(|event| event["name"] != "pre_atof_streaming_scope") + ); + assert!( + events + .iter() + .all(|event| event["name"] != "pre_atof_streaming_mark") + ); + assert_eq!(exporter.stats().events_sent, 3); +} + #[test] fn register_deregister_flush_and_shutdown_work_with_runtime_events() { let _guard = crate::observability::test_mutex().lock().unwrap(); diff --git a/docs/observability-plugin/atof.mdx b/docs/observability-plugin/atof.mdx index df705473..9cae56af 100644 --- a/docs/observability-plugin/atof.mdx +++ b/docs/observability-plugin/atof.mdx @@ -230,6 +230,55 @@ exporter.shutdown()?; +## Streaming API + +Use `AtofStreamingExporter` when an application needs to consume canonical ATOF +events as they are emitted instead of reading a completed JSONL file. The +streaming exporter connects to a separate local process over TCP and writes one +canonical ATOF JSON object per JSONL line. The receiver process can then fan the +events out through HTTP, SSE, WebSocket, stdout, or another local transport +without changing the ATOF event contract. + +```rust +use nemo_relay::observability::atof::{ + AtofStreamingExporter, AtofStreamingExporterConfig, +}; + +// Start a separate local receiver process before connecting the exporter. +let config = AtofStreamingExporterConfig::new("127.0.0.1:43199"); +let exporter = AtofStreamingExporter::new(config)?; +exporter.register("atof-stream")?; + +// Run instrumented application work here. + +let _ = exporter.deregister("atof-stream")?; +exporter.shutdown()?; +let stats = exporter.stats(); +assert!(stats.events_sent > 0); +``` + +Register the streaming exporter before the instrumented work starts. The +receiver process only observes future events and does not replay earlier +lifecycle events. + +A minimal receiver can block on TCP lines and exit when the exporter closes the +connection. This keeps short idle gaps from ending the stream: + +```rust +use serde_json::{from_str, to_string, Value}; +use std::io::{BufRead, BufReader}; +use std::net::TcpListener; + +let listener = TcpListener::bind("127.0.0.1:43199")?; +let (stream, _) = listener.accept()?; +let reader = BufReader::new(stream); + +for line in reader.lines() { + let event: Value = from_str(&line?)?; + println!("{}", to_string(&event)?); +} +``` + ## Common Validation Failures - `mode` is not `append` or `overwrite`.