From 39e4b5ea3eaee7135a15164e2a47168a6f7585f7 Mon Sep 17 00:00:00 2001 From: xfocus3 <7467779+xfocus3@users.noreply.github.com> Date: Sat, 30 May 2026 16:45:16 +0100 Subject: [PATCH 1/2] fix(dnstap source): don't delete the socket file during config validation The framestream-based unix source removed any existing socket file and bound the listener eagerly while the source was being built. Component instantiation happens during `vector validate`, so validating a config deleted the socket file of an already-running dnstap source (mode: unix), killing it. Move the socket setup (stale-file removal, bind, buffer sizes, permissions) into the source future so it only runs when the source actually starts, matching the other unix sources. Validation no longer has this destructive side effect. Closes #25513 --- .../25513_dnstap_validate_socket.fix.md | 3 + src/sources/util/framestream.rs | 161 +++++++++++------- 2 files changed, 104 insertions(+), 60 deletions(-) create mode 100644 changelog.d/25513_dnstap_validate_socket.fix.md diff --git a/changelog.d/25513_dnstap_validate_socket.fix.md b/changelog.d/25513_dnstap_validate_socket.fix.md new file mode 100644 index 0000000000000..45cff92c95dd4 --- /dev/null +++ b/changelog.d/25513_dnstap_validate_socket.fix.md @@ -0,0 +1,3 @@ +Fixed `vector validate` deleting the Unix socket file of a running `dnstap` source (in `mode: unix`). Socket setup for framestream-based unix sources is now performed when the source starts rather than when it is built, so validating a config no longer has destructive side effects on a running instance. + +authors: xfocus3 diff --git a/src/sources/util/framestream.rs b/src/sources/util/framestream.rs index a42bae5815322..51954f110c719 100644 --- a/src/sources/util/framestream.rs +++ b/src/sources/util/framestream.rs @@ -694,75 +694,86 @@ pub fn build_framestream_unix_source( ) -> crate::Result { let path = frame_handler.socket_path(); - //check if the path already exists (and try to delete it) - match fs::metadata(&path) { - Ok(_) => { - //exists, so try to delete it - info!(message = "Deleting file.", ?path); - fs::remove_file(&path)?; - } - Err(ref e) if e.kind() == std::io::ErrorKind::NotFound => {} //doesn't exist, do nothing - Err(e) => { - error!("Unable to get socket information; error = {:?}.", e); - return Err(Box::new(e)); - } - }; - - let listener = UnixListener::bind(&path)?; + // NOTE: Socket setup (removing any stale socket file, binding the listener, + // adjusting buffer sizes and permissions) is performed inside the returned + // future rather than eagerly. Doing it eagerly would run during component + // instantiation — including `vector validate` — and delete the socket file + // of an already-running Vector instance. See issue #25513. + let fut = async move { + //check if the path already exists (and try to delete it) + match fs::metadata(&path) { + Ok(_) => { + //exists, so try to delete it + info!(message = "Deleting file.", ?path); + if let Err(error) = fs::remove_file(&path) { + error!(message = "Unable to remove socket file.", ?path, %error); + return Err(()); + } + } + Err(ref e) if e.kind() == std::io::ErrorKind::NotFound => {} //doesn't exist, do nothing + Err(error) => { + error!(message = "Unable to get socket information.", ?path, %error); + return Err(()); + } + }; - // system's 'net.core.rmem_max' might have to be changed if socket receive buffer is not updated properly - if let Some(socket_receive_buffer_size) = frame_handler.socket_receive_buffer_size() { - _ = nix::sys::socket::setsockopt( - &listener, - nix::sys::socket::sockopt::RcvBuf, - &(socket_receive_buffer_size), - ); - let rcv_buf_size = - nix::sys::socket::getsockopt(&listener, nix::sys::socket::sockopt::RcvBuf); - info!( - "Unix socket receive buffer size modified to {}.", - rcv_buf_size.unwrap() - ); - } + let listener = match UnixListener::bind(&path) { + Ok(listener) => listener, + Err(error) => { + error!(message = "Unable to bind to socket.", ?path, %error); + return Err(()); + } + }; - // system's 'net.core.wmem_max' might have to be changed if socket send buffer is not updated properly - if let Some(socket_send_buffer_size) = frame_handler.socket_send_buffer_size() { - _ = nix::sys::socket::setsockopt( - &listener, - nix::sys::socket::sockopt::SndBuf, - &(socket_send_buffer_size), - ); - let snd_buf_size = - nix::sys::socket::getsockopt(&listener, nix::sys::socket::sockopt::SndBuf); - info!( - "Unix socket buffer send size modified to {}.", - snd_buf_size.unwrap() - ); - } + // system's 'net.core.rmem_max' might have to be changed if socket receive buffer is not updated properly + if let Some(socket_receive_buffer_size) = frame_handler.socket_receive_buffer_size() { + _ = nix::sys::socket::setsockopt( + &listener, + nix::sys::socket::sockopt::RcvBuf, + &(socket_receive_buffer_size), + ); + let rcv_buf_size = + nix::sys::socket::getsockopt(&listener, nix::sys::socket::sockopt::RcvBuf); + info!( + "Unix socket receive buffer size modified to {}.", + rcv_buf_size.unwrap() + ); + } - // the permissions to unix socket are restricted from 0o700 to 0o777, which are 448 and 511 in decimal - if let Some(socket_permission) = frame_handler.socket_file_mode() { - if !(448..=511).contains(&socket_permission) { - return Err(format!( - "Invalid Socket permission {socket_permission:#o}. Must between 0o700 and 0o777." - ) - .into()); + // system's 'net.core.wmem_max' might have to be changed if socket send buffer is not updated properly + if let Some(socket_send_buffer_size) = frame_handler.socket_send_buffer_size() { + _ = nix::sys::socket::setsockopt( + &listener, + nix::sys::socket::sockopt::SndBuf, + &(socket_send_buffer_size), + ); + let snd_buf_size = + nix::sys::socket::getsockopt(&listener, nix::sys::socket::sockopt::SndBuf); + info!( + "Unix socket buffer send size modified to {}.", + snd_buf_size.unwrap() + ); } - match fs::set_permissions(&path, fs::Permissions::from_mode(socket_permission)) { - Ok(_) => { - info!("Socket permissions updated to {:#o}.", socket_permission); - } - Err(e) => { + + // the permissions to unix socket are restricted from 0o700 to 0o777, which are 448 and 511 in decimal + if let Some(socket_permission) = frame_handler.socket_file_mode() { + if !(448..=511).contains(&socket_permission) { error!( - "Failed to update listener socket permissions; error = {:?}.", - e + "Invalid Socket permission {socket_permission:#o}. Must between 0o700 and 0o777." ); - return Err(Box::new(e)); + return Err(()); + } + match fs::set_permissions(&path, fs::Permissions::from_mode(socket_permission)) { + Ok(_) => { + info!("Socket permissions updated to {:#o}.", socket_permission); + } + Err(error) => { + error!(message = "Failed to update listener socket permissions.", %error); + return Err(()); + } } }; - }; - let fut = async move { let active_parsing_task_nums = Arc::new(AtomicUsize::new(0)); info!(message = "Listening...", ?path, r#type = "unix"); @@ -1803,4 +1814,34 @@ mod test { "Max number of tasks at any given time should NOT Exceed max_frame_handling_tasks too much" ); } + + // Regression test for https://github.com/vectordotdev/vector/issues/25513: + // building the source (as `vector validate` does when instantiating + // components) must not delete an existing socket file. The socket should + // only be (re)created when the source future is actually run. + #[tokio::test(flavor = "multi_thread")] + async fn build_unix_source_does_not_remove_existing_socket() { + let frame_handler = create_frame_handler(false); + let socket_path = frame_handler.socket_path(); + + // Simulate the socket file of an already-running source. + std::fs::write(&socket_path, b"in use").expect("Failed to create socket file."); + assert!(socket_path.exists()); + + let (tx, _rx) = SourceSender::new_test(); + let source_id = ComponentKey::from("test_source"); + let mut shutdown = SourceShutdownCoordinator::default(); + let (shutdown_signal, _) = shutdown.register_source(&source_id, false); + + // Building the source should not touch the existing file... + let source = build_framestream_unix_source(frame_handler, shutdown_signal, tx) + .expect("Failed to build framestream unix source."); + assert!( + socket_path.exists(), + "Building the source must not delete the existing socket file" + ); + + // ...the file is only managed once the source future actually runs. + drop(source); + } } From b44e084e0a64fdbb7015b007f25c2ea1eef6d395 Mon Sep 17 00:00:00 2001 From: xfocus3 <7467779+xfocus3@users.noreply.github.com> Date: Sat, 30 May 2026 17:27:41 +0100 Subject: [PATCH 2/2] chore(deps): add 'framestream' to spelling allowlist The changelog fragment uses 'framestream' (the frame streams protocol the dnstap source is built on), which the spell-checker doesn't recognize. --- .github/actions/spelling/allow.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/actions/spelling/allow.txt b/.github/actions/spelling/allow.txt index 7ac468f99526c..86ddb9aec78b8 100644 --- a/.github/actions/spelling/allow.txt +++ b/.github/actions/spelling/allow.txt @@ -173,6 +173,7 @@ fluentbit fluentd Foto FQDNs +framestream Freescale fuchsnj FUJITSU