From 7eeacdb8b14124da960f7f532899c31962f74464 Mon Sep 17 00:00:00 2001 From: Yoenn Burban Date: Fri, 29 May 2026 15:22:10 +0200 Subject: [PATCH 1/9] fix(observability): propagate component span into spawned tasks Several components spawn background `tokio` tasks without carrying the current tracing span, so internal metrics/logs emitted from that work lost their component tags (component_id, component_kind, component_type). Wrap spawned futures with `in_current_span()` (or, for the Datadog logs and metrics sinks driving `concurrent_map`, instrument the mapped future inside the closure) so the owning component's span is preserved. `ConcurrentMap` itself no longer instruments the spawned future; that is now the caller's responsibility, documented inline. The fanout detached send is intentionally left un-instrumented (it drains a sink unrelated to the upstream component that owns the fanout), with a comment explaining why. Signed-off-by: Yoenn Burban Co-authored-by: Cursor --- .../spawned_task_component_tags.fix.md | 3 +++ lib/file-source/src/file_server.rs | 4 +-- .../src/variants/disk_v2/ledger.rs | 3 ++- lib/vector-core/src/fanout.rs | 4 +++ lib/vector-stream/src/concurrent_map.rs | 5 ++++ lib/vector-tap/src/controller.rs | 4 +-- src/api/grpc/service.rs | 3 ++- src/api/grpc_server.rs | 3 ++- src/gcp.rs | 3 ++- src/secrets/exec.rs | 3 ++- src/sinks/blackhole/sink.rs | 4 ++- src/sinks/datadog/logs/sink.rs | 18 +++++++++---- src/sinks/datadog/metrics/sink.rs | 18 +++++++++---- src/sinks/datadog/traces/config.rs | 3 ++- src/sinks/mqtt/sink.rs | 3 ++- src/sinks/prometheus/exporter.rs | 2 +- src/sinks/redis/sink.rs | 3 ++- src/sinks/splunk_hec/common/service.rs | 3 ++- src/sinks/util/sink.rs | 2 +- src/sources/exec/mod.rs | 3 ++- src/sources/file.rs | 2 +- src/sources/gcp_pubsub.rs | 3 ++- src/sources/journald.rs | 5 ++-- src/sources/kafka.rs | 2 +- src/sources/kubernetes_logs/mod.rs | 7 +++--- src/sources/postgresql_metrics.rs | 6 +++-- src/sources/splunk_hec/acknowledgements.rs | 5 ++-- src/sources/util/framestream.rs | 25 +++++++++++-------- src/sources/windows_event_log/mod.rs | 6 +++-- src/transforms/throttle/rate_limiter.rs | 4 ++- 30 files changed, 106 insertions(+), 53 deletions(-) create mode 100644 changelog.d/spawned_task_component_tags.fix.md diff --git a/changelog.d/spawned_task_component_tags.fix.md b/changelog.d/spawned_task_component_tags.fix.md new file mode 100644 index 0000000000000..d2f5c2bb068a1 --- /dev/null +++ b/changelog.d/spawned_task_component_tags.fix.md @@ -0,0 +1,3 @@ +Internal telemetry (metrics and logs) emitted from work that Vector runs on spawned `tokio` tasks now correctly inherits the owning component's tags (`component_id`, `component_kind`, `component_type`). Previously, several components spawned background tasks without propagating the tracing span, so some internal events emitted from those tasks were missing their component tags. Affected emissions include the `datadog_logs` sink's `component_discarded_events_total` (events too large to encode), the `gcp_pubsub` source's `component_errors_total`/`component_discarded_events_total` from its per-stream tasks, and the `splunk_hec` sinks' acknowledgement-handling `component_errors_total`. + +authors: gwenaskell diff --git a/lib/file-source/src/file_server.rs b/lib/file-source/src/file_server.rs index 87a436298ca16..cd17782ba2a02 100644 --- a/lib/file-source/src/file_server.rs +++ b/lib/file-source/src/file_server.rs @@ -24,7 +24,7 @@ use tokio::{ time::sleep, }; -use tracing::{debug, error, info, trace}; +use tracing::{Instrument, debug, error, info, trace}; use crate::{ file_watcher::{FileWatcher, RawLineResult}, @@ -153,7 +153,7 @@ where self.glob_minimum_cooldown, shutdown_checkpointer, self.emitter.clone(), - )); + ).in_current_span()); // Alright friends, how does this work? // diff --git a/lib/vector-buffers/src/variants/disk_v2/ledger.rs b/lib/vector-buffers/src/variants/disk_v2/ledger.rs index e63fc5fb16973..4531c4fcb3652 100644 --- a/lib/vector-buffers/src/variants/disk_v2/ledger.rs +++ b/lib/vector-buffers/src/variants/disk_v2/ledger.rs @@ -14,6 +14,7 @@ use crossbeam_utils::atomic::AtomicCell; use fslock::LockFile; use futures::StreamExt; use rkyv::{Archive, Serialize, with::Atomic}; +use tracing::Instrument; use snafu::{ResultExt, Snafu}; use tokio::{fs, io::AsyncWriteExt, sync::Notify}; use vector_common::finalizer::OrderedFinalizer; @@ -705,7 +706,7 @@ where self.increment_pending_acks(amount); self.notify_writer_waiters(); } - }); + }.in_current_span()); finalizer } } diff --git a/lib/vector-core/src/fanout.rs b/lib/vector-core/src/fanout.rs index 33ff30ae3e72a..500b6b8583c51 100644 --- a/lib/vector-core/src/fanout.rs +++ b/lib/vector-core/src/fanout.rs @@ -358,6 +358,10 @@ impl<'a> SendGroup<'a> { fn try_detach_send(&mut self, id: &ComponentKey) -> bool { if let Some(send) = self.sends.remove(id) { + // Deliberately not instrumented with the current span: this drains a send to a sink + // that has just been detached from the topology, so it is unrelated to the upstream + // component that owns this fanout. Attaching the current span would mis-tag this + // task's logs with the upstream component's identity rather than the detached sink's. tokio::spawn(async move { if let Err(e) = send.await { warn!( diff --git a/lib/vector-stream/src/concurrent_map.rs b/lib/vector-stream/src/concurrent_map.rs index 6b2920db1cc68..6dde7a31f7bbe 100644 --- a/lib/vector-stream/src/concurrent_map.rs +++ b/lib/vector-stream/src/concurrent_map.rs @@ -72,6 +72,11 @@ where Poll::Pending | Poll::Ready(None) => break, Poll::Ready(Some(item)) => { let fut = (this.f)(item); + // `ConcurrentMap` does not instrument the spawned future itself: the + // mapping closure runs on a detached task, so the current span at poll + // time is not necessarily meaningful for the work being performed. It is + // the caller's responsibility to propagate any span (e.g. the owning + // component's span for internal metric/log tagging) into `fut`. let handle = tokio::spawn(fut); this.in_flight.push_back(handle); } diff --git a/lib/vector-tap/src/controller.rs b/lib/vector-tap/src/controller.rs index 2a8208179f3e8..6f12cdabad2a8 100644 --- a/lib/vector-tap/src/controller.rs +++ b/lib/vector-tap/src/controller.rs @@ -224,7 +224,7 @@ fn shutdown_trigger(control_tx: fanout::ControlChannel, sink_id: ComponentKey) - } else { debug!(message = "Disconnected sink.", ?sink_id); } - }); + }.in_current_span()); shutdown_tx } @@ -370,7 +370,7 @@ async fn tap_handler( while let Some(events) = tap_buffer_rx.next().await { tap_transformer.try_send(events); } - }); + }.in_current_span()); // Attempt to connect the sink. // diff --git a/src/api/grpc/service.rs b/src/api/grpc/service.rs index 110ef9d5ba441..5af7cd3c16638 100644 --- a/src/api/grpc/service.rs +++ b/src/api/grpc/service.rs @@ -17,6 +17,7 @@ use tokio_stream::{ wrappers::{IntervalStream, ReceiverStream}, }; use tonic::{Request, Response, Status}; +use tracing::Instrument; use vector_lib::tap::{ controller::{TapController, TapPatterns, TapPayload}, topology::WatchRx, @@ -696,7 +697,7 @@ impl observability::Service for ObservabilityService { } } } - }); + }.in_current_span()); let stream = FuturesStreamExt::flat_map(ReceiverStream::new(event_rx), |events| { stream::iter(events.into_iter().map(Ok)) diff --git a/src/api/grpc_server.rs b/src/api/grpc_server.rs index 715aeecad2b8e..0fba7c560a2d1 100644 --- a/src/api/grpc_server.rs +++ b/src/api/grpc_server.rs @@ -16,6 +16,7 @@ use axum::{ }; use tokio::sync::oneshot; use tonic::transport::Server as TonicServer; +use tracing::Instrument; use tonic_health::server::{HealthReporter, health_reporter}; use vector_lib::tap::topology::WatchRx; @@ -118,7 +119,7 @@ impl GrpcServer { bind_addr = %actual_addr, ); } - }); + }.in_current_span()); info!("GRPC API server started on {}.", actual_addr); diff --git a/src/gcp.rs b/src/gcp.rs index 9a0ff365a69ee..fe44ca4455cde 100644 --- a/src/gcp.rs +++ b/src/gcp.rs @@ -17,6 +17,7 @@ use hyper::header::AUTHORIZATION; use smpl_jwt::Jwt; use snafu::{ResultExt, Snafu}; use tokio::sync::watch; +use tracing::Instrument; use vector_lib::{configurable::configurable_component, sensitive_string::SensitiveString}; use crate::{ @@ -194,7 +195,7 @@ impl GcpAuthenticator { pub fn spawn_regenerate_token(&self) -> watch::Receiver<()> { let (sender, receiver) = watch::channel(()); - tokio::spawn(self.clone().token_regenerator(sender)); + tokio::spawn(self.clone().token_regenerator(sender).in_current_span()); receiver } diff --git a/src/secrets/exec.rs b/src/secrets/exec.rs index 081c752c2b36a..a0b5dd89fbdf8 100644 --- a/src/secrets/exec.rs +++ b/src/secrets/exec.rs @@ -2,6 +2,7 @@ use std::collections::{HashMap, HashSet}; use bytes::BytesMut; use futures_util::StreamExt; +use tracing::Instrument; use serde::{Deserialize, Serialize}; use tokio::{io::AsyncWriteExt, process::Command, time}; use tokio_util::codec; @@ -179,7 +180,7 @@ async fn query_backend( .ok_or("unable to acquire stdout")?; let query = serde_json::to_vec(&query)?; - tokio::spawn(async move { stdin.write_all(&query).await }); + tokio::spawn(async move { stdin.write_all(&query).await }.in_current_span()); let timeout = time::sleep(time::Duration::from_secs(timeout)); tokio::pin!(timeout); diff --git a/src/sinks/blackhole/sink.rs b/src/sinks/blackhole/sink.rs index ff0744913cc8b..c28ba0410650a 100644 --- a/src/sinks/blackhole/sink.rs +++ b/src/sinks/blackhole/sink.rs @@ -20,6 +20,8 @@ use vector_lib::{ }, }; +use tracing::Instrument; + use crate::{ event::{EventArray, EventContainer, EventStatus, Finalizable}, sinks::{blackhole::config::BlackholeConfig, util::StreamSink}, @@ -79,7 +81,7 @@ impl StreamSink for BlackholeSink { internal_log_rate_limit = false, "Collected events." ); - }); + }.in_current_span()); } while let Some(mut events) = input.next().await { diff --git a/src/sinks/datadog/logs/sink.rs b/src/sinks/datadog/logs/sink.rs index 0abf4bed1ade5..a136675df7655 100644 --- a/src/sinks/datadog/logs/sink.rs +++ b/src/sinks/datadog/logs/sink.rs @@ -7,6 +7,7 @@ use vector_lib::{ internal_event::{ComponentEventsDropped, UNINTENTIONAL}, lookup::event_path, }; +use tracing::Instrument; use vrl::path::{OwnedSegment, OwnedTargetPath, PathPrefix}; use super::{config::MAX_PAYLOAD_BYTES, service::LogApiRequest}; @@ -393,12 +394,19 @@ where .concurrent_map(default_request_builder_concurrency_limit(), move |input| { let builder = Arc::clone(&builder); - Box::pin(async move { - let (api_key, events) = input; - let api_key = api_key.unwrap_or_else(|| Arc::clone(&builder.default_api_key)); + // `concurrent_map` spawns this future on a detached task. The closure itself runs + // within `run_inner`'s span, so `in_current_span` captures the sink span here and + // re-enters it on the spawned task to preserve the sink's automatic component tags. + Box::pin( + async move { + let (api_key, events) = input; + let api_key = + api_key.unwrap_or_else(|| Arc::clone(&builder.default_api_key)); - builder.build_request(events, api_key) - }) + builder.build_request(events, api_key) + } + .in_current_span(), + ) }) .filter_map(|request| async move { match request { diff --git a/src/sinks/datadog/metrics/sink.rs b/src/sinks/datadog/metrics/sink.rs index 1cd075a831d9c..a366cc81150c8 100644 --- a/src/sinks/datadog/metrics/sink.rs +++ b/src/sinks/datadog/metrics/sink.rs @@ -8,6 +8,7 @@ use futures_util::{ stream::{self, BoxStream}, }; use tower::Service; +use tracing::Instrument; use vector_lib::{ event::{Event, Metric, MetricValue}, partition::Partitioner, @@ -136,11 +137,18 @@ where .concurrent_map( default_request_builder_concurrency_limit(), |((api_key, endpoint), metrics)| { - Box::pin(async move { - let collapsed_metrics = - sort_and_collapse_counters_by_series_and_timestamp(metrics); - ((api_key, endpoint), collapsed_metrics) - }) + // `concurrent_map` spawns this future on a detached task. The closure itself + // runs within `run_inner`'s span, so `in_current_span` captures the sink span + // here and re-enters it on the spawned task to preserve the sink's automatic + // component tags on any internal metrics/logs emitted during aggregation. + Box::pin( + async move { + let collapsed_metrics = + sort_and_collapse_counters_by_series_and_timestamp(metrics); + ((api_key, endpoint), collapsed_metrics) + } + .in_current_span(), + ) }, ) // We build our requests "incrementally", which means that for a single batch of metrics, we might generate diff --git a/src/sinks/datadog/traces/config.rs b/src/sinks/datadog/traces/config.rs index 7ace1b75f9817..6e7235b0fa9da 100644 --- a/src/sinks/datadog/traces/config.rs +++ b/src/sinks/datadog/traces/config.rs @@ -5,6 +5,7 @@ use indoc::indoc; use snafu::ResultExt; use tokio::sync::oneshot::{Sender, channel}; use tower::ServiceBuilder; +use tracing::Instrument; use vector_lib::{ config::{AcknowledgementsConfig, proxy::ProxyConfig}, configurable::configurable_component, @@ -184,7 +185,7 @@ impl DatadogTracesConfig { compression, endpoints, Arc::clone(&apm_stats_aggregator), - )); + ).in_current_span()); Ok(VectorSink::from_event_streamsink(sink)) } diff --git a/src/sinks/mqtt/sink.rs b/src/sinks/mqtt/sink.rs index cc69c8d6b9ff8..31a02d1aee690 100644 --- a/src/sinks/mqtt/sink.rs +++ b/src/sinks/mqtt/sink.rs @@ -1,5 +1,6 @@ use async_trait::async_trait; use futures::{StreamExt, stream::BoxStream}; +use tracing::Instrument; use super::{ MqttSinkConfig, @@ -76,7 +77,7 @@ impl MqttSink { } } } - }); + }.in_current_span()); let service = ServiceBuilder::new().service(MqttService { client, diff --git a/src/sinks/prometheus/exporter.rs b/src/sinks/prometheus/exporter.rs index 6072bc8274ea0..17c4ba8151f42 100644 --- a/src/sinks/prometheus/exporter.rs +++ b/src/sinks/prometheus/exporter.rs @@ -477,7 +477,7 @@ impl PrometheusExporter { .map_err(|error| error!("Server error: {}.", error))?; Ok::<(), ()>(()) - }); + }.in_current_span()); self.server_shutdown_trigger = Some(trigger); Ok(()) diff --git a/src/sinks/redis/sink.rs b/src/sinks/redis/sink.rs index 274d2b103cf86..8716414ac59b6 100644 --- a/src/sinks/redis/sink.rs +++ b/src/sinks/redis/sink.rs @@ -11,6 +11,7 @@ use tokio::{ task::JoinHandle, time::sleep, }; +use tracing::Instrument; use super::{ RedisEvent, RedisRequest, RepairChannelSnafu, @@ -147,7 +148,7 @@ impl RedisConnection { task_conn_tx, ) .await - })), + }.in_current_span())), }) } diff --git a/src/sinks/splunk_hec/common/service.rs b/src/sinks/splunk_hec/common/service.rs index 095da254d832b..dfae5f8625986 100644 --- a/src/sinks/splunk_hec/common/service.rs +++ b/src/sinks/splunk_hec/common/service.rs @@ -12,6 +12,7 @@ use snafu::ResultExt; use tokio::sync::{OwnedSemaphorePermit, Semaphore, mpsc, oneshot}; use tokio_util::sync::PollSemaphore; use tower::Service; +use tracing::Instrument; use uuid::Uuid; use vector_lib::{event::EventStatus, request_metadata::MetaDescriptive}; @@ -63,7 +64,7 @@ where ack_client, Arc::clone(&http_request_builder), indexer_acknowledgements, - )); + ).in_current_span()); Some(tx) } else { None diff --git a/src/sinks/util/sink.rs b/src/sinks/util/sink.rs index f8dd31c67f129..150d512425f64 100644 --- a/src/sinks/util/sink.rs +++ b/src/sinks/util/sink.rs @@ -323,7 +323,7 @@ where this.lingers.remove(partition); let batch = batch.finish(); - let future = tokio::spawn(this.service.call(batch)); + let future = tokio::spawn(this.service.call(batch).in_current_span()); if let Some(map) = this.in_flight.as_mut() { map.insert(partition.clone(), future.map(|_| ()).fuse().boxed()); diff --git a/src/sources/exec/mod.rs b/src/sources/exec/mod.rs index b87452ef107d1..245542ab3bc71 100644 --- a/src/sources/exec/mod.rs +++ b/src/sources/exec/mod.rs @@ -11,6 +11,7 @@ use tokio::{ time::{self, Duration, Instant, sleep}, }; use tokio_stream::wrappers::IntervalStream; +use tracing::Instrument; use vector_lib::{ EstimatedJsonEncodedSizeOf, codecs::{ @@ -749,5 +750,5 @@ fn spawn_reader_thread( } debug!("Finished capturing {} command output.", origin); - })); + }.in_current_span())); } diff --git a/src/sources/file.rs b/src/sources/file.rs index 93f4e188be97a..244e35762114d 100644 --- a/src/sources/file.rs +++ b/src/sources/file.rs @@ -587,7 +587,7 @@ pub fn file_source( } } send_shutdown.send(()) - }); + }.in_current_span()); (Some(finalizer), shutdown2.map(|_| ()).boxed()) } else { // When not dealing with end-to-end acknowledgements, just diff --git a/src/sources/gcp_pubsub.rs b/src/sources/gcp_pubsub.rs index b01664b7ade63..4787d1041cd07 100644 --- a/src/sources/gcp_pubsub.rs +++ b/src/sources/gcp_pubsub.rs @@ -34,6 +34,7 @@ use vector_lib::{ }, lookup::owned_value_path, }; +use tracing::Instrument; use vrl::{ path, value::{Kind, kind::Collection}, @@ -453,7 +454,7 @@ impl PubsubSource { // when it has an idle interval it will mark itself as not // busy. let busy_flag = Arc::new(AtomicBool::new(false)); - let task = tokio::spawn(self.clone().run(Arc::clone(&busy_flag))); + let task = tokio::spawn(self.clone().run(Arc::clone(&busy_flag)).in_current_span()); tasks.push(Task { task, busy_flag }); } diff --git a/src/sources/journald.rs b/src/sources/journald.rs index c6177fba1eb6c..62ed2a8e47631 100644 --- a/src/sources/journald.rs +++ b/src/sources/journald.rs @@ -25,6 +25,7 @@ use tokio::{ time::sleep, }; use tokio_util::codec::FramedRead; +use tracing::Instrument; use vector_lib::{ EstimatedJsonEncodedSizeOf, codecs::{CharacterDelimitedDecoder, decoding::BoxedFramingError}, @@ -519,7 +520,7 @@ impl JournaldSource { let events_received = register!(EventsReceived); // Spawn stderr handler task - let stderr_handler = tokio::spawn(Self::handle_stderr(stderr_stream)); + let stderr_handler = tokio::spawn(Self::handle_stderr(stderr_stream).in_current_span()); let batch_size = self.batch_size; let result = loop { @@ -1062,7 +1063,7 @@ impl Finalizer { checkpointer.lock().await.set(cursor).await; } } - }); + }.in_current_span()); Self::Async(finalizer) } else { Self::Sync(checkpointer) diff --git a/src/sources/kafka.rs b/src/sources/kafka.rs index cae76d8e02059..2cc3c4fdaf639 100644 --- a/src/sources/kafka.rs +++ b/src/sources/kafka.rs @@ -464,7 +464,7 @@ async fn kafka_source( eof_tx, ) .await; - }) + }.in_current_span()) }; let client_task = { diff --git a/src/sources/kubernetes_logs/mod.rs b/src/sources/kubernetes_logs/mod.rs index fa001972a59d7..aba5247df7efd 100644 --- a/src/sources/kubernetes_logs/mod.rs +++ b/src/sources/kubernetes_logs/mod.rs @@ -35,6 +35,7 @@ use vector_lib::{ internal_event::{ByteSize, BytesReceived, InternalEventHandle as _, Protocol}, lookup::{OwnedTargetPath, lookup_v2::OptionalTargetPath, owned_value_path, path}, }; +use tracing::Instrument; use vrl::value::{Kind, kind::Collection}; use crate::{ @@ -743,7 +744,7 @@ impl Source { pod_cacher, pod_watcher, delay_deletion, - ))); + ).in_current_span())); // ----------------------------------------------------------------- @@ -767,7 +768,7 @@ impl Source { MetaCache::new(), ns_watcher, delay_deletion, - ))); + ).in_current_span())); } // ----------------------------------------------------------------- @@ -792,7 +793,7 @@ impl Source { node_cacher, node_watcher, delay_deletion, - ))); + ).in_current_span())); let paths_provider = K8sPathsProvider::new( pod_state.clone(), diff --git a/src/sources/postgresql_metrics.rs b/src/sources/postgresql_metrics.rs index 96ae1ab3a1263..471eeed6817a3 100644 --- a/src/sources/postgresql_metrics.rs +++ b/src/sources/postgresql_metrics.rs @@ -34,6 +34,8 @@ use vector_lib::{ metric_tags, }; +use tracing::Instrument as _; + use crate::{ config::{SourceConfig, SourceContext, SourceOutput}, event::metric::{Metric, MetricKind, MetricTags, MetricValue}, @@ -295,7 +297,7 @@ impl PostgresqlClient { endpoint: &self.endpoint, } })?; - tokio::spawn(connection); + tokio::spawn(connection.in_current_span()); client } None => { @@ -306,7 +308,7 @@ impl PostgresqlClient { .with_context(|_| ConnectionFailedSnafu { endpoint: &self.endpoint, })?; - tokio::spawn(connection); + tokio::spawn(connection.in_current_span()); client } }; diff --git a/src/sources/splunk_hec/acknowledgements.rs b/src/sources/splunk_hec/acknowledgements.rs index 3f65b7d115f95..095ee7885149d 100644 --- a/src/sources/splunk_hec/acknowledgements.rs +++ b/src/sources/splunk_hec/acknowledgements.rs @@ -9,6 +9,7 @@ use std::{ }; use futures::StreamExt; +use tracing::Instrument; use roaring::RoaringTreemap; use serde::{Deserialize, Serialize}; use tokio::time::interval; @@ -117,7 +118,7 @@ impl IndexerAcknowledgement { now.duration_since(channel.get_last_used()).as_secs() <= max_idle_time }); } - }); + }.in_current_span()); } Self { @@ -223,7 +224,7 @@ impl Channel { } } } - }); + }.in_current_span()); Self { last_used_timestamp: RwLock::new(Instant::now()), diff --git a/src/sources/util/framestream.rs b/src/sources/util/framestream.rs index a42bae5815322..d7f7866778101 100644 --- a/src/sources/util/framestream.rs +++ b/src/sources/util/framestream.rs @@ -908,17 +908,20 @@ async fn spawn_event_handling_tasks( ) -> JoinHandle<()> { wait_for_task_quota(&active_task_nums, max_frame_handling_tasks).await; - tokio::spawn(async move { - future::ready({ - if let Some(evt) = event_handler.handle_event(received_from, event_data) - && event_sink.send_event(evt).await.is_err() - { - error!("Encountered error while sending event."); - } - active_task_nums.fetch_sub(1, Ordering::AcqRel); - }) - .await; - }) + tokio::spawn( + async move { + future::ready({ + if let Some(evt) = event_handler.handle_event(received_from, event_data) + && event_sink.send_event(evt).await.is_err() + { + error!("Encountered error while sending event."); + } + active_task_nums.fetch_sub(1, Ordering::AcqRel); + }) + .await; + } + .in_current_span(), + ) } async fn wait_for_task_quota(active_task_nums: &Arc, max_tasks: usize) { diff --git a/src/sources/windows_event_log/mod.rs b/src/sources/windows_event_log/mod.rs index 9fb2cff142ee6..a6a775945da66 100644 --- a/src/sources/windows_event_log/mod.rs +++ b/src/sources/windows_event_log/mod.rs @@ -35,6 +35,8 @@ cfg_if::cfg_if! { use windows::Win32::Foundation::{DUPLICATE_SAME_ACCESS, DuplicateHandle, HANDLE}; use windows::Win32::System::Threading::GetCurrentProcess; + use tracing::Instrument; + use crate::{ SourceSender, event::{BatchNotifier, BatchStatus, BatchStatusReceiver}, @@ -123,7 +125,7 @@ impl Finalizer { } } debug!(message = "Acknowledgement stream completed."); - }); + }.in_current_span()); Self::Async(finalizer) } else { @@ -371,7 +373,7 @@ impl WindowsEventLogSource { let _ = windows::Win32::Foundation::CloseHandle(handle); } } - }); + }.in_current_span()); // Track when we last flushed checkpoints let mut last_checkpoint = std::time::Instant::now(); diff --git a/src/transforms/throttle/rate_limiter.rs b/src/transforms/throttle/rate_limiter.rs index 83f1b8de5384b..50ab1cd35daa6 100644 --- a/src/transforms/throttle/rate_limiter.rs +++ b/src/transforms/throttle/rate_limiter.rs @@ -1,5 +1,7 @@ use std::{hash::Hash, sync::Arc, time::Duration}; +use tracing::Instrument; + use governor::{ Quota, RateLimiter, clock, middleware::NoOpMiddleware, state::keyed::DashMapStateStore, }; @@ -31,7 +33,7 @@ where interval.tick().await; rate_limiter_clone.retain_recent(); } - }); + }.in_current_span()); Self { rate_limiter, From 1d6ef28f96a3074e14bc34d237d75e37f5e05f96 Mon Sep 17 00:00:00 2001 From: Yoenn Burban Date: Fri, 29 May 2026 15:37:32 +0200 Subject: [PATCH 2/9] cargo fmt --- lib/file-source/src/file_server.rs | 15 ++-- .../src/variants/disk_v2/ledger.rs | 15 ++-- lib/vector-tap/src/controller.rs | 23 +++--- src/api/grpc/service.rs | 35 ++++---- src/api/grpc_server.rs | 81 ++++++++++--------- src/secrets/exec.rs | 2 +- src/sinks/blackhole/sink.rs | 45 ++++++----- src/sinks/datadog/logs/sink.rs | 2 +- src/sinks/datadog/traces/config.rs | 17 ++-- src/sinks/mqtt/sink.rs | 31 +++---- src/sinks/prometheus/exporter.rs | 27 ++++--- src/sinks/redis/sink.rs | 21 ++--- src/sinks/splunk_hec/common/service.rs | 15 ++-- src/sources/exec/mod.rs | 45 ++++++----- src/sources/file.rs | 15 ++-- src/sources/gcp_pubsub.rs | 2 +- src/sources/journald.rs | 13 +-- src/sources/kafka.rs | 23 +++--- src/sources/kubernetes_logs/mod.rs | 32 +++----- src/sources/splunk_hec/acknowledgements.rs | 27 ++++--- src/transforms/throttle/rate_limiter.rs | 15 ++-- 21 files changed, 273 insertions(+), 228 deletions(-) diff --git a/lib/file-source/src/file_server.rs b/lib/file-source/src/file_server.rs index cd17782ba2a02..8078aa32b0575 100644 --- a/lib/file-source/src/file_server.rs +++ b/lib/file-source/src/file_server.rs @@ -148,12 +148,15 @@ where let mut stats = TimingStats::default(); // Spawn the checkpoint writer task - let checkpoint_task_handle = tokio::spawn(checkpoint_writer( - checkpointer, - self.glob_minimum_cooldown, - shutdown_checkpointer, - self.emitter.clone(), - ).in_current_span()); + let checkpoint_task_handle = tokio::spawn( + checkpoint_writer( + checkpointer, + self.glob_minimum_cooldown, + shutdown_checkpointer, + self.emitter.clone(), + ) + .in_current_span(), + ); // Alright friends, how does this work? // diff --git a/lib/vector-buffers/src/variants/disk_v2/ledger.rs b/lib/vector-buffers/src/variants/disk_v2/ledger.rs index 4531c4fcb3652..8d11fc6de6e33 100644 --- a/lib/vector-buffers/src/variants/disk_v2/ledger.rs +++ b/lib/vector-buffers/src/variants/disk_v2/ledger.rs @@ -14,9 +14,9 @@ use crossbeam_utils::atomic::AtomicCell; use fslock::LockFile; use futures::StreamExt; use rkyv::{Archive, Serialize, with::Atomic}; -use tracing::Instrument; use snafu::{ResultExt, Snafu}; use tokio::{fs, io::AsyncWriteExt, sync::Notify}; +use tracing::Instrument; use vector_common::finalizer::OrderedFinalizer; use super::{ @@ -701,12 +701,15 @@ where #[must_use] pub(super) fn spawn_finalizer(self: Arc) -> OrderedFinalizer { let (finalizer, mut stream) = OrderedFinalizer::new(None); - tokio::spawn(async move { - while let Some((_status, amount)) = stream.next().await { - self.increment_pending_acks(amount); - self.notify_writer_waiters(); + tokio::spawn( + async move { + while let Some((_status, amount)) = stream.next().await { + self.increment_pending_acks(amount); + self.notify_writer_waiters(); + } } - }.in_current_span()); + .in_current_span(), + ); finalizer } } diff --git a/lib/vector-tap/src/controller.rs b/lib/vector-tap/src/controller.rs index 6f12cdabad2a8..7aa1b556a6776 100644 --- a/lib/vector-tap/src/controller.rs +++ b/lib/vector-tap/src/controller.rs @@ -214,17 +214,20 @@ impl TapController { fn shutdown_trigger(control_tx: fanout::ControlChannel, sink_id: ComponentKey) -> ShutdownTx { let (shutdown_tx, shutdown_rx) = oneshot::channel(); - tokio::spawn(async move { - _ = shutdown_rx.await; - if control_tx - .send(fanout::ControlMessage::Remove(sink_id.clone())) - .is_err() - { - debug!(message = "Couldn't disconnect sink.", ?sink_id); - } else { - debug!(message = "Disconnected sink.", ?sink_id); + tokio::spawn( + async move { + _ = shutdown_rx.await; + if control_tx + .send(fanout::ControlMessage::Remove(sink_id.clone())) + .is_err() + { + debug!(message = "Couldn't disconnect sink.", ?sink_id); + } else { + debug!(message = "Disconnected sink.", ?sink_id); + } } - }.in_current_span()); + .in_current_span(), + ); shutdown_tx } diff --git a/src/api/grpc/service.rs b/src/api/grpc/service.rs index 5af7cd3c16638..dff3c5c812975 100644 --- a/src/api/grpc/service.rs +++ b/src/api/grpc/service.rs @@ -677,27 +677,30 @@ impl observability::Service for ObservabilityService { let watch_rx = self.watch_rx.clone(); - tokio::spawn(async move { - let _tap_controller = TapController::new(watch_rx, tap_tx, patterns); - let mut tap_rx = ReceiverStream::new(tap_rx); - let mut interval = time::interval(time::Duration::from_millis(interval_ms)); - let mut reservoir = Reservoir::new(limit); - - loop { - select! { - Some(tap_payload) = tokio_stream::StreamExt::next(&mut tap_rx) => { - if reservoir.handle_payload(tap_payload, &event_tx).await.is_err() { - break; + tokio::spawn( + async move { + let _tap_controller = TapController::new(watch_rx, tap_tx, patterns); + let mut tap_rx = ReceiverStream::new(tap_rx); + let mut interval = time::interval(time::Duration::from_millis(interval_ms)); + let mut reservoir = Reservoir::new(limit); + + loop { + select! { + Some(tap_payload) = tokio_stream::StreamExt::next(&mut tap_rx) => { + if reservoir.handle_payload(tap_payload, &event_tx).await.is_err() { + break; + } } - } - _ = interval.tick() => { - if event_tx.is_closed() || reservoir.flush(&event_tx).await.is_err() { - break; + _ = interval.tick() => { + if event_tx.is_closed() || reservoir.flush(&event_tx).await.is_err() { + break; + } } } } } - }.in_current_span()); + .in_current_span(), + ); let stream = FuturesStreamExt::flat_map(ReceiverStream::new(event_rx), |events| { stream::iter(events.into_iter().map(Ok)) diff --git a/src/api/grpc_server.rs b/src/api/grpc_server.rs index 0fba7c560a2d1..d3cb450c019c5 100644 --- a/src/api/grpc_server.rs +++ b/src/api/grpc_server.rs @@ -16,8 +16,8 @@ use axum::{ }; use tokio::sync::oneshot; use tonic::transport::Server as TonicServer; -use tracing::Instrument; use tonic_health::server::{HealthReporter, health_reporter}; +use tracing::Instrument; use vector_lib::tap::topology::WatchRx; use super::grpc::ObservabilityService; @@ -81,45 +81,48 @@ impl GrpcServer { let router_serving = Arc::clone(&serving); // Spawn the server with the already-bound listener - tokio::spawn(async move { - // Build reflection service for tools like grpcurl - let reflection_service = tonic_reflection::server::Builder::configure() - .register_encoded_file_descriptor_set( - crate::proto::observability::FILE_DESCRIPTOR_SET, - ) - .register_encoded_file_descriptor_set(tonic_health::pb::FILE_DESCRIPTOR_SET) - .build() - .expect("Failed to build reflection service"); - - // Build the tonic router (gRPC services) and merge with the HTTP router - // so both protocols share the same port. `accept_http1(true)` lets plain - // HTTP/1.1 requests reach the merged axum routes. - let router = TonicServer::builder() - .accept_http1(true) - .add_service(health_service) - .add_service(ObservabilityServer::new(service)) - .add_service(reflection_service) - .into_router() - .merge(http_router(router_serving)); - - let result = hyper::Server::from_tcp(std_listener) - .expect("Failed to build HTTP server from TCP listener") - .serve(router.into_make_service()) - .with_graceful_shutdown(async { - rx.await.ok(); - info!("GRPC API server shutting down."); - }) - .await; - - if let Err(e) = result { - error!( - message = "GRPC server encountered an error.", - error = %e, - error_source = ?e.source(), - bind_addr = %actual_addr, - ); + tokio::spawn( + async move { + // Build reflection service for tools like grpcurl + let reflection_service = tonic_reflection::server::Builder::configure() + .register_encoded_file_descriptor_set( + crate::proto::observability::FILE_DESCRIPTOR_SET, + ) + .register_encoded_file_descriptor_set(tonic_health::pb::FILE_DESCRIPTOR_SET) + .build() + .expect("Failed to build reflection service"); + + // Build the tonic router (gRPC services) and merge with the HTTP router + // so both protocols share the same port. `accept_http1(true)` lets plain + // HTTP/1.1 requests reach the merged axum routes. + let router = TonicServer::builder() + .accept_http1(true) + .add_service(health_service) + .add_service(ObservabilityServer::new(service)) + .add_service(reflection_service) + .into_router() + .merge(http_router(router_serving)); + + let result = hyper::Server::from_tcp(std_listener) + .expect("Failed to build HTTP server from TCP listener") + .serve(router.into_make_service()) + .with_graceful_shutdown(async { + rx.await.ok(); + info!("GRPC API server shutting down."); + }) + .await; + + if let Err(e) = result { + error!( + message = "GRPC server encountered an error.", + error = %e, + error_source = ?e.source(), + bind_addr = %actual_addr, + ); + } } - }.in_current_span()); + .in_current_span(), + ); info!("GRPC API server started on {}.", actual_addr); diff --git a/src/secrets/exec.rs b/src/secrets/exec.rs index a0b5dd89fbdf8..7ad0546076ec0 100644 --- a/src/secrets/exec.rs +++ b/src/secrets/exec.rs @@ -2,10 +2,10 @@ use std::collections::{HashMap, HashSet}; use bytes::BytesMut; use futures_util::StreamExt; -use tracing::Instrument; use serde::{Deserialize, Serialize}; use tokio::{io::AsyncWriteExt, process::Command, time}; use tokio_util::codec; +use tracing::Instrument; use vector_lib::configurable::{component::GenerateConfig, configurable_component}; use vrl::value::Value; diff --git a/src/sinks/blackhole/sink.rs b/src/sinks/blackhole/sink.rs index c28ba0410650a..ca5d941b114cf 100644 --- a/src/sinks/blackhole/sink.rs +++ b/src/sinks/blackhole/sink.rs @@ -59,29 +59,32 @@ impl StreamSink for BlackholeSink { if self.config.print_interval_secs.as_secs() > 0 { let interval_dur = self.config.print_interval_secs; - tokio::spawn(async move { - let mut print_interval = interval(interval_dur); - loop { - select! { - _ = print_interval.tick() => { - info!( - events = total_events.load(Ordering::Relaxed), - raw_bytes_collected = total_raw_bytes.load(Ordering::Relaxed), - internal_log_rate_limit = false, - "Collected events." - ); - }, - _ = tripwire.changed() => break, + tokio::spawn( + async move { + let mut print_interval = interval(interval_dur); + loop { + select! { + _ = print_interval.tick() => { + info!( + events = total_events.load(Ordering::Relaxed), + raw_bytes_collected = total_raw_bytes.load(Ordering::Relaxed), + internal_log_rate_limit = false, + "Collected events." + ); + }, + _ = tripwire.changed() => break, + } } - } - info!( - events = total_events.load(Ordering::Relaxed), - raw_bytes_collected = total_raw_bytes.load(Ordering::Relaxed), - internal_log_rate_limit = false, - "Collected events." - ); - }.in_current_span()); + info!( + events = total_events.load(Ordering::Relaxed), + raw_bytes_collected = total_raw_bytes.load(Ordering::Relaxed), + internal_log_rate_limit = false, + "Collected events." + ); + } + .in_current_span(), + ); } while let Some(mut events) = input.next().await { diff --git a/src/sinks/datadog/logs/sink.rs b/src/sinks/datadog/logs/sink.rs index a136675df7655..5fb5e2189fa10 100644 --- a/src/sinks/datadog/logs/sink.rs +++ b/src/sinks/datadog/logs/sink.rs @@ -2,12 +2,12 @@ use std::{collections::VecDeque, fmt::Debug, io, sync::Arc}; use itertools::Itertools; use snafu::Snafu; +use tracing::Instrument; use vector_lib::{ event::{ObjectMap, Value}, internal_event::{ComponentEventsDropped, UNINTENTIONAL}, lookup::event_path, }; -use tracing::Instrument; use vrl::path::{OwnedSegment, OwnedTargetPath, PathPrefix}; use super::{config::MAX_PAYLOAD_BYTES, service::LogApiRequest}; diff --git a/src/sinks/datadog/traces/config.rs b/src/sinks/datadog/traces/config.rs index 6e7235b0fa9da..c5ce5ffb9d134 100644 --- a/src/sinks/datadog/traces/config.rs +++ b/src/sinks/datadog/traces/config.rs @@ -179,13 +179,16 @@ impl DatadogTracesConfig { // Send the APM stats payloads independently of the sink framework. // This is necessary to comply with what the APM stats backend of Datadog expects with // respect to receiving stats payloads. - tokio::spawn(flush_apm_stats_thread( - tripwire, - client, - compression, - endpoints, - Arc::clone(&apm_stats_aggregator), - ).in_current_span()); + tokio::spawn( + flush_apm_stats_thread( + tripwire, + client, + compression, + endpoints, + Arc::clone(&apm_stats_aggregator), + ) + .in_current_span(), + ); Ok(VectorSink::from_event_streamsink(sink)) } diff --git a/src/sinks/mqtt/sink.rs b/src/sinks/mqtt/sink.rs index 31a02d1aee690..98a5dab2d1cba 100644 --- a/src/sinks/mqtt/sink.rs +++ b/src/sinks/mqtt/sink.rs @@ -61,23 +61,26 @@ impl MqttSink { let (client, mut connection) = self.connector.connect(); // This is necessary to keep the mqtt event loop moving forward. - tokio::spawn(async move { - loop { - // If an error is returned here there is currently no way to tie this back - // to the event that was posted which means we can't accurately provide - // delivery guarantees. - // We need this issue resolved first: - // https://github.com/bytebeamio/rumqtt/issues/349 - match connection.poll().await { - Ok(_) => {} - Err(connection_error) => { - emit!(MqttConnectionError { - error: connection_error - }); + tokio::spawn( + async move { + loop { + // If an error is returned here there is currently no way to tie this back + // to the event that was posted which means we can't accurately provide + // delivery guarantees. + // We need this issue resolved first: + // https://github.com/bytebeamio/rumqtt/issues/349 + match connection.poll().await { + Ok(_) => {} + Err(connection_error) => { + emit!(MqttConnectionError { + error: connection_error + }); + } } } } - }.in_current_span()); + .in_current_span(), + ); let service = ServiceBuilder::new().service(MqttService { client, diff --git a/src/sinks/prometheus/exporter.rs b/src/sinks/prometheus/exporter.rs index 17c4ba8151f42..2c0063bd7e1f7 100644 --- a/src/sinks/prometheus/exporter.rs +++ b/src/sinks/prometheus/exporter.rs @@ -466,18 +466,21 @@ impl PrometheusExporter { let tls = MaybeTlsSettings::from_config(tls.as_ref(), true)?; let listener = tls.bind(&address).await?; - tokio::spawn(async move { - info!(message = "Building HTTP server.", address = %address); - - Server::builder(hyper::server::accept::from_stream(listener.accept_stream())) - .serve(new_service) - .with_graceful_shutdown(tripwire.then(crate::shutdown::tripwire_handler)) - .instrument(span) - .await - .map_err(|error| error!("Server error: {}.", error))?; - - Ok::<(), ()>(()) - }.in_current_span()); + tokio::spawn( + async move { + info!(message = "Building HTTP server.", address = %address); + + Server::builder(hyper::server::accept::from_stream(listener.accept_stream())) + .serve(new_service) + .with_graceful_shutdown(tripwire.then(crate::shutdown::tripwire_handler)) + .instrument(span) + .await + .map_err(|error| error!("Server error: {}.", error))?; + + Ok::<(), ()>(()) + } + .in_current_span(), + ); self.server_shutdown_trigger = Some(trigger); Ok(()) diff --git a/src/sinks/redis/sink.rs b/src/sinks/redis/sink.rs index 8716414ac59b6..a9f8667b8319a 100644 --- a/src/sinks/redis/sink.rs +++ b/src/sinks/redis/sink.rs @@ -140,15 +140,18 @@ impl RedisConnection { Ok(Self::Sentinel { connection_send: conn_tx, connection_recv: conn_rx, - repair_task: Arc::new(tokio::spawn(async move { - Self::repair_connection_manager_task( - sentinel, - service_name, - node_connection_info, - task_conn_tx, - ) - .await - }.in_current_span())), + repair_task: Arc::new(tokio::spawn( + async move { + Self::repair_connection_manager_task( + sentinel, + service_name, + node_connection_info, + task_conn_tx, + ) + .await + } + .in_current_span(), + )), }) } diff --git a/src/sinks/splunk_hec/common/service.rs b/src/sinks/splunk_hec/common/service.rs index dfae5f8625986..f4f6d3ff7d760 100644 --- a/src/sinks/splunk_hec/common/service.rs +++ b/src/sinks/splunk_hec/common/service.rs @@ -59,12 +59,15 @@ where let max_pending_acks = indexer_acknowledgements.max_pending_acks.get(); let tx = if let Some(ack_client) = ack_client { let (tx, rx) = mpsc::channel(128); - tokio::spawn(run_acknowledgements( - rx, - ack_client, - Arc::clone(&http_request_builder), - indexer_acknowledgements, - ).in_current_span()); + tokio::spawn( + run_acknowledgements( + rx, + ack_client, + Arc::clone(&http_request_builder), + indexer_acknowledgements, + ) + .in_current_span(), + ); Some(tx) } else { None diff --git a/src/sources/exec/mod.rs b/src/sources/exec/mod.rs index 245542ab3bc71..163546f9ccc60 100644 --- a/src/sources/exec/mod.rs +++ b/src/sources/exec/mod.rs @@ -725,30 +725,33 @@ fn spawn_reader_thread( sender: Sender<((SmallVec<[Event; 1]>, usize), &'static str)>, ) { // Start the green background thread for collecting - drop(tokio::spawn(async move { - debug!("Start capturing {} command output.", origin); - - let mut stream = DecoderFramedRead::new(reader, decoder); - while let Some(result) = stream.next().await { - match result { - Ok(next) => { - if sender.send((next, origin)).await.is_err() { - // If the receive half of the channel is closed, either due to close being - // called or the Receiver handle dropping, the function returns an error. - emit!(ExecChannelClosedError); - break; + drop(tokio::spawn( + async move { + debug!("Start capturing {} command output.", origin); + + let mut stream = DecoderFramedRead::new(reader, decoder); + while let Some(result) = stream.next().await { + match result { + Ok(next) => { + if sender.send((next, origin)).await.is_err() { + // If the receive half of the channel is closed, either due to close being + // called or the Receiver handle dropping, the function returns an error. + emit!(ExecChannelClosedError); + break; + } } - } - Err(error) => { - // Error is logged by `vector_lib::codecs::Decoder`, no further - // handling is needed here. - if !error.can_continue() { - break; + Err(error) => { + // Error is logged by `vector_lib::codecs::Decoder`, no further + // handling is needed here. + if !error.can_continue() { + break; + } } } } - } - debug!("Finished capturing {} command output.", origin); - }.in_current_span())); + debug!("Finished capturing {} command output.", origin); + } + .in_current_span(), + )); } diff --git a/src/sources/file.rs b/src/sources/file.rs index 244e35762114d..82b19261c8bd4 100644 --- a/src/sources/file.rs +++ b/src/sources/file.rs @@ -580,14 +580,17 @@ pub fn file_source( // checkpoints until all the acks have come in. let (send_shutdown, shutdown2) = oneshot::channel::<()>(); let checkpoints = checkpointer.view(); - tokio::spawn(async move { - while let Some((status, entry)) = ack_stream.next().await { - if status == BatchStatus::Delivered { - checkpoints.update(entry.file_id, entry.offset); + tokio::spawn( + async move { + while let Some((status, entry)) = ack_stream.next().await { + if status == BatchStatus::Delivered { + checkpoints.update(entry.file_id, entry.offset); + } } + send_shutdown.send(()) } - send_shutdown.send(()) - }.in_current_span()); + .in_current_span(), + ); (Some(finalizer), shutdown2.map(|_| ()).boxed()) } else { // When not dealing with end-to-end acknowledgements, just diff --git a/src/sources/gcp_pubsub.rs b/src/sources/gcp_pubsub.rs index 4787d1041cd07..87127b2e20f7f 100644 --- a/src/sources/gcp_pubsub.rs +++ b/src/sources/gcp_pubsub.rs @@ -23,6 +23,7 @@ use tonic::{ metadata::MetadataValue, transport::{Certificate, ClientTlsConfig, Endpoint, Identity}, }; +use tracing::Instrument; use vector_lib::{ byte_size_of::ByteSizeOf, codecs::decoding::{DeserializerConfig, FramingConfig}, @@ -34,7 +35,6 @@ use vector_lib::{ }, lookup::owned_value_path, }; -use tracing::Instrument; use vrl::{ path, value::{Kind, kind::Collection}, diff --git a/src/sources/journald.rs b/src/sources/journald.rs index 62ed2a8e47631..5038b0a75f44a 100644 --- a/src/sources/journald.rs +++ b/src/sources/journald.rs @@ -1057,13 +1057,16 @@ impl Finalizer { ) -> Self { if acknowledgements { let (finalizer, mut ack_stream) = OrderedFinalizer::new(Some(shutdown)); - tokio::spawn(async move { - while let Some((status, cursor)) = ack_stream.next().await { - if status == BatchStatus::Delivered { - checkpointer.lock().await.set(cursor).await; + tokio::spawn( + async move { + while let Some((status, cursor)) = ack_stream.next().await { + if status == BatchStatus::Delivered { + checkpointer.lock().await.set(cursor).await; + } } } - }.in_current_span()); + .in_current_span(), + ); Self::Async(finalizer) } else { Self::Sync(checkpointer) diff --git a/src/sources/kafka.rs b/src/sources/kafka.rs index 2cc3c4fdaf639..cf379cafbd183 100644 --- a/src/sources/kafka.rs +++ b/src/sources/kafka.rs @@ -455,16 +455,19 @@ async fn kafka_source( .map_or(config.session_timeout_ms / 2, Duration::from_millis); let consumer_state = ConsumerStateInner::::new(config, decoder, out, log_namespace, span); - tokio::spawn(async move { - coordinate_kafka_callbacks( - consumer, - callback_rx, - consumer_state, - drain_timeout_ms, - eof_tx, - ) - .await; - }.in_current_span()) + tokio::spawn( + async move { + coordinate_kafka_callbacks( + consumer, + callback_rx, + consumer_state, + drain_timeout_ms, + eof_tx, + ) + .await; + } + .in_current_span(), + ) }; let client_task = { diff --git a/src/sources/kubernetes_logs/mod.rs b/src/sources/kubernetes_logs/mod.rs index aba5247df7efd..3482a93b86870 100644 --- a/src/sources/kubernetes_logs/mod.rs +++ b/src/sources/kubernetes_logs/mod.rs @@ -21,6 +21,7 @@ use kube::{ }; use lifecycle::Lifecycle; use serde_with::serde_as; +use tracing::Instrument; use vector_lib::{ EstimatedJsonEncodedSizeOf, TimeZone, codecs::{BytesDeserializer, BytesDeserializerConfig}, @@ -35,7 +36,6 @@ use vector_lib::{ internal_event::{ByteSize, BytesReceived, InternalEventHandle as _, Protocol}, lookup::{OwnedTargetPath, lookup_v2::OptionalTargetPath, owned_value_path, path}, }; -use tracing::Instrument; use vrl::value::{Kind, kind::Collection}; use crate::{ @@ -739,12 +739,10 @@ impl Source { let pod_state = pod_store_w.as_reader(); let pod_cacher = MetaCache::new(); - reflectors.push(tokio::spawn(custom_reflector( - pod_store_w, - pod_cacher, - pod_watcher, - delay_deletion, - ).in_current_span())); + reflectors.push(tokio::spawn( + custom_reflector(pod_store_w, pod_cacher, pod_watcher, delay_deletion) + .in_current_span(), + )); // ----------------------------------------------------------------- @@ -763,12 +761,10 @@ impl Source { ) .backoff(watcher::DefaultBackoff::default()); - reflectors.push(tokio::spawn(custom_reflector( - ns_store_w, - MetaCache::new(), - ns_watcher, - delay_deletion, - ).in_current_span())); + reflectors.push(tokio::spawn( + custom_reflector(ns_store_w, MetaCache::new(), ns_watcher, delay_deletion) + .in_current_span(), + )); } // ----------------------------------------------------------------- @@ -788,12 +784,10 @@ impl Source { let node_state = node_store_w.as_reader(); let node_cacher = MetaCache::new(); - reflectors.push(tokio::spawn(custom_reflector( - node_store_w, - node_cacher, - node_watcher, - delay_deletion, - ).in_current_span())); + reflectors.push(tokio::spawn( + custom_reflector(node_store_w, node_cacher, node_watcher, delay_deletion) + .in_current_span(), + )); let paths_provider = K8sPathsProvider::new( pod_state.clone(), diff --git a/src/sources/splunk_hec/acknowledgements.rs b/src/sources/splunk_hec/acknowledgements.rs index 095ee7885149d..5f5cc58239e5d 100644 --- a/src/sources/splunk_hec/acknowledgements.rs +++ b/src/sources/splunk_hec/acknowledgements.rs @@ -9,10 +9,10 @@ use std::{ }; use futures::StreamExt; -use tracing::Instrument; use roaring::RoaringTreemap; use serde::{Deserialize, Serialize}; use tokio::time::interval; +use tracing::Instrument; use vector_lib::{ configurable::configurable_component, finalization::BatchStatusReceiver, finalizer::UnorderedFinalizer, @@ -107,18 +107,21 @@ impl IndexerAcknowledgement { let idle_task_channels = Arc::clone(&channels); if config.ack_idle_cleanup { - tokio::spawn(async move { - let mut interval = interval(Duration::from_secs(max_idle_time)); - loop { - interval.tick().await; - let mut channels = idle_task_channels.lock().await; - let now = Instant::now(); - - channels.retain(|_, channel| { - now.duration_since(channel.get_last_used()).as_secs() <= max_idle_time - }); + tokio::spawn( + async move { + let mut interval = interval(Duration::from_secs(max_idle_time)); + loop { + interval.tick().await; + let mut channels = idle_task_channels.lock().await; + let now = Instant::now(); + + channels.retain(|_, channel| { + now.duration_since(channel.get_last_used()).as_secs() <= max_idle_time + }); + } } - }.in_current_span()); + .in_current_span(), + ); } Self { diff --git a/src/transforms/throttle/rate_limiter.rs b/src/transforms/throttle/rate_limiter.rs index 50ab1cd35daa6..f8f2a3aec24c4 100644 --- a/src/transforms/throttle/rate_limiter.rs +++ b/src/transforms/throttle/rate_limiter.rs @@ -27,13 +27,16 @@ where let rate_limiter = Arc::new(RateLimiter::dashmap_with_clock(quota, clock)); let rate_limiter_clone = Arc::clone(&rate_limiter); - let flush_handle = tokio::spawn(async move { - let mut interval = tokio::time::interval(flush_keys_interval); - loop { - interval.tick().await; - rate_limiter_clone.retain_recent(); + let flush_handle = tokio::spawn( + async move { + let mut interval = tokio::time::interval(flush_keys_interval); + loop { + interval.tick().await; + rate_limiter_clone.retain_recent(); + } } - }.in_current_span()); + .in_current_span(), + ); Self { rate_limiter, From 9552bf82d1b396bf9caac3db93373dbd379a8056 Mon Sep 17 00:00:00 2001 From: Yoenn Burban Date: Mon, 1 Jun 2026 15:14:38 +0200 Subject: [PATCH 3/9] add a helper function --- src/api/grpc/service.rs | 36 +++++----- src/api/grpc_server.rs | 80 ++++++++++------------ src/gcp.rs | 3 +- src/lib.rs | 16 +++++ src/secrets/exec.rs | 3 +- src/sinks/blackhole/sink.rs | 46 ++++++------- src/sinks/datadog/traces/config.rs | 18 ++--- src/sinks/mqtt/sink.rs | 32 ++++----- src/sinks/prometheus/exporter.rs | 27 ++++---- src/sinks/redis/sink.rs | 22 +++--- src/sinks/splunk_hec/common/service.rs | 16 ++--- src/sinks/util/sink.rs | 2 +- src/sinks/websocket_server/sink.rs | 49 ++++++------- src/sources/aws_s3/sqs.rs | 3 +- src/sources/aws_sqs/source.rs | 33 ++++----- src/sources/docker_logs/mod.rs | 56 +++++++-------- src/sources/exec/mod.rs | 46 ++++++------- src/sources/file.rs | 15 ++-- src/sources/gcp_pubsub.rs | 3 +- src/sources/journald.rs | 16 ++--- src/sources/kafka.rs | 23 +++---- src/sources/kubernetes_logs/mod.rs | 16 ++--- src/sources/postgresql_metrics.rs | 5 +- src/sources/splunk_hec/acknowledgements.rs | 30 ++++---- src/sources/util/framestream.rs | 25 +++---- src/sources/windows_event_log/mod.rs | 10 ++- src/topology/builder.rs | 4 +- src/transforms/throttle/rate_limiter.rs | 17 ++--- 28 files changed, 291 insertions(+), 361 deletions(-) diff --git a/src/api/grpc/service.rs b/src/api/grpc/service.rs index dff3c5c812975..e1e0808619db9 100644 --- a/src/api/grpc/service.rs +++ b/src/api/grpc/service.rs @@ -17,7 +17,6 @@ use tokio_stream::{ wrappers::{IntervalStream, ReceiverStream}, }; use tonic::{Request, Response, Status}; -use tracing::Instrument; use vector_lib::tap::{ controller::{TapController, TapPatterns, TapPayload}, topology::WatchRx, @@ -677,30 +676,27 @@ impl observability::Service for ObservabilityService { let watch_rx = self.watch_rx.clone(); - tokio::spawn( - async move { - let _tap_controller = TapController::new(watch_rx, tap_tx, patterns); - let mut tap_rx = ReceiverStream::new(tap_rx); - let mut interval = time::interval(time::Duration::from_millis(interval_ms)); - let mut reservoir = Reservoir::new(limit); - - loop { - select! { - Some(tap_payload) = tokio_stream::StreamExt::next(&mut tap_rx) => { - if reservoir.handle_payload(tap_payload, &event_tx).await.is_err() { - break; - } + crate::spawn_in_current_span(async move { + let _tap_controller = TapController::new(watch_rx, tap_tx, patterns); + let mut tap_rx = ReceiverStream::new(tap_rx); + let mut interval = time::interval(time::Duration::from_millis(interval_ms)); + let mut reservoir = Reservoir::new(limit); + + loop { + select! { + Some(tap_payload) = tokio_stream::StreamExt::next(&mut tap_rx) => { + if reservoir.handle_payload(tap_payload, &event_tx).await.is_err() { + break; } - _ = interval.tick() => { - if event_tx.is_closed() || reservoir.flush(&event_tx).await.is_err() { - break; - } + } + _ = interval.tick() => { + if event_tx.is_closed() || reservoir.flush(&event_tx).await.is_err() { + break; } } } } - .in_current_span(), - ); + }); let stream = FuturesStreamExt::flat_map(ReceiverStream::new(event_rx), |events| { stream::iter(events.into_iter().map(Ok)) diff --git a/src/api/grpc_server.rs b/src/api/grpc_server.rs index d3cb450c019c5..f582b2f82981c 100644 --- a/src/api/grpc_server.rs +++ b/src/api/grpc_server.rs @@ -17,7 +17,6 @@ use axum::{ use tokio::sync::oneshot; use tonic::transport::Server as TonicServer; use tonic_health::server::{HealthReporter, health_reporter}; -use tracing::Instrument; use vector_lib::tap::topology::WatchRx; use super::grpc::ObservabilityService; @@ -81,48 +80,45 @@ impl GrpcServer { let router_serving = Arc::clone(&serving); // Spawn the server with the already-bound listener - tokio::spawn( - async move { - // Build reflection service for tools like grpcurl - let reflection_service = tonic_reflection::server::Builder::configure() - .register_encoded_file_descriptor_set( - crate::proto::observability::FILE_DESCRIPTOR_SET, - ) - .register_encoded_file_descriptor_set(tonic_health::pb::FILE_DESCRIPTOR_SET) - .build() - .expect("Failed to build reflection service"); - - // Build the tonic router (gRPC services) and merge with the HTTP router - // so both protocols share the same port. `accept_http1(true)` lets plain - // HTTP/1.1 requests reach the merged axum routes. - let router = TonicServer::builder() - .accept_http1(true) - .add_service(health_service) - .add_service(ObservabilityServer::new(service)) - .add_service(reflection_service) - .into_router() - .merge(http_router(router_serving)); - - let result = hyper::Server::from_tcp(std_listener) - .expect("Failed to build HTTP server from TCP listener") - .serve(router.into_make_service()) - .with_graceful_shutdown(async { - rx.await.ok(); - info!("GRPC API server shutting down."); - }) - .await; - - if let Err(e) = result { - error!( - message = "GRPC server encountered an error.", - error = %e, - error_source = ?e.source(), - bind_addr = %actual_addr, - ); - } + crate::spawn_in_current_span(async move { + // Build reflection service for tools like grpcurl + let reflection_service = tonic_reflection::server::Builder::configure() + .register_encoded_file_descriptor_set( + crate::proto::observability::FILE_DESCRIPTOR_SET, + ) + .register_encoded_file_descriptor_set(tonic_health::pb::FILE_DESCRIPTOR_SET) + .build() + .expect("Failed to build reflection service"); + + // Build the tonic router (gRPC services) and merge with the HTTP router + // so both protocols share the same port. `accept_http1(true)` lets plain + // HTTP/1.1 requests reach the merged axum routes. + let router = TonicServer::builder() + .accept_http1(true) + .add_service(health_service) + .add_service(ObservabilityServer::new(service)) + .add_service(reflection_service) + .into_router() + .merge(http_router(router_serving)); + + let result = hyper::Server::from_tcp(std_listener) + .expect("Failed to build HTTP server from TCP listener") + .serve(router.into_make_service()) + .with_graceful_shutdown(async { + rx.await.ok(); + info!("GRPC API server shutting down."); + }) + .await; + + if let Err(e) = result { + error!( + message = "GRPC server encountered an error.", + error = %e, + error_source = ?e.source(), + bind_addr = %actual_addr, + ); } - .in_current_span(), - ); + }); info!("GRPC API server started on {}.", actual_addr); diff --git a/src/gcp.rs b/src/gcp.rs index fe44ca4455cde..89fec86770e0b 100644 --- a/src/gcp.rs +++ b/src/gcp.rs @@ -17,7 +17,6 @@ use hyper::header::AUTHORIZATION; use smpl_jwt::Jwt; use snafu::{ResultExt, Snafu}; use tokio::sync::watch; -use tracing::Instrument; use vector_lib::{configurable::configurable_component, sensitive_string::SensitiveString}; use crate::{ @@ -195,7 +194,7 @@ impl GcpAuthenticator { pub fn spawn_regenerate_token(&self) -> watch::Receiver<()> { let (sender, receiver) = watch::channel(()); - tokio::spawn(self.clone().token_regenerator(sender).in_current_span()); + crate::spawn_in_current_span(self.clone().token_regenerator(sender)); receiver } diff --git a/src/lib.rs b/src/lib.rs index 8d26a3b080ecf..2ff018957f2cc 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -241,6 +241,22 @@ pub fn get_hostname() -> std::io::Result { }) } +/// Spawn a future on the current tokio runtime, propagating the current tracing span into the +/// spawned task. This ensures that any logs or internal metrics emitted by the task retain the +/// component tags (component_id, component_kind, component_type) of the caller. +/// +/// Prefer this over `tokio::spawn(future.in_current_span())` to keep call sites concise. +#[track_caller] +pub(crate) fn spawn_in_current_span( + task: impl std::future::Future + Send + 'static, +) -> tokio::task::JoinHandle +where + T: Send + 'static, +{ + use tracing::Instrument as _; + tokio::spawn(task.in_current_span()) +} + /// Spawn a task with the given name. The name is only used if /// built with [`tokio_unstable`][tokio_unstable]. /// diff --git a/src/secrets/exec.rs b/src/secrets/exec.rs index 7ad0546076ec0..2a6e8c464065f 100644 --- a/src/secrets/exec.rs +++ b/src/secrets/exec.rs @@ -5,7 +5,6 @@ use futures_util::StreamExt; use serde::{Deserialize, Serialize}; use tokio::{io::AsyncWriteExt, process::Command, time}; use tokio_util::codec; -use tracing::Instrument; use vector_lib::configurable::{component::GenerateConfig, configurable_component}; use vrl::value::Value; @@ -180,7 +179,7 @@ async fn query_backend( .ok_or("unable to acquire stdout")?; let query = serde_json::to_vec(&query)?; - tokio::spawn(async move { stdin.write_all(&query).await }.in_current_span()); + crate::spawn_in_current_span(async move { stdin.write_all(&query).await }); let timeout = time::sleep(time::Duration::from_secs(timeout)); tokio::pin!(timeout); diff --git a/src/sinks/blackhole/sink.rs b/src/sinks/blackhole/sink.rs index ca5d941b114cf..fcb19a36d272b 100644 --- a/src/sinks/blackhole/sink.rs +++ b/src/sinks/blackhole/sink.rs @@ -20,7 +20,6 @@ use vector_lib::{ }, }; -use tracing::Instrument; use crate::{ event::{EventArray, EventContainer, EventStatus, Finalizable}, @@ -59,32 +58,29 @@ impl StreamSink for BlackholeSink { if self.config.print_interval_secs.as_secs() > 0 { let interval_dur = self.config.print_interval_secs; - tokio::spawn( - async move { - let mut print_interval = interval(interval_dur); - loop { - select! { - _ = print_interval.tick() => { - info!( - events = total_events.load(Ordering::Relaxed), - raw_bytes_collected = total_raw_bytes.load(Ordering::Relaxed), - internal_log_rate_limit = false, - "Collected events." - ); - }, - _ = tripwire.changed() => break, - } + crate::spawn_in_current_span(async move { + let mut print_interval = interval(interval_dur); + loop { + select! { + _ = print_interval.tick() => { + info!( + events = total_events.load(Ordering::Relaxed), + raw_bytes_collected = total_raw_bytes.load(Ordering::Relaxed), + internal_log_rate_limit = false, + "Collected events." + ); + }, + _ = tripwire.changed() => break, } - - info!( - events = total_events.load(Ordering::Relaxed), - raw_bytes_collected = total_raw_bytes.load(Ordering::Relaxed), - internal_log_rate_limit = false, - "Collected events." - ); } - .in_current_span(), - ); + + info!( + events = total_events.load(Ordering::Relaxed), + raw_bytes_collected = total_raw_bytes.load(Ordering::Relaxed), + internal_log_rate_limit = false, + "Collected events." + ); + }); } while let Some(mut events) = input.next().await { diff --git a/src/sinks/datadog/traces/config.rs b/src/sinks/datadog/traces/config.rs index c5ce5ffb9d134..00682514dadc1 100644 --- a/src/sinks/datadog/traces/config.rs +++ b/src/sinks/datadog/traces/config.rs @@ -5,7 +5,6 @@ use indoc::indoc; use snafu::ResultExt; use tokio::sync::oneshot::{Sender, channel}; use tower::ServiceBuilder; -use tracing::Instrument; use vector_lib::{ config::{AcknowledgementsConfig, proxy::ProxyConfig}, configurable::configurable_component, @@ -179,16 +178,13 @@ impl DatadogTracesConfig { // Send the APM stats payloads independently of the sink framework. // This is necessary to comply with what the APM stats backend of Datadog expects with // respect to receiving stats payloads. - tokio::spawn( - flush_apm_stats_thread( - tripwire, - client, - compression, - endpoints, - Arc::clone(&apm_stats_aggregator), - ) - .in_current_span(), - ); + crate::spawn_in_current_span(flush_apm_stats_thread( + tripwire, + client, + compression, + endpoints, + Arc::clone(&apm_stats_aggregator), + )); Ok(VectorSink::from_event_streamsink(sink)) } diff --git a/src/sinks/mqtt/sink.rs b/src/sinks/mqtt/sink.rs index 98a5dab2d1cba..e9feb7bb0144e 100644 --- a/src/sinks/mqtt/sink.rs +++ b/src/sinks/mqtt/sink.rs @@ -1,6 +1,5 @@ use async_trait::async_trait; use futures::{StreamExt, stream::BoxStream}; -use tracing::Instrument; use super::{ MqttSinkConfig, @@ -61,26 +60,23 @@ impl MqttSink { let (client, mut connection) = self.connector.connect(); // This is necessary to keep the mqtt event loop moving forward. - tokio::spawn( - async move { - loop { - // If an error is returned here there is currently no way to tie this back - // to the event that was posted which means we can't accurately provide - // delivery guarantees. - // We need this issue resolved first: - // https://github.com/bytebeamio/rumqtt/issues/349 - match connection.poll().await { - Ok(_) => {} - Err(connection_error) => { - emit!(MqttConnectionError { - error: connection_error - }); - } + crate::spawn_in_current_span(async move { + loop { + // If an error is returned here there is currently no way to tie this back + // to the event that was posted which means we can't accurately provide + // delivery guarantees. + // We need this issue resolved first: + // https://github.com/bytebeamio/rumqtt/issues/349 + match connection.poll().await { + Ok(_) => {} + Err(connection_error) => { + emit!(MqttConnectionError { + error: connection_error + }); } } } - .in_current_span(), - ); + }); let service = ServiceBuilder::new().service(MqttService { client, diff --git a/src/sinks/prometheus/exporter.rs b/src/sinks/prometheus/exporter.rs index 2c0063bd7e1f7..8ed514b638fa0 100644 --- a/src/sinks/prometheus/exporter.rs +++ b/src/sinks/prometheus/exporter.rs @@ -466,21 +466,18 @@ impl PrometheusExporter { let tls = MaybeTlsSettings::from_config(tls.as_ref(), true)?; let listener = tls.bind(&address).await?; - tokio::spawn( - async move { - info!(message = "Building HTTP server.", address = %address); - - Server::builder(hyper::server::accept::from_stream(listener.accept_stream())) - .serve(new_service) - .with_graceful_shutdown(tripwire.then(crate::shutdown::tripwire_handler)) - .instrument(span) - .await - .map_err(|error| error!("Server error: {}.", error))?; - - Ok::<(), ()>(()) - } - .in_current_span(), - ); + crate::spawn_in_current_span(async move { + info!(message = "Building HTTP server.", address = %address); + + Server::builder(hyper::server::accept::from_stream(listener.accept_stream())) + .serve(new_service) + .with_graceful_shutdown(tripwire.then(crate::shutdown::tripwire_handler)) + .instrument(span) + .await + .map_err(|error| error!("Server error: {}.", error))?; + + Ok::<(), ()>(()) + }); self.server_shutdown_trigger = Some(trigger); Ok(()) diff --git a/src/sinks/redis/sink.rs b/src/sinks/redis/sink.rs index a9f8667b8319a..b3719d01e514a 100644 --- a/src/sinks/redis/sink.rs +++ b/src/sinks/redis/sink.rs @@ -11,7 +11,6 @@ use tokio::{ task::JoinHandle, time::sleep, }; -use tracing::Instrument; use super::{ RedisEvent, RedisRequest, RepairChannelSnafu, @@ -140,18 +139,15 @@ impl RedisConnection { Ok(Self::Sentinel { connection_send: conn_tx, connection_recv: conn_rx, - repair_task: Arc::new(tokio::spawn( - async move { - Self::repair_connection_manager_task( - sentinel, - service_name, - node_connection_info, - task_conn_tx, - ) - .await - } - .in_current_span(), - )), + repair_task: Arc::new(crate::spawn_in_current_span(async move { + Self::repair_connection_manager_task( + sentinel, + service_name, + node_connection_info, + task_conn_tx, + ) + .await + })), }) } diff --git a/src/sinks/splunk_hec/common/service.rs b/src/sinks/splunk_hec/common/service.rs index f4f6d3ff7d760..3b1c68c876561 100644 --- a/src/sinks/splunk_hec/common/service.rs +++ b/src/sinks/splunk_hec/common/service.rs @@ -12,7 +12,6 @@ use snafu::ResultExt; use tokio::sync::{OwnedSemaphorePermit, Semaphore, mpsc, oneshot}; use tokio_util::sync::PollSemaphore; use tower::Service; -use tracing::Instrument; use uuid::Uuid; use vector_lib::{event::EventStatus, request_metadata::MetaDescriptive}; @@ -59,15 +58,12 @@ where let max_pending_acks = indexer_acknowledgements.max_pending_acks.get(); let tx = if let Some(ack_client) = ack_client { let (tx, rx) = mpsc::channel(128); - tokio::spawn( - run_acknowledgements( - rx, - ack_client, - Arc::clone(&http_request_builder), - indexer_acknowledgements, - ) - .in_current_span(), - ); + crate::spawn_in_current_span(run_acknowledgements( + rx, + ack_client, + Arc::clone(&http_request_builder), + indexer_acknowledgements, + )); Some(tx) } else { None diff --git a/src/sinks/util/sink.rs b/src/sinks/util/sink.rs index 150d512425f64..e9c35c94f3e70 100644 --- a/src/sinks/util/sink.rs +++ b/src/sinks/util/sink.rs @@ -323,7 +323,7 @@ where this.lingers.remove(partition); let batch = batch.finish(); - let future = tokio::spawn(this.service.call(batch).in_current_span()); + let future = crate::spawn_in_current_span(this.service.call(batch)); if let Some(map) = this.in_flight.as_mut() { map.insert(partition.clone(), future.map(|_| ()).fuse().boxed()); diff --git a/src/sinks/websocket_server/sink.rs b/src/sinks/websocket_server/sink.rs index 77da730fbbdda..ea38d88fdac7f 100644 --- a/src/sinks/websocket_server/sink.rs +++ b/src/sinks/websocket_server/sink.rs @@ -37,7 +37,6 @@ use tokio_tungstenite::tungstenite::{ handshake::server::{ErrorResponse, Request, Response}, }; use tokio_util::codec::Encoder as _; -use tracing::Instrument; use url::Url; use uuid::Uuid; use vector_lib::{ @@ -129,20 +128,17 @@ impl WebSocketListenerSink { let open_gauge = OpenGauge::new(); while let Ok(stream) = listener.accept().await { - tokio::spawn( - Self::handle_connection( - auth.clone(), - message_buffering.clone(), - subprotocol.clone(), - Arc::clone(&peers), - Arc::clone(&client_checkpoints), - Arc::clone(&buffer), - stream, - extra_tags_config.clone(), - open_gauge.clone(), - ) - .in_current_span(), - ); + crate::spawn_in_current_span(Self::handle_connection( + auth.clone(), + message_buffering.clone(), + subprotocol.clone(), + Arc::clone(&peers), + Arc::clone(&client_checkpoints), + Arc::clone(&buffer), + stream, + extra_tags_config.clone(), + open_gauge.clone(), + )); } } @@ -360,19 +356,16 @@ impl StreamSink for WebSocketListenerSink { ))); let client_checkpoints = Arc::new(Mutex::new(HashMap::default())); - tokio::spawn( - Self::handle_connections( - self.auth, - self.message_buffering.clone(), - self.subprotocol.clone(), - Arc::clone(&peers), - self.extra_tags_config, - Arc::clone(&client_checkpoints), - Arc::clone(&message_buffer), - listener, - ) - .in_current_span(), - ); + crate::spawn_in_current_span(Self::handle_connections( + self.auth, + self.message_buffering.clone(), + self.subprotocol.clone(), + Arc::clone(&peers), + self.extra_tags_config, + Arc::clone(&client_checkpoints), + Arc::clone(&message_buffer), + listener, + )); while input.as_mut().peek().await.is_some() { let mut event = input.next().await.unwrap(); diff --git a/src/sources/aws_s3/sqs.rs b/src/sources/aws_s3/sqs.rs index 9ad5b47be61cc..802bdf5d38f0e 100644 --- a/src/sources/aws_s3/sqs.rs +++ b/src/sources/aws_s3/sqs.rs @@ -28,7 +28,6 @@ use smallvec::SmallVec; use snafu::{ResultExt, Snafu}; use tokio::{pin, select}; use tokio_util::codec::FramedRead; -use tracing::Instrument; use vector_lib::{ codecs::decoding::FramingError, config::{LegacyKey, LogNamespace, log_schema}, @@ -358,7 +357,7 @@ impl Ingestor { acknowledgements, ); let fut = process.run(); - let handle = tokio::spawn(fut.in_current_span()); + let handle = crate::spawn_in_current_span(fut); handles.push(handle); } diff --git a/src/sources/aws_sqs/source.rs b/src/sources/aws_sqs/source.rs index f8afdb9365ca9..7dc7c01e735e0 100644 --- a/src/sources/aws_sqs/source.rs +++ b/src/sources/aws_sqs/source.rs @@ -7,7 +7,6 @@ use aws_sdk_sqs::{ use chrono::{DateTime, TimeZone, Utc}; use futures::{FutureExt, StreamExt}; use tokio::{pin, select}; -use tracing_futures::Instrument; use vector_lib::{ config::LogNamespace, finalizer::UnorderedFinalizer, @@ -50,16 +49,13 @@ impl SqsSource { let (finalizer, mut ack_stream) = Finalizer::new(Some(shutdown.clone())); let client = self.client.clone(); let queue_url = self.queue_url.clone(); - tokio::spawn( - async move { - while let Some((status, receipts)) = ack_stream.next().await { - if status == BatchStatus::Delivered { - delete_messages(client.clone(), receipts, queue_url.clone()).await; - } + crate::spawn_in_current_span(async move { + while let Some((status, receipts)) = ack_stream.next().await { + if status == BatchStatus::Delivered { + delete_messages(client.clone(), receipts, queue_url.clone()).await; } } - .in_current_span(), - ); + }); Arc::new(finalizer) }); let events_received = register!(EventsReceived); @@ -70,19 +66,16 @@ impl SqsSource { let mut out = out.clone(); let finalizer = finalizer.clone(); let events_received = events_received.clone(); - task_handles.push(tokio::spawn( - async move { - let finalizer = finalizer.as_ref(); - pin!(shutdown); - loop { - select! { - _ = &mut shutdown => break, - _ = source.run_once(&mut out, finalizer, events_received.clone()) => {}, - } + task_handles.push(crate::spawn_in_current_span(async move { + let finalizer = finalizer.as_ref(); + pin!(shutdown); + loop { + select! { + _ = &mut shutdown => break, + _ = source.run_once(&mut out, finalizer, events_received.clone()) => {}, } } - .in_current_span(), - )); + })); } // Wait for all of the processes to finish. If any one of them panics, we resume diff --git a/src/sources/docker_logs/mod.rs b/src/sources/docker_logs/mod.rs index 45268788d45cc..8c3ef9bac102f 100644 --- a/src/sources/docker_logs/mod.rs +++ b/src/sources/docker_logs/mod.rs @@ -22,7 +22,6 @@ use chrono::{DateTime, FixedOffset, Local, ParseError, Utc}; use futures::{Stream, StreamExt}; use serde_with::serde_as; use tokio::sync::mpsc; -use tracing_futures::Instrument; use vector_lib::{ codecs::{BytesDeserializer, BytesDeserializerConfig}, config::{LegacyKey, LogNamespace}, @@ -740,39 +739,36 @@ impl EventStreamBuilder { /// Spawn a task to runs event stream until shutdown. fn start(&self, id: ContainerId, backoff: Option) -> ContainerState { let this = self.clone(); - tokio::spawn( - async move { - if let Some(duration) = backoff { - tokio::time::sleep(duration).await; - } + crate::spawn_in_current_span(async move { + if let Some(duration) = backoff { + tokio::time::sleep(duration).await; + } - match this - .core - .docker - .inspect_container(id.as_str(), None::) - .await - { - Ok(details) => match ContainerMetadata::from_details(details) { - Ok(metadata) => { - let info = ContainerLogInfo::new(id, metadata, this.core.now_timestamp); - this.run_event_stream(info).await; - return; - } - Err(error) => emit!(DockerLogsTimestampParseError { - error, - container_id: id.as_str() - }), - }, - Err(error) => emit!(DockerLogsContainerMetadataFetchError { + match this + .core + .docker + .inspect_container(id.as_str(), None::) + .await + { + Ok(details) => match ContainerMetadata::from_details(details) { + Ok(metadata) => { + let info = ContainerLogInfo::new(id, metadata, this.core.now_timestamp); + this.run_event_stream(info).await; + return; + } + Err(error) => emit!(DockerLogsTimestampParseError { error, container_id: id.as_str() }), - } - - this.finish(Err((id, ErrorPersistence::Transient))); + }, + Err(error) => emit!(DockerLogsContainerMetadataFetchError { + error, + container_id: id.as_str() + }), } - .in_current_span(), - ); + + this.finish(Err((id, ErrorPersistence::Transient))); + }); ContainerState::new_running() } @@ -781,7 +777,7 @@ impl EventStreamBuilder { fn restart(&self, container: &mut ContainerState) { if let Some(info) = container.take_info() { let this = self.clone(); - tokio::spawn(this.run_event_stream(info).in_current_span()); + crate::spawn_in_current_span(this.run_event_stream(info)); } } diff --git a/src/sources/exec/mod.rs b/src/sources/exec/mod.rs index 163546f9ccc60..48e3c14454d11 100644 --- a/src/sources/exec/mod.rs +++ b/src/sources/exec/mod.rs @@ -11,7 +11,6 @@ use tokio::{ time::{self, Duration, Instant, sleep}, }; use tokio_stream::wrappers::IntervalStream; -use tracing::Instrument; use vector_lib::{ EstimatedJsonEncodedSizeOf, codecs::{ @@ -725,33 +724,30 @@ fn spawn_reader_thread( sender: Sender<((SmallVec<[Event; 1]>, usize), &'static str)>, ) { // Start the green background thread for collecting - drop(tokio::spawn( - async move { - debug!("Start capturing {} command output.", origin); - - let mut stream = DecoderFramedRead::new(reader, decoder); - while let Some(result) = stream.next().await { - match result { - Ok(next) => { - if sender.send((next, origin)).await.is_err() { - // If the receive half of the channel is closed, either due to close being - // called or the Receiver handle dropping, the function returns an error. - emit!(ExecChannelClosedError); - break; - } + drop(crate::spawn_in_current_span(async move { + debug!("Start capturing {} command output.", origin); + + let mut stream = DecoderFramedRead::new(reader, decoder); + while let Some(result) = stream.next().await { + match result { + Ok(next) => { + if sender.send((next, origin)).await.is_err() { + // If the receive half of the channel is closed, either due to close being + // called or the Receiver handle dropping, the function returns an error. + emit!(ExecChannelClosedError); + break; } - Err(error) => { - // Error is logged by `vector_lib::codecs::Decoder`, no further - // handling is needed here. - if !error.can_continue() { - break; - } + } + Err(error) => { + // Error is logged by `vector_lib::codecs::Decoder`, no further + // handling is needed here. + if !error.can_continue() { + break; } } } - - debug!("Finished capturing {} command output.", origin); } - .in_current_span(), - )); + + debug!("Finished capturing {} command output.", origin); + })); } diff --git a/src/sources/file.rs b/src/sources/file.rs index 82b19261c8bd4..d485c6ad6b4b6 100644 --- a/src/sources/file.rs +++ b/src/sources/file.rs @@ -580,17 +580,14 @@ pub fn file_source( // checkpoints until all the acks have come in. let (send_shutdown, shutdown2) = oneshot::channel::<()>(); let checkpoints = checkpointer.view(); - tokio::spawn( - async move { - while let Some((status, entry)) = ack_stream.next().await { - if status == BatchStatus::Delivered { - checkpoints.update(entry.file_id, entry.offset); - } + crate::spawn_in_current_span(async move { + while let Some((status, entry)) = ack_stream.next().await { + if status == BatchStatus::Delivered { + checkpoints.update(entry.file_id, entry.offset); } - send_shutdown.send(()) } - .in_current_span(), - ); + send_shutdown.send(()) + }); (Some(finalizer), shutdown2.map(|_| ()).boxed()) } else { // When not dealing with end-to-end acknowledgements, just diff --git a/src/sources/gcp_pubsub.rs b/src/sources/gcp_pubsub.rs index 87127b2e20f7f..ddf96d19acf70 100644 --- a/src/sources/gcp_pubsub.rs +++ b/src/sources/gcp_pubsub.rs @@ -23,7 +23,6 @@ use tonic::{ metadata::MetadataValue, transport::{Certificate, ClientTlsConfig, Endpoint, Identity}, }; -use tracing::Instrument; use vector_lib::{ byte_size_of::ByteSizeOf, codecs::decoding::{DeserializerConfig, FramingConfig}, @@ -454,7 +453,7 @@ impl PubsubSource { // when it has an idle interval it will mark itself as not // busy. let busy_flag = Arc::new(AtomicBool::new(false)); - let task = tokio::spawn(self.clone().run(Arc::clone(&busy_flag)).in_current_span()); + let task = crate::spawn_in_current_span(self.clone().run(Arc::clone(&busy_flag))); tasks.push(Task { task, busy_flag }); } diff --git a/src/sources/journald.rs b/src/sources/journald.rs index 5038b0a75f44a..177ee5e91663c 100644 --- a/src/sources/journald.rs +++ b/src/sources/journald.rs @@ -25,7 +25,6 @@ use tokio::{ time::sleep, }; use tokio_util::codec::FramedRead; -use tracing::Instrument; use vector_lib::{ EstimatedJsonEncodedSizeOf, codecs::{CharacterDelimitedDecoder, decoding::BoxedFramingError}, @@ -520,7 +519,7 @@ impl JournaldSource { let events_received = register!(EventsReceived); // Spawn stderr handler task - let stderr_handler = tokio::spawn(Self::handle_stderr(stderr_stream).in_current_span()); + let stderr_handler = crate::spawn_in_current_span(Self::handle_stderr(stderr_stream)); let batch_size = self.batch_size; let result = loop { @@ -1057,16 +1056,13 @@ impl Finalizer { ) -> Self { if acknowledgements { let (finalizer, mut ack_stream) = OrderedFinalizer::new(Some(shutdown)); - tokio::spawn( - async move { - while let Some((status, cursor)) = ack_stream.next().await { - if status == BatchStatus::Delivered { - checkpointer.lock().await.set(cursor).await; - } + crate::spawn_in_current_span(async move { + while let Some((status, cursor)) = ack_stream.next().await { + if status == BatchStatus::Delivered { + checkpointer.lock().await.set(cursor).await; } } - .in_current_span(), - ); + }); Self::Async(finalizer) } else { Self::Sync(checkpointer) diff --git a/src/sources/kafka.rs b/src/sources/kafka.rs index cf379cafbd183..f5927b365157b 100644 --- a/src/sources/kafka.rs +++ b/src/sources/kafka.rs @@ -455,19 +455,16 @@ async fn kafka_source( .map_or(config.session_timeout_ms / 2, Duration::from_millis); let consumer_state = ConsumerStateInner::::new(config, decoder, out, log_namespace, span); - tokio::spawn( - async move { - coordinate_kafka_callbacks( - consumer, - callback_rx, - consumer_state, - drain_timeout_ms, - eof_tx, - ) - .await; - } - .in_current_span(), - ) + crate::spawn_in_current_span(async move { + coordinate_kafka_callbacks( + consumer, + callback_rx, + consumer_state, + drain_timeout_ms, + eof_tx, + ) + .await; + }) }; let client_task = { diff --git a/src/sources/kubernetes_logs/mod.rs b/src/sources/kubernetes_logs/mod.rs index 3482a93b86870..af2bf45d158b9 100644 --- a/src/sources/kubernetes_logs/mod.rs +++ b/src/sources/kubernetes_logs/mod.rs @@ -21,7 +21,6 @@ use kube::{ }; use lifecycle::Lifecycle; use serde_with::serde_as; -use tracing::Instrument; use vector_lib::{ EstimatedJsonEncodedSizeOf, TimeZone, codecs::{BytesDeserializer, BytesDeserializerConfig}, @@ -739,9 +738,8 @@ impl Source { let pod_state = pod_store_w.as_reader(); let pod_cacher = MetaCache::new(); - reflectors.push(tokio::spawn( - custom_reflector(pod_store_w, pod_cacher, pod_watcher, delay_deletion) - .in_current_span(), + reflectors.push(crate::spawn_in_current_span( + custom_reflector(pod_store_w, pod_cacher, pod_watcher, delay_deletion), )); // ----------------------------------------------------------------- @@ -761,9 +759,8 @@ impl Source { ) .backoff(watcher::DefaultBackoff::default()); - reflectors.push(tokio::spawn( - custom_reflector(ns_store_w, MetaCache::new(), ns_watcher, delay_deletion) - .in_current_span(), + reflectors.push(crate::spawn_in_current_span( + custom_reflector(ns_store_w, MetaCache::new(), ns_watcher, delay_deletion), )); } @@ -784,9 +781,8 @@ impl Source { let node_state = node_store_w.as_reader(); let node_cacher = MetaCache::new(); - reflectors.push(tokio::spawn( - custom_reflector(node_store_w, node_cacher, node_watcher, delay_deletion) - .in_current_span(), + reflectors.push(crate::spawn_in_current_span( + custom_reflector(node_store_w, node_cacher, node_watcher, delay_deletion), )); let paths_provider = K8sPathsProvider::new( diff --git a/src/sources/postgresql_metrics.rs b/src/sources/postgresql_metrics.rs index 471eeed6817a3..cb0f820613306 100644 --- a/src/sources/postgresql_metrics.rs +++ b/src/sources/postgresql_metrics.rs @@ -34,7 +34,6 @@ use vector_lib::{ metric_tags, }; -use tracing::Instrument as _; use crate::{ config::{SourceConfig, SourceContext, SourceOutput}, @@ -297,7 +296,7 @@ impl PostgresqlClient { endpoint: &self.endpoint, } })?; - tokio::spawn(connection.in_current_span()); + crate::spawn_in_current_span(connection); client } None => { @@ -308,7 +307,7 @@ impl PostgresqlClient { .with_context(|_| ConnectionFailedSnafu { endpoint: &self.endpoint, })?; - tokio::spawn(connection.in_current_span()); + crate::spawn_in_current_span(connection); client } }; diff --git a/src/sources/splunk_hec/acknowledgements.rs b/src/sources/splunk_hec/acknowledgements.rs index 5f5cc58239e5d..c1688acb9945d 100644 --- a/src/sources/splunk_hec/acknowledgements.rs +++ b/src/sources/splunk_hec/acknowledgements.rs @@ -12,7 +12,6 @@ use futures::StreamExt; use roaring::RoaringTreemap; use serde::{Deserialize, Serialize}; use tokio::time::interval; -use tracing::Instrument; use vector_lib::{ configurable::configurable_component, finalization::BatchStatusReceiver, finalizer::UnorderedFinalizer, @@ -107,21 +106,18 @@ impl IndexerAcknowledgement { let idle_task_channels = Arc::clone(&channels); if config.ack_idle_cleanup { - tokio::spawn( - async move { - let mut interval = interval(Duration::from_secs(max_idle_time)); - loop { - interval.tick().await; - let mut channels = idle_task_channels.lock().await; - let now = Instant::now(); - - channels.retain(|_, channel| { - now.duration_since(channel.get_last_used()).as_secs() <= max_idle_time - }); - } + crate::spawn_in_current_span(async move { + let mut interval = interval(Duration::from_secs(max_idle_time)); + loop { + interval.tick().await; + let mut channels = idle_task_channels.lock().await; + let now = Instant::now(); + + channels.retain(|_, channel| { + now.duration_since(channel.get_last_used()).as_secs() <= max_idle_time + }); } - .in_current_span(), - ); + }); } Self { @@ -210,7 +206,7 @@ impl Channel { let ack_ids_status = Arc::new(Mutex::new(RoaringTreemap::new())); let finalizer_ack_ids_status = Arc::clone(&ack_ids_status); let (ack_event_finalizer, mut ack_stream) = UnorderedFinalizer::new(Some(shutdown)); - tokio::spawn(async move { + crate::spawn_in_current_span(async move { while let Some((status, ack_id)) = ack_stream.next().await { if status == BatchStatus::Delivered { let mut ack_ids_status = finalizer_ack_ids_status.lock().unwrap(); @@ -227,7 +223,7 @@ impl Channel { } } } - }.in_current_span()); + }); Self { last_used_timestamp: RwLock::new(Instant::now()), diff --git a/src/sources/util/framestream.rs b/src/sources/util/framestream.rs index d7f7866778101..9c96b2f3fc4ef 100644 --- a/src/sources/util/framestream.rs +++ b/src/sources/util/framestream.rs @@ -908,20 +908,17 @@ async fn spawn_event_handling_tasks( ) -> JoinHandle<()> { wait_for_task_quota(&active_task_nums, max_frame_handling_tasks).await; - tokio::spawn( - async move { - future::ready({ - if let Some(evt) = event_handler.handle_event(received_from, event_data) - && event_sink.send_event(evt).await.is_err() - { - error!("Encountered error while sending event."); - } - active_task_nums.fetch_sub(1, Ordering::AcqRel); - }) - .await; - } - .in_current_span(), - ) + crate::spawn_in_current_span(async move { + future::ready({ + if let Some(evt) = event_handler.handle_event(received_from, event_data) + && event_sink.send_event(evt).await.is_err() + { + error!("Encountered error while sending event."); + } + active_task_nums.fetch_sub(1, Ordering::AcqRel); + }) + .await; + }) } async fn wait_for_task_quota(active_task_nums: &Arc, max_tasks: usize) { diff --git a/src/sources/windows_event_log/mod.rs b/src/sources/windows_event_log/mod.rs index a6a775945da66..945bc99b54fd9 100644 --- a/src/sources/windows_event_log/mod.rs +++ b/src/sources/windows_event_log/mod.rs @@ -35,8 +35,6 @@ cfg_if::cfg_if! { use windows::Win32::Foundation::{DUPLICATE_SAME_ACCESS, DuplicateHandle, HANDLE}; use windows::Win32::System::Threading::GetCurrentProcess; - use tracing::Instrument; - use crate::{ SourceSender, event::{BatchNotifier, BatchStatus, BatchStatusReceiver}, @@ -103,7 +101,7 @@ impl Finalizer { OrderedFinalizer::::new(Some(shutdown.clone())); // Spawn background task to process acknowledgments and update checkpoints - tokio::spawn(async move { + crate::spawn_in_current_span(async move { while let Some((status, entry)) = ack_stream.next().await { if status == BatchStatus::Delivered { if let Err(e) = checkpointer.set_batch(entry.bookmarks.clone()).await { @@ -125,7 +123,7 @@ impl Finalizer { } } debug!(message = "Acknowledgement stream completed."); - }.in_current_span()); + }); Self::Async(finalizer) } else { @@ -363,7 +361,7 @@ impl WindowsEventLogSource { } }; let shutdown_watcher = shutdown.clone(); - tokio::spawn(async move { + crate::spawn_in_current_span(async move { shutdown_watcher.await; unsafe { let handle = @@ -373,7 +371,7 @@ impl WindowsEventLogSource { let _ = windows::Win32::Foundation::CloseHandle(handle); } } - }.in_current_span()); + }); // Track when we last flushed checkpoints let mut last_checkpoint = std::time::Instant::now(); diff --git a/src/topology/builder.rs b/src/topology/builder.rs index 6ed0206446597..9caa9e2f11378 100644 --- a/src/topology/builder.rs +++ b/src/topology/builder.rs @@ -1280,12 +1280,12 @@ impl Runner { let mut t = self.transform.clone(); let mut outputs_buf = self.outputs.new_buf_with_capacity(len); - let task = tokio::spawn(async move { + let task = crate::spawn_in_current_span(async move { for events in input_arrays { t.transform_all(events, &mut outputs_buf); } outputs_buf - }.in_current_span()); + }); in_flight.push_back(task); } None => { diff --git a/src/transforms/throttle/rate_limiter.rs b/src/transforms/throttle/rate_limiter.rs index f8f2a3aec24c4..39a9cb63a4a63 100644 --- a/src/transforms/throttle/rate_limiter.rs +++ b/src/transforms/throttle/rate_limiter.rs @@ -1,7 +1,5 @@ use std::{hash::Hash, sync::Arc, time::Duration}; -use tracing::Instrument; - use governor::{ Quota, RateLimiter, clock, middleware::NoOpMiddleware, state::keyed::DashMapStateStore, }; @@ -27,16 +25,13 @@ where let rate_limiter = Arc::new(RateLimiter::dashmap_with_clock(quota, clock)); let rate_limiter_clone = Arc::clone(&rate_limiter); - let flush_handle = tokio::spawn( - async move { - let mut interval = tokio::time::interval(flush_keys_interval); - loop { - interval.tick().await; - rate_limiter_clone.retain_recent(); - } + let flush_handle = crate::spawn_in_current_span(async move { + let mut interval = tokio::time::interval(flush_keys_interval); + loop { + interval.tick().await; + rate_limiter_clone.retain_recent(); } - .in_current_span(), - ); + }); Self { rate_limiter, From 58a4238abe281751ae11e215079ed8337f5023d3 Mon Sep 17 00:00:00 2001 From: Yoenn Burban Date: Mon, 1 Jun 2026 15:24:51 +0200 Subject: [PATCH 4/9] fix fmt --- src/sinks/blackhole/sink.rs | 1 - src/sources/kubernetes_logs/mod.rs | 27 ++++++++++++++++++--------- src/sources/postgresql_metrics.rs | 1 - 3 files changed, 18 insertions(+), 11 deletions(-) diff --git a/src/sinks/blackhole/sink.rs b/src/sinks/blackhole/sink.rs index fcb19a36d272b..8796c7425f691 100644 --- a/src/sinks/blackhole/sink.rs +++ b/src/sinks/blackhole/sink.rs @@ -20,7 +20,6 @@ use vector_lib::{ }, }; - use crate::{ event::{EventArray, EventContainer, EventStatus, Finalizable}, sinks::{blackhole::config::BlackholeConfig, util::StreamSink}, diff --git a/src/sources/kubernetes_logs/mod.rs b/src/sources/kubernetes_logs/mod.rs index af2bf45d158b9..04b26b4aa7c09 100644 --- a/src/sources/kubernetes_logs/mod.rs +++ b/src/sources/kubernetes_logs/mod.rs @@ -738,9 +738,12 @@ impl Source { let pod_state = pod_store_w.as_reader(); let pod_cacher = MetaCache::new(); - reflectors.push(crate::spawn_in_current_span( - custom_reflector(pod_store_w, pod_cacher, pod_watcher, delay_deletion), - )); + reflectors.push(crate::spawn_in_current_span(custom_reflector( + pod_store_w, + pod_cacher, + pod_watcher, + delay_deletion, + ))); // ----------------------------------------------------------------- @@ -759,9 +762,12 @@ impl Source { ) .backoff(watcher::DefaultBackoff::default()); - reflectors.push(crate::spawn_in_current_span( - custom_reflector(ns_store_w, MetaCache::new(), ns_watcher, delay_deletion), - )); + reflectors.push(crate::spawn_in_current_span(custom_reflector( + ns_store_w, + MetaCache::new(), + ns_watcher, + delay_deletion, + ))); } // ----------------------------------------------------------------- @@ -781,9 +787,12 @@ impl Source { let node_state = node_store_w.as_reader(); let node_cacher = MetaCache::new(); - reflectors.push(crate::spawn_in_current_span( - custom_reflector(node_store_w, node_cacher, node_watcher, delay_deletion), - )); + reflectors.push(crate::spawn_in_current_span(custom_reflector( + node_store_w, + node_cacher, + node_watcher, + delay_deletion, + ))); let paths_provider = K8sPathsProvider::new( pod_state.clone(), diff --git a/src/sources/postgresql_metrics.rs b/src/sources/postgresql_metrics.rs index cb0f820613306..2854c50496174 100644 --- a/src/sources/postgresql_metrics.rs +++ b/src/sources/postgresql_metrics.rs @@ -34,7 +34,6 @@ use vector_lib::{ metric_tags, }; - use crate::{ config::{SourceConfig, SourceContext, SourceOutput}, event::metric::{Metric, MetricKind, MetricTags, MetricValue}, From 1447dd682ad4a6e45a3f4bad706fd3f01ce30e2c Mon Sep 17 00:00:00 2001 From: Yoenn Burban Date: Mon, 1 Jun 2026 19:08:13 +0200 Subject: [PATCH 5/9] move helper to vector_common --- lib/file-source/src/file_server.rs | 17 +++++------- .../src/variants/disk_v2/ledger.rs | 14 ++++------ lib/vector-common/src/lib.rs | 16 +++++++++++ lib/vector-lib/src/lib.rs | 3 ++- lib/vector-tap/src/controller.rs | 27 +++++++++---------- src/lib.rs | 16 +---------- 6 files changed, 43 insertions(+), 50 deletions(-) diff --git a/lib/file-source/src/file_server.rs b/lib/file-source/src/file_server.rs index 8078aa32b0575..17d80cf1586a4 100644 --- a/lib/file-source/src/file_server.rs +++ b/lib/file-source/src/file_server.rs @@ -24,7 +24,7 @@ use tokio::{ time::sleep, }; -use tracing::{Instrument, debug, error, info, trace}; +use tracing::{debug, error, info, trace}; use crate::{ file_watcher::{FileWatcher, RawLineResult}, @@ -148,15 +148,12 @@ where let mut stats = TimingStats::default(); // Spawn the checkpoint writer task - let checkpoint_task_handle = tokio::spawn( - checkpoint_writer( - checkpointer, - self.glob_minimum_cooldown, - shutdown_checkpointer, - self.emitter.clone(), - ) - .in_current_span(), - ); + let checkpoint_task_handle = vector_common::spawn_in_current_span(checkpoint_writer( + checkpointer, + self.glob_minimum_cooldown, + shutdown_checkpointer, + self.emitter.clone(), + )); // Alright friends, how does this work? // diff --git a/lib/vector-buffers/src/variants/disk_v2/ledger.rs b/lib/vector-buffers/src/variants/disk_v2/ledger.rs index 8d11fc6de6e33..a2d1c68f8d8e5 100644 --- a/lib/vector-buffers/src/variants/disk_v2/ledger.rs +++ b/lib/vector-buffers/src/variants/disk_v2/ledger.rs @@ -16,7 +16,6 @@ use futures::StreamExt; use rkyv::{Archive, Serialize, with::Atomic}; use snafu::{ResultExt, Snafu}; use tokio::{fs, io::AsyncWriteExt, sync::Notify}; -use tracing::Instrument; use vector_common::finalizer::OrderedFinalizer; use super::{ @@ -701,15 +700,12 @@ where #[must_use] pub(super) fn spawn_finalizer(self: Arc) -> OrderedFinalizer { let (finalizer, mut stream) = OrderedFinalizer::new(None); - tokio::spawn( - async move { - while let Some((_status, amount)) = stream.next().await { - self.increment_pending_acks(amount); - self.notify_writer_waiters(); - } + vector_common::spawn_in_current_span(async move { + while let Some((_status, amount)) = stream.next().await { + self.increment_pending_acks(amount); + self.notify_writer_waiters(); } - .in_current_span(), - ); + }); finalizer } } diff --git a/lib/vector-common/src/lib.rs b/lib/vector-common/src/lib.rs index 75849aac75884..cda7e4b921242 100644 --- a/lib/vector-common/src/lib.rs +++ b/lib/vector-common/src/lib.rs @@ -130,3 +130,19 @@ pub type Error = Box; /// Vector's basic result type, defined in terms of [`Error`] and generic over /// `T`. pub type Result = std::result::Result; + +/// Spawn a future on the current tokio runtime, propagating the current tracing span into the +/// spawned task. This ensures that any logs or internal metrics emitted by the task retain the +/// component tags (`component_id`, `component_kind`, `component_type`) of the caller. +/// +/// Prefer this over `tokio::spawn(future.in_current_span())` to keep call sites concise. +#[track_caller] +pub fn spawn_in_current_span( + task: impl std::future::Future + Send + 'static, +) -> tokio::task::JoinHandle +where + T: Send + 'static, +{ + use tracing::Instrument as _; + tokio::spawn(task.in_current_span()) +} diff --git a/lib/vector-lib/src/lib.rs b/lib/vector-lib/src/lib.rs index ae62fd9053438..1247085a058a3 100644 --- a/lib/vector-lib/src/lib.rs +++ b/lib/vector-lib/src/lib.rs @@ -13,7 +13,8 @@ pub use vector_common::{ Error, NamedInternalEvent, Result, TimeZone, assert_event_data_eq, atomic, btreemap, byte_size_of, byte_size_of::ByteSizeOf, conversion, counter, encode_logfmt, finalization, finalizer, gauge, histogram, id, impl_event_data_eq, internal_event, json_size, - registered_event, request_metadata, sensitive_string, shutdown, stats, trigger, + registered_event, request_metadata, sensitive_string, shutdown, spawn_in_current_span, stats, + trigger, }; pub use vector_config as configurable; pub use vector_config::impl_generate_config_from_default; diff --git a/lib/vector-tap/src/controller.rs b/lib/vector-tap/src/controller.rs index 7aa1b556a6776..f0b19ecde5a72 100644 --- a/lib/vector-tap/src/controller.rs +++ b/lib/vector-tap/src/controller.rs @@ -214,20 +214,17 @@ impl TapController { fn shutdown_trigger(control_tx: fanout::ControlChannel, sink_id: ComponentKey) -> ShutdownTx { let (shutdown_tx, shutdown_rx) = oneshot::channel(); - tokio::spawn( - async move { - _ = shutdown_rx.await; - if control_tx - .send(fanout::ControlMessage::Remove(sink_id.clone())) - .is_err() - { - debug!(message = "Couldn't disconnect sink.", ?sink_id); - } else { - debug!(message = "Disconnected sink.", ?sink_id); - } + vector_common::spawn_in_current_span(async move { + _ = shutdown_rx.await; + if control_tx + .send(fanout::ControlMessage::Remove(sink_id.clone())) + .is_err() + { + debug!(message = "Couldn't disconnect sink.", ?sink_id); + } else { + debug!(message = "Disconnected sink.", ?sink_id); } - .in_current_span(), - ); + }); shutdown_tx } @@ -369,11 +366,11 @@ async fn tap_handler( ); let mut tap_transformer = TapTransformer::new(tx.clone(), output.clone()); - tokio::spawn(async move { + vector_common::spawn_in_current_span(async move { while let Some(events) = tap_buffer_rx.next().await { tap_transformer.try_send(events); } - }.in_current_span()); + }); // Attempt to connect the sink. // diff --git a/src/lib.rs b/src/lib.rs index 2ff018957f2cc..fc05f14cc92eb 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -241,21 +241,7 @@ pub fn get_hostname() -> std::io::Result { }) } -/// Spawn a future on the current tokio runtime, propagating the current tracing span into the -/// spawned task. This ensures that any logs or internal metrics emitted by the task retain the -/// component tags (component_id, component_kind, component_type) of the caller. -/// -/// Prefer this over `tokio::spawn(future.in_current_span())` to keep call sites concise. -#[track_caller] -pub(crate) fn spawn_in_current_span( - task: impl std::future::Future + Send + 'static, -) -> tokio::task::JoinHandle -where - T: Send + 'static, -{ - use tracing::Instrument as _; - tokio::spawn(task.in_current_span()) -} +pub(crate) use vector_lib::spawn_in_current_span; /// Spawn a task with the given name. The name is only used if /// built with [`tokio_unstable`][tokio_unstable]. From 2b6f6b4179e8ce1bef2cdbaed6e1fc70ad84e7c7 Mon Sep 17 00:00:00 2001 From: Yoenn Burban Date: Mon, 1 Jun 2026 19:46:38 +0200 Subject: [PATCH 6/9] added rt feature to vector_common cargo file --- lib/vector-common/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/vector-common/Cargo.toml b/lib/vector-common/Cargo.toml index a583641c68cec..b329b981aa64c 100644 --- a/lib/vector-common/Cargo.toml +++ b/lib/vector-common/Cargo.toml @@ -50,7 +50,7 @@ serde_json.workspace = true smallvec = { version = "1", default-features = false } strum.workspace = true stream-cancel = { version = "0.8.2", default-features = false } -tokio = { workspace = true, features = ["macros", "time"] } +tokio = { workspace = true, features = ["macros", "rt", "time"] } tracing.workspace = true vrl.workspace = true vector-config.workspace = true From f45c022417d7acfc3860ef1da7653bc99e5b101e Mon Sep 17 00:00:00 2001 From: Yoenn Burban Date: Tue, 2 Jun 2026 16:47:45 +0200 Subject: [PATCH 7/9] add a regression test --- Cargo.toml | 1 + tests/vector_api/top.rs | 69 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 70 insertions(+) diff --git a/Cargo.toml b/Cargo.toml index c389001ed9276..35e1783159360 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1125,6 +1125,7 @@ vector-api-tests = [ "api", "api-client-minimal", "sources-demo_logs", + "sources-gcp_pubsub", "transforms-log_to_metric", "transforms-remap", "transforms-route", diff --git a/tests/vector_api/top.rs b/tests/vector_api/top.rs index 79e94a094785c..4f96e05d06348 100644 --- a/tests/vector_api/top.rs +++ b/tests/vector_api/top.rs @@ -410,3 +410,72 @@ async fn multi_output_transform_reports_per_output_sent_events() { "Never received non-empty output_totals for splitter transform within timeout" ); } + +/// Regression test: `component_errors_total` emitted from a spawned task must carry +/// the component's tracing span so that the metric is labelled with `component_id`. +/// +/// The gcp_pubsub source immediately tries to connect to a non-existent endpoint from +/// inside a `tokio::spawn`-ed per-stream task. Before the `spawn_in_current_span` fix +/// the task ran without the component span, so the counter had no `component_id` tag +/// and never appeared in the API's `ErrorsTotal` stream. With the fix the tag is +/// present and the stream delivers an entry for `component_id = "gcp"`. +#[tokio::test] +async fn gcp_pubsub_spawned_task_errors_carry_component_span() { + // Point the source at an unreachable local endpoint so the per-stream task + // immediately gets ECONNREFUSED, emitting GcpPubsubConnectError + // (component_errors_total) on every retry. + let mut runner = TestHarness::new(indoc! {" + sources: + gcp: + type: gcp_pubsub + project: test-project + subscription: test-subscription + endpoint: http://127.0.0.1:1 + skip_authentication: true + + sinks: + blackhole: + type: blackhole + inputs: ['gcp'] + "}) + .await + .expect("Failed to start Vector"); + + // Stream ErrorsTotal every 200 ms; look for a non-zero entry attributed to "gcp". + let mut stream = runner + .api_client() + .stream_component_metrics(MetricName::ErrorsTotal, 200) + .await + .expect("Failed to open errors_total stream"); + + let deadline = tokio::time::Instant::now() + EVENT_PROCESSING_TIMEOUT; + let mut found = false; + + while tokio::time::Instant::now() < deadline { + match tokio::time::timeout(std::time::Duration::from_millis(500), stream.next()).await { + Ok(Some(Ok(msg))) => { + if msg.component_id == "gcp" { + if let Some(Value::Total(total)) = msg.value { + assert!( + total.value > 0, + "Expected a positive component_errors_total for 'gcp', got {}", + total.value + ); + found = true; + break; + } + } + } + Ok(Some(Err(e))) => panic!("Stream error: {e}"), + Ok(None) => panic!("Stream ended unexpectedly"), + Err(_) => continue, // poll timeout, keep looping + } + } + + assert!( + found, + "component_errors_total was never attributed to component_id='gcp' within the timeout. \ + This means the spawned per-stream task ran without the component's tracing span, \ + so its metrics counters had no component_id tag and were invisible to the API." + ); +} From cb706f3a47c264390b8b76504aeaa2b80208ca24 Mon Sep 17 00:00:00 2001 From: Yoenn Burban Date: Tue, 2 Jun 2026 17:56:51 +0200 Subject: [PATCH 8/9] apply limit to test shutdown duration --- tests/vector_api/harness.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/tests/vector_api/harness.rs b/tests/vector_api/harness.rs index ba79b521bba22..e9fc94daa562f 100644 --- a/tests/vector_api/harness.rs +++ b/tests/vector_api/harness.rs @@ -97,7 +97,10 @@ impl TestHarness { let mut cmd = Command::cargo_bin("vector").map_err(|e| format!("Failed to get cargo bin: {e}"))?; - cmd.arg("-c").arg(&config_path); + cmd.arg("-c") + .arg(&config_path) + .arg("--graceful-shutdown-limit-secs") + .arg("1"); if watch_mode { cmd.arg("-w"); From e5ca812e4688c2f6e78155a4cc5c8de09d1013ce Mon Sep 17 00:00:00 2001 From: Yoenn Burban Date: Tue, 2 Jun 2026 18:16:10 +0200 Subject: [PATCH 9/9] fix clippy --- tests/vector_api/top.rs | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/tests/vector_api/top.rs b/tests/vector_api/top.rs index 4f96e05d06348..931ccf10162c8 100644 --- a/tests/vector_api/top.rs +++ b/tests/vector_api/top.rs @@ -454,16 +454,16 @@ async fn gcp_pubsub_spawned_task_errors_carry_component_span() { while tokio::time::Instant::now() < deadline { match tokio::time::timeout(std::time::Duration::from_millis(500), stream.next()).await { Ok(Some(Ok(msg))) => { - if msg.component_id == "gcp" { - if let Some(Value::Total(total)) = msg.value { - assert!( - total.value > 0, - "Expected a positive component_errors_total for 'gcp', got {}", - total.value - ); - found = true; - break; - } + if msg.component_id == "gcp" + && let Some(Value::Total(total)) = msg.value + { + assert!( + total.value > 0, + "Expected a positive component_errors_total for 'gcp', got {}", + total.value + ); + found = true; + break; } } Ok(Some(Err(e))) => panic!("Stream error: {e}"),