Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/actions/spelling/allow.txt
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@
fluentd
Foto
FQDNs
framestream
Freescale
fuchsnj
FUJITSU
Expand Down Expand Up @@ -260,19 +261,19 @@
JAMCRC
Jameel
Jaytech
jchap-pnnl

Check warning on line 264 in .github/actions/spelling/allow.txt

View workflow job for this annotation

GitHub Actions / Check Spelling

Ignoring entry because it contains non-alpha characters (non-alpha-in-dictionary)
jemalloc
jemallocator
jetbrains
JetBrains
jhbigler-pnnl

Check warning on line 269 in .github/actions/spelling/allow.txt

View workflow job for this annotation

GitHub Actions / Check Spelling

Ignoring entry because it contains non-alpha characters (non-alpha-in-dictionary)
Jia
Jiayu
jimmystewpot
jlambatl
jndi
Joda
jorgehermo9

Check warning on line 276 in .github/actions/spelling/allow.txt

View workflow job for this annotation

GitHub Actions / Check Spelling

Ignoring entry because it contains non-alpha characters (non-alpha-in-dictionary)
journalctl
jsonnet
jsontag
Expand Down
3 changes: 3 additions & 0 deletions changelog.d/25513_dnstap_validate_socket.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Fixed `vector validate` deleting the Unix socket file of a running `dnstap` source (in `mode: unix`). Socket setup for framestream-based unix sources is now performed when the source starts rather than when it is built, so validating a config no longer has destructive side effects on a running instance.

