diff --git a/.github/actions/spelling/allow.txt b/.github/actions/spelling/allow.txt index 7ac468f99526c..86ddb9aec78b8 100644 --- a/.github/actions/spelling/allow.txt +++ b/.github/actions/spelling/allow.txt @@ -173,6 +173,7 @@ fluentbit fluentd Foto FQDNs +framestream Freescale fuchsnj FUJITSU diff --git a/changelog.d/25513_dnstap_validate_socket.fix.md b/changelog.d/25513_dnstap_validate_socket.fix.md new file mode 100644 index 0000000000000..45cff92c95dd4 --- /dev/null +++ b/changelog.d/25513_dnstap_validate_socket.fix.md @@ -0,0 +1,3 @@ +Fixed `vector validate` deleting the Unix socket file of a running `dnstap` source (in `mode: unix`). Socket setup for framestream-based unix sources is now performed when the source starts rather than when it is built, so validating a config no longer has destructive side effects on a running instance. + +authors: xfocus3 diff --git a/src/sources/util/framestream.rs b/src/sources/util/framestream.rs index a42bae5815322..51954f110c719 100644 --- a/src/sources/util/framestream.rs +++ b/src/sources/util/framestream.rs @@ -694,75 +694,86 @@ pub fn build_framestream_unix_source( ) -> crate::Result { let path = frame_handler.socket_path(); - //check if the path already exists (and try to delete it) - match fs::metadata(&path) { - Ok(_) => { - //exists, so try to delete it - info!(message = "Deleting file.", ?path); - fs::remove_file(&path)?; - } - Err(ref e) if e.kind() == std::io::ErrorKind::NotFound => {} //doesn't exist, do nothing - Err(e) => { - error!("Unable to get socket information; error = {:?}.", e); - return Err(Box::new(e)); - } - }; - - let listener = UnixListener::bind(&path)?; + // NOTE: Socket setup (removing any stale socket file, binding the listener, + // adjusting buffer sizes and permissions) is performed inside the returned + // future rather than eagerly. Doing it eagerly would run during component + // instantiation — including `vector validate` — and delete the socket file + // of an already-running Vector instance. See issue #25513. + let fut = async move { + //check if the path already exists (and try to delete it) + match fs::metadata(&path) { + Ok(_) => { + //exists, so try to delete it + info!(message = "Deleting file.", ?path); + if let Err(error) = fs::remove_file(&path) { + error!(message = "Unable to remove socket file.", ?path, %error); + return Err(()); + } + } + Err(ref e) if e.kind() == std::io::ErrorKind::NotFound => {} //doesn't exist, do nothing + Err(error) => { + error!(message = "Unable to get socket information.", ?path, %error); + return Err(()); + } + }; - // system's 'net.core.rmem_max' might have to be changed if socket receive buffer is not updated properly - if let Some(socket_receive_buffer_size) = frame_handler.socket_receive_buffer_size() { - _ = nix::sys::socket::setsockopt( - &listener, - nix::sys::socket::sockopt::RcvBuf, - &(socket_receive_buffer_size), - ); - let rcv_buf_size = - nix::sys::socket::getsockopt(&listener, nix::sys::socket::sockopt::RcvBuf); - info!( - "Unix socket receive buffer size modified to {}.", - rcv_buf_size.unwrap() - ); - } + let listener = match UnixListener::bind(&path) { + Ok(listener) => listener, + Err(error) => { + error!(message = "Unable to bind to socket.", ?path, %error); + return Err(()); + } + }; - // system's 'net.core.wmem_max' might have to be changed if socket send buffer is not updated properly - if let Some(socket_send_buffer_size) = frame_handler.socket_send_buffer_size() { - _ = nix::sys::socket::setsockopt( - &listener, - nix::sys::socket::sockopt::SndBuf, - &(socket_send_buffer_size), - ); - let snd_buf_size = - nix::sys::socket::getsockopt(&listener, nix::sys::socket::sockopt::SndBuf); - info!( - "Unix socket buffer send size modified to {}.", - snd_buf_size.unwrap() - ); - } + // system's 'net.core.rmem_max' might have to be changed if socket receive buffer is not updated properly + if let Some(socket_receive_buffer_size) = frame_handler.socket_receive_buffer_size() { + _ = nix::sys::socket::setsockopt( + &listener, + nix::sys::socket::sockopt::RcvBuf, + &(socket_receive_buffer_size), + ); + let rcv_buf_size = + nix::sys::socket::getsockopt(&listener, nix::sys::socket::sockopt::RcvBuf); + info!( + "Unix socket receive buffer size modified to {}.", + rcv_buf_size.unwrap() + ); + } - // the permissions to unix socket are restricted from 0o700 to 0o777, which are 448 and 511 in decimal - if let Some(socket_permission) = frame_handler.socket_file_mode() { - if !(448..=511).contains(&socket_permission) { - return Err(format!( - "Invalid Socket permission {socket_permission:#o}. Must between 0o700 and 0o777." - ) - .into()); + // system's 'net.core.wmem_max' might have to be changed if socket send buffer is not updated properly + if let Some(socket_send_buffer_size) = frame_handler.socket_send_buffer_size() { + _ = nix::sys::socket::setsockopt( + &listener, + nix::sys::socket::sockopt::SndBuf, + &(socket_send_buffer_size), + ); + let snd_buf_size = + nix::sys::socket::getsockopt(&listener, nix::sys::socket::sockopt::SndBuf); + info!( + "Unix socket buffer send size modified to {}.", + snd_buf_size.unwrap() + ); } - match fs::set_permissions(&path, fs::Permissions::from_mode(socket_permission)) { - Ok(_) => { - info!("Socket permissions updated to {:#o}.", socket_permission); - } - Err(e) => { + + // the permissions to unix socket are restricted from 0o700 to 0o777, which are 448 and 511 in decimal + if let Some(socket_permission) = frame_handler.socket_file_mode() { + if !(448..=511).contains(&socket_permission) { error!( - "Failed to update listener socket permissions; error = {:?}.", - e + "Invalid Socket permission {socket_permission:#o}. Must between 0o700 and 0o777." ); - return Err(Box::new(e)); + return Err(()); + } + match fs::set_permissions(&path, fs::Permissions::from_mode(socket_permission)) { + Ok(_) => { + info!("Socket permissions updated to {:#o}.", socket_permission); + } + Err(error) => { + error!(message = "Failed to update listener socket permissions.", %error); + return Err(()); + } } }; - }; - let fut = async move { let active_parsing_task_nums = Arc::new(AtomicUsize::new(0)); info!(message = "Listening...", ?path, r#type = "unix"); @@ -1803,4 +1814,34 @@ mod test { "Max number of tasks at any given time should NOT Exceed max_frame_handling_tasks too much" ); } + + // Regression test for https://github.com/vectordotdev/vector/issues/25513: + // building the source (as `vector validate` does when instantiating + // components) must not delete an existing socket file. The socket should + // only be (re)created when the source future is actually run. + #[tokio::test(flavor = "multi_thread")] + async fn build_unix_source_does_not_remove_existing_socket() { + let frame_handler = create_frame_handler(false); + let socket_path = frame_handler.socket_path(); + + // Simulate the socket file of an already-running source. + std::fs::write(&socket_path, b"in use").expect("Failed to create socket file."); + assert!(socket_path.exists()); + + let (tx, _rx) = SourceSender::new_test(); + let source_id = ComponentKey::from("test_source"); + let mut shutdown = SourceShutdownCoordinator::default(); + let (shutdown_signal, _) = shutdown.register_source(&source_id, false); + + // Building the source should not touch the existing file... + let source = build_framestream_unix_source(frame_handler, shutdown_signal, tx) + .expect("Failed to build framestream unix source."); + assert!( + socket_path.exists(), + "Building the source must not delete the existing socket file" + ); + + // ...the file is only managed once the source future actually runs. + drop(source); + } }