authors: xfocus3
161 changes: 101 additions & 60 deletions src/sources/util/framestream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -694,75 +694,86 @@ pub fn build_framestream_unix_source(
) -> crate::Result<Source> {
let path = frame_handler.socket_path();

//check if the path already exists (and try to delete it)
match fs::metadata(&path) {
Ok(_) => {
//exists, so try to delete it
info!(message = "Deleting file.", ?path);
fs::remove_file(&path)?;
}
Err(ref e) if e.kind() == std::io::ErrorKind::NotFound => {} //doesn't exist, do nothing
Err(e) => {
error!("Unable to get socket information; error = {:?}.", e);
return Err(Box::new(e));
}
};

let listener = UnixListener::bind(&path)?;
// NOTE: Socket setup (removing any stale socket file, binding the listener,
// adjusting buffer sizes and permissions) is performed inside the returned
// future rather than eagerly. Doing it eagerly would run during component
// instantiation — including `vector validate` — and delete the socket file
// of an already-running Vector instance. See issue #25513.
let fut = async move {
//check if the path already exists (and try to delete it)
match fs::metadata(&path) {
Ok(_) => {
//exists, so try to delete it
info!(message = "Deleting file.", ?path);
if let Err(error) = fs::remove_file(&path) {
error!(message = "Unable to remove socket file.", ?path, %error);
return Err(());
}
}
Err(ref e) if e.kind() == std::io::ErrorKind::NotFound => {} //doesn't exist, do nothing
Err(error) => {
error!(message = "Unable to get socket information.", ?path, %error);
return Err(());
}
};

// system's 'net.core.rmem_max' might have to be changed if socket receive buffer is not updated properly
if let Some(socket_receive_buffer_size) = frame_handler.socket_receive_buffer_size() {
_ = nix::sys::socket::setsockopt(
&listener,
nix::sys::socket::sockopt::RcvBuf,
&(socket_receive_buffer_size),
);
let rcv_buf_size =
nix::sys::socket::getsockopt(&listener, nix::sys::socket::sockopt::RcvBuf);
info!(
"Unix socket receive buffer size modified to {}.",
rcv_buf_size.unwrap()
);
}
let listener = match UnixListener::bind(&path) {
Ok(listener) => listener,
Err(error) => {
error!(message = "Unable to bind to socket.", ?path, %error);
return Err(());
}
};

// system's 'net.core.wmem_max' might have to be changed if socket send buffer is not updated properly
if let Some(socket_send_buffer_size) = frame_handler.socket_send_buffer_size() {
_ = nix::sys::socket::setsockopt(
&listener,
nix::sys::socket::sockopt::SndBuf,
&(socket_send_buffer_size),
);
let snd_buf_size =
nix::sys::socket::getsockopt(&listener, nix::sys::socket::sockopt::SndBuf);
info!(
"Unix socket buffer send size modified to {}.",
snd_buf_size.unwrap()
);
}
// system's 'net.core.rmem_max' might have to be changed if socket receive buffer is not updated properly
if let Some(socket_receive_buffer_size) = frame_handler.socket_receive_buffer_size() {
_ = nix::sys::socket::setsockopt(
&listener,
nix::sys::socket::sockopt::RcvBuf,
&(socket_receive_buffer_size),
);
let rcv_buf_size =
nix::sys::socket::getsockopt(&listener, nix::sys::socket::sockopt::RcvBuf);
info!(
"Unix socket receive buffer size modified to {}.",
rcv_buf_size.unwrap()
);
}

// the permissions to unix socket are restricted from 0o700 to 0o777, which are 448 and 511 in decimal
if let Some(socket_permission) = frame_handler.socket_file_mode() {
if !(448..=511).contains(&socket_permission) {
return Err(format!(
"Invalid Socket permission {socket_permission:#o}. Must between 0o700 and 0o777."
)
.into());
// system's 'net.core.wmem_max' might have to be changed if socket send buffer is not updated properly
if let Some(socket_send_buffer_size) = frame_handler.socket_send_buffer_size() {
_ = nix::sys::socket::setsockopt(
&listener,
nix::sys::socket::sockopt::SndBuf,
&(socket_send_buffer_size),
);
let snd_buf_size =
nix::sys::socket::getsockopt(&listener, nix::sys::socket::sockopt::SndBuf);
info!(
"Unix socket buffer send size modified to {}.",
snd_buf_size.unwrap()
);
}
match fs::set_permissions(&path, fs::Permissions::from_mode(socket_permission)) {
Ok(_) => {
info!("Socket permissions updated to {:#o}.", socket_permission);
}
Err(e) => {

// the permissions to unix socket are restricted from 0o700 to 0o777, which are 448 and 511 in decimal
if let Some(socket_permission) = frame_handler.socket_file_mode() {
if !(448..=511).contains(&socket_permission) {
Comment on lines +759 to +760
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Preserve validation of invalid socket_file_mode

With this check now inside the returned source future, build_framestream_unix_source returns Ok during component construction for configs such as a dnstap unix source with socket_file_mode = 0o600; vector validate will therefore pass a config that immediately fails when the source starts. The destructive socket operations need to be deferred, but this non-destructive mode validation should still happen during build/config validation or in a dedicated config validator.

Useful? React with 👍 / 👎.

error!(
"Failed to update listener socket permissions; error = {:?}.",
e
"Invalid Socket permission {socket_permission:#o}. Must between 0o700 and 0o777."
);
return Err(Box::new(e));
return Err(());
}
match fs::set_permissions(&path, fs::Permissions::from_mode(socket_permission)) {
Ok(_) => {
info!("Socket permissions updated to {:#o}.", socket_permission);
}
Err(error) => {
error!(message = "Failed to update listener socket permissions.", %error);
return Err(());
}
}
};
};

let fut = async move {
let active_parsing_task_nums = Arc::new(AtomicUsize::new(0));

info!(message = "Listening...", ?path, r#type = "unix");
Expand Down Expand Up @@ -1803,4 +1814,34 @@ mod test {
"Max number of tasks at any given time should NOT Exceed max_frame_handling_tasks too much"
);
}

// Regression test for https://github.com/vectordotdev/vector/issues/25513:
// building the source (as `vector validate` does when instantiating
// components) must not delete an existing socket file. The socket should
// only be (re)created when the source future is actually run.
#[tokio::test(flavor = "multi_thread")]
async fn build_unix_source_does_not_remove_existing_socket() {
let frame_handler = create_frame_handler(false);
let socket_path = frame_handler.socket_path();

// Simulate the socket file of an already-running source.
std::fs::write(&socket_path, b"in use").expect("Failed to create socket file.");
assert!(socket_path.exists());

let (tx, _rx) = SourceSender::new_test();
let source_id = ComponentKey::from("test_source");
let mut shutdown = SourceShutdownCoordinator::default();
let (shutdown_signal, _) = shutdown.register_source(&source_id, false);

// Building the source should not touch the existing file...
let source = build_framestream_unix_source(frame_handler, shutdown_signal, tx)
.expect("Failed to build framestream unix source.");
assert!(
socket_path.exists(),
"Building the source must not delete the existing socket file"
);

// ...the file is only managed once the source future actually runs.
drop(source);
}
}
Loading