diff --git a/Cargo.toml b/Cargo.toml index c389001ed9276..35e1783159360 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1125,6 +1125,7 @@ vector-api-tests = [ "api", "api-client-minimal", "sources-demo_logs", + "sources-gcp_pubsub", "transforms-log_to_metric", "transforms-remap", "transforms-route", diff --git a/changelog.d/spawned_task_component_tags.fix.md b/changelog.d/spawned_task_component_tags.fix.md new file mode 100644 index 0000000000000..d2f5c2bb068a1 --- /dev/null +++ b/changelog.d/spawned_task_component_tags.fix.md @@ -0,0 +1,3 @@ +Internal telemetry (metrics and logs) emitted from work that Vector runs on spawned `tokio` tasks now correctly inherits the owning component's tags (`component_id`, `component_kind`, `component_type`). Previously, several components spawned background tasks without propagating the tracing span, so some internal events emitted from those tasks were missing their component tags. Affected emissions include the `datadog_logs` sink's `component_discarded_events_total` (events too large to encode), the `gcp_pubsub` source's `component_errors_total`/`component_discarded_events_total` from its per-stream tasks, and the `splunk_hec` sinks' acknowledgement-handling `component_errors_total`. 
+ +authors: gwenaskell diff --git a/lib/file-source/src/file_server.rs b/lib/file-source/src/file_server.rs index 87a436298ca16..17d80cf1586a4 100644 --- a/lib/file-source/src/file_server.rs +++ b/lib/file-source/src/file_server.rs @@ -148,7 +148,7 @@ where let mut stats = TimingStats::default(); // Spawn the checkpoint writer task - let checkpoint_task_handle = tokio::spawn(checkpoint_writer( + let checkpoint_task_handle = vector_common::spawn_in_current_span(checkpoint_writer( checkpointer, self.glob_minimum_cooldown, shutdown_checkpointer, diff --git a/lib/vector-buffers/src/variants/disk_v2/ledger.rs b/lib/vector-buffers/src/variants/disk_v2/ledger.rs index e63fc5fb16973..a2d1c68f8d8e5 100644 --- a/lib/vector-buffers/src/variants/disk_v2/ledger.rs +++ b/lib/vector-buffers/src/variants/disk_v2/ledger.rs @@ -700,7 +700,7 @@ where #[must_use] pub(super) fn spawn_finalizer(self: Arc) -> OrderedFinalizer { let (finalizer, mut stream) = OrderedFinalizer::new(None); - tokio::spawn(async move { + vector_common::spawn_in_current_span(async move { while let Some((_status, amount)) = stream.next().await { self.increment_pending_acks(amount); self.notify_writer_waiters(); diff --git a/lib/vector-common/Cargo.toml b/lib/vector-common/Cargo.toml index a583641c68cec..b329b981aa64c 100644 --- a/lib/vector-common/Cargo.toml +++ b/lib/vector-common/Cargo.toml @@ -50,7 +50,7 @@ serde_json.workspace = true smallvec = { version = "1", default-features = false } strum.workspace = true stream-cancel = { version = "0.8.2", default-features = false } -tokio = { workspace = true, features = ["macros", "time"] } +tokio = { workspace = true, features = ["macros", "rt", "time"] } tracing.workspace = true vrl.workspace = true vector-config.workspace = true diff --git a/lib/vector-common/src/lib.rs b/lib/vector-common/src/lib.rs index 75849aac75884..cda7e4b921242 100644 --- a/lib/vector-common/src/lib.rs +++ b/lib/vector-common/src/lib.rs @@ -130,3 +130,19 @@ pub type Error = Box; /// 
Vector's basic result type, defined in terms of [`Error`] and generic over /// `T`. pub type Result<T> = std::result::Result<T, Error>; + +/// Spawn a future on the current tokio runtime, propagating the current tracing span into the +/// spawned task. This ensures that any logs or internal metrics emitted by the task retain the +/// component tags (`component_id`, `component_kind`, `component_type`) of the caller. +/// +/// Prefer this over `tokio::spawn(future.in_current_span())` to keep call sites concise. +#[track_caller] +pub fn spawn_in_current_span<T>( + task: impl std::future::Future<Output = T> + Send + 'static, +) -> tokio::task::JoinHandle<T> +where + T: Send + 'static, +{ + use tracing::Instrument as _; + tokio::spawn(task.in_current_span()) +} diff --git a/lib/vector-core/src/fanout.rs b/lib/vector-core/src/fanout.rs index 33ff30ae3e72a..500b6b8583c51 100644 --- a/lib/vector-core/src/fanout.rs +++ b/lib/vector-core/src/fanout.rs @@ -358,6 +358,10 @@ impl<'a> SendGroup<'a> { fn try_detach_send(&mut self, id: &ComponentKey) -> bool { if let Some(send) = self.sends.remove(id) { + // Deliberately not instrumented with the current span: this drains a send to a sink + // that has just been detached from the topology, so it is unrelated to the upstream + // component that owns this fanout. Attaching the current span would mis-tag this + // task's logs with the upstream component's identity rather than the detached sink's. 
tokio::spawn(async move { if let Err(e) = send.await { warn!( diff --git a/lib/vector-lib/src/lib.rs b/lib/vector-lib/src/lib.rs index ae62fd9053438..1247085a058a3 100644 --- a/lib/vector-lib/src/lib.rs +++ b/lib/vector-lib/src/lib.rs @@ -13,7 +13,8 @@ pub use vector_common::{ Error, NamedInternalEvent, Result, TimeZone, assert_event_data_eq, atomic, btreemap, byte_size_of, byte_size_of::ByteSizeOf, conversion, counter, encode_logfmt, finalization, finalizer, gauge, histogram, id, impl_event_data_eq, internal_event, json_size, - registered_event, request_metadata, sensitive_string, shutdown, stats, trigger, + registered_event, request_metadata, sensitive_string, shutdown, spawn_in_current_span, stats, + trigger, }; pub use vector_config as configurable; pub use vector_config::impl_generate_config_from_default; diff --git a/lib/vector-stream/src/concurrent_map.rs b/lib/vector-stream/src/concurrent_map.rs index 6b2920db1cc68..6dde7a31f7bbe 100644 --- a/lib/vector-stream/src/concurrent_map.rs +++ b/lib/vector-stream/src/concurrent_map.rs @@ -72,6 +72,11 @@ where Poll::Pending | Poll::Ready(None) => break, Poll::Ready(Some(item)) => { let fut = (this.f)(item); + // `ConcurrentMap` does not instrument the spawned future itself: the + // mapping closure runs on a detached task, so the current span at poll + // time is not necessarily meaningful for the work being performed. It is + // the caller's responsibility to propagate any span (e.g. the owning + // component's span for internal metric/log tagging) into `fut`. 
let handle = tokio::spawn(fut); this.in_flight.push_back(handle); } diff --git a/lib/vector-tap/src/controller.rs b/lib/vector-tap/src/controller.rs index 2a8208179f3e8..f0b19ecde5a72 100644 --- a/lib/vector-tap/src/controller.rs +++ b/lib/vector-tap/src/controller.rs @@ -214,7 +214,7 @@ impl TapController { fn shutdown_trigger(control_tx: fanout::ControlChannel, sink_id: ComponentKey) -> ShutdownTx { let (shutdown_tx, shutdown_rx) = oneshot::channel(); - tokio::spawn(async move { + vector_common::spawn_in_current_span(async move { _ = shutdown_rx.await; if control_tx .send(fanout::ControlMessage::Remove(sink_id.clone())) @@ -366,7 +366,7 @@ async fn tap_handler( ); let mut tap_transformer = TapTransformer::new(tx.clone(), output.clone()); - tokio::spawn(async move { + vector_common::spawn_in_current_span(async move { while let Some(events) = tap_buffer_rx.next().await { tap_transformer.try_send(events); } diff --git a/src/api/grpc/service.rs b/src/api/grpc/service.rs index 110ef9d5ba441..e1e0808619db9 100644 --- a/src/api/grpc/service.rs +++ b/src/api/grpc/service.rs @@ -676,7 +676,7 @@ impl observability::Service for ObservabilityService { let watch_rx = self.watch_rx.clone(); - tokio::spawn(async move { + crate::spawn_in_current_span(async move { let _tap_controller = TapController::new(watch_rx, tap_tx, patterns); let mut tap_rx = ReceiverStream::new(tap_rx); let mut interval = time::interval(time::Duration::from_millis(interval_ms)); diff --git a/src/api/grpc_server.rs b/src/api/grpc_server.rs index 715aeecad2b8e..f582b2f82981c 100644 --- a/src/api/grpc_server.rs +++ b/src/api/grpc_server.rs @@ -80,7 +80,7 @@ impl GrpcServer { let router_serving = Arc::clone(&serving); // Spawn the server with the already-bound listener - tokio::spawn(async move { + crate::spawn_in_current_span(async move { // Build reflection service for tools like grpcurl let reflection_service = tonic_reflection::server::Builder::configure() .register_encoded_file_descriptor_set( diff --git 
a/src/gcp.rs b/src/gcp.rs index 9a0ff365a69ee..89fec86770e0b 100644 --- a/src/gcp.rs +++ b/src/gcp.rs @@ -194,7 +194,7 @@ impl GcpAuthenticator { pub fn spawn_regenerate_token(&self) -> watch::Receiver<()> { let (sender, receiver) = watch::channel(()); - tokio::spawn(self.clone().token_regenerator(sender)); + crate::spawn_in_current_span(self.clone().token_regenerator(sender)); receiver } diff --git a/src/lib.rs b/src/lib.rs index 8d26a3b080ecf..fc05f14cc92eb 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -241,6 +241,8 @@ pub fn get_hostname() -> std::io::Result { }) } +pub(crate) use vector_lib::spawn_in_current_span; + /// Spawn a task with the given name. The name is only used if /// built with [`tokio_unstable`][tokio_unstable]. /// diff --git a/src/secrets/exec.rs b/src/secrets/exec.rs index 081c752c2b36a..2a6e8c464065f 100644 --- a/src/secrets/exec.rs +++ b/src/secrets/exec.rs @@ -179,7 +179,7 @@ async fn query_backend( .ok_or("unable to acquire stdout")?; let query = serde_json::to_vec(&query)?; - tokio::spawn(async move { stdin.write_all(&query).await }); + crate::spawn_in_current_span(async move { stdin.write_all(&query).await }); let timeout = time::sleep(time::Duration::from_secs(timeout)); tokio::pin!(timeout); diff --git a/src/sinks/blackhole/sink.rs b/src/sinks/blackhole/sink.rs index ff0744913cc8b..8796c7425f691 100644 --- a/src/sinks/blackhole/sink.rs +++ b/src/sinks/blackhole/sink.rs @@ -57,7 +57,7 @@ impl StreamSink for BlackholeSink { if self.config.print_interval_secs.as_secs() > 0 { let interval_dur = self.config.print_interval_secs; - tokio::spawn(async move { + crate::spawn_in_current_span(async move { let mut print_interval = interval(interval_dur); loop { select! 
{ diff --git a/src/sinks/datadog/logs/sink.rs b/src/sinks/datadog/logs/sink.rs index 0abf4bed1ade5..5fb5e2189fa10 100644 --- a/src/sinks/datadog/logs/sink.rs +++ b/src/sinks/datadog/logs/sink.rs @@ -2,6 +2,7 @@ use std::{collections::VecDeque, fmt::Debug, io, sync::Arc}; use itertools::Itertools; use snafu::Snafu; +use tracing::Instrument; use vector_lib::{ event::{ObjectMap, Value}, internal_event::{ComponentEventsDropped, UNINTENTIONAL}, @@ -393,12 +394,19 @@ where .concurrent_map(default_request_builder_concurrency_limit(), move |input| { let builder = Arc::clone(&builder); - Box::pin(async move { - let (api_key, events) = input; - let api_key = api_key.unwrap_or_else(|| Arc::clone(&builder.default_api_key)); + // `concurrent_map` spawns this future on a detached task. The closure itself runs + // within `run_inner`'s span, so `in_current_span` captures the sink span here and + // re-enters it on the spawned task to preserve the sink's automatic component tags. + Box::pin( + async move { + let (api_key, events) = input; + let api_key = + api_key.unwrap_or_else(|| Arc::clone(&builder.default_api_key)); - builder.build_request(events, api_key) - }) + builder.build_request(events, api_key) + } + .in_current_span(), + ) }) .filter_map(|request| async move { match request { diff --git a/src/sinks/datadog/metrics/sink.rs b/src/sinks/datadog/metrics/sink.rs index 1cd075a831d9c..a366cc81150c8 100644 --- a/src/sinks/datadog/metrics/sink.rs +++ b/src/sinks/datadog/metrics/sink.rs @@ -8,6 +8,7 @@ use futures_util::{ stream::{self, BoxStream}, }; use tower::Service; +use tracing::Instrument; use vector_lib::{ event::{Event, Metric, MetricValue}, partition::Partitioner, @@ -136,11 +137,18 @@ where .concurrent_map( default_request_builder_concurrency_limit(), |((api_key, endpoint), metrics)| { - Box::pin(async move { - let collapsed_metrics = - sort_and_collapse_counters_by_series_and_timestamp(metrics); - ((api_key, endpoint), collapsed_metrics) - }) + // `concurrent_map` 
spawns this future on a detached task. The closure itself + // runs within `run_inner`'s span, so `in_current_span` captures the sink span + // here and re-enters it on the spawned task to preserve the sink's automatic + // component tags on any internal metrics/logs emitted during aggregation. + Box::pin( + async move { + let collapsed_metrics = + sort_and_collapse_counters_by_series_and_timestamp(metrics); + ((api_key, endpoint), collapsed_metrics) + } + .in_current_span(), + ) }, ) // We build our requests "incrementally", which means that for a single batch of metrics, we might generate diff --git a/src/sinks/datadog/traces/config.rs b/src/sinks/datadog/traces/config.rs index 7ace1b75f9817..00682514dadc1 100644 --- a/src/sinks/datadog/traces/config.rs +++ b/src/sinks/datadog/traces/config.rs @@ -178,7 +178,7 @@ impl DatadogTracesConfig { // Send the APM stats payloads independently of the sink framework. // This is necessary to comply with what the APM stats backend of Datadog expects with // respect to receiving stats payloads. - tokio::spawn(flush_apm_stats_thread( + crate::spawn_in_current_span(flush_apm_stats_thread( tripwire, client, compression, diff --git a/src/sinks/mqtt/sink.rs b/src/sinks/mqtt/sink.rs index cc69c8d6b9ff8..e9feb7bb0144e 100644 --- a/src/sinks/mqtt/sink.rs +++ b/src/sinks/mqtt/sink.rs @@ -60,7 +60,7 @@ impl MqttSink { let (client, mut connection) = self.connector.connect(); // This is necessary to keep the mqtt event loop moving forward. 
- tokio::spawn(async move { + crate::spawn_in_current_span(async move { loop { // If an error is returned here there is currently no way to tie this back // to the event that was posted which means we can't accurately provide diff --git a/src/sinks/prometheus/exporter.rs b/src/sinks/prometheus/exporter.rs index 6072bc8274ea0..8ed514b638fa0 100644 --- a/src/sinks/prometheus/exporter.rs +++ b/src/sinks/prometheus/exporter.rs @@ -466,7 +466,7 @@ impl PrometheusExporter { let tls = MaybeTlsSettings::from_config(tls.as_ref(), true)?; let listener = tls.bind(&address).await?; - tokio::spawn(async move { + crate::spawn_in_current_span(async move { info!(message = "Building HTTP server.", address = %address); Server::builder(hyper::server::accept::from_stream(listener.accept_stream())) diff --git a/src/sinks/redis/sink.rs b/src/sinks/redis/sink.rs index 274d2b103cf86..b3719d01e514a 100644 --- a/src/sinks/redis/sink.rs +++ b/src/sinks/redis/sink.rs @@ -139,7 +139,7 @@ impl RedisConnection { Ok(Self::Sentinel { connection_send: conn_tx, connection_recv: conn_rx, - repair_task: Arc::new(tokio::spawn(async move { + repair_task: Arc::new(crate::spawn_in_current_span(async move { Self::repair_connection_manager_task( sentinel, service_name, diff --git a/src/sinks/splunk_hec/common/service.rs b/src/sinks/splunk_hec/common/service.rs index 095da254d832b..3b1c68c876561 100644 --- a/src/sinks/splunk_hec/common/service.rs +++ b/src/sinks/splunk_hec/common/service.rs @@ -58,7 +58,7 @@ where let max_pending_acks = indexer_acknowledgements.max_pending_acks.get(); let tx = if let Some(ack_client) = ack_client { let (tx, rx) = mpsc::channel(128); - tokio::spawn(run_acknowledgements( + crate::spawn_in_current_span(run_acknowledgements( rx, ack_client, Arc::clone(&http_request_builder), diff --git a/src/sinks/util/sink.rs b/src/sinks/util/sink.rs index f8dd31c67f129..e9c35c94f3e70 100644 --- a/src/sinks/util/sink.rs +++ b/src/sinks/util/sink.rs @@ -323,7 +323,7 @@ where 
this.lingers.remove(partition); let batch = batch.finish(); - let future = tokio::spawn(this.service.call(batch)); + let future = crate::spawn_in_current_span(this.service.call(batch)); if let Some(map) = this.in_flight.as_mut() { map.insert(partition.clone(), future.map(|_| ()).fuse().boxed()); diff --git a/src/sinks/websocket_server/sink.rs b/src/sinks/websocket_server/sink.rs index 77da730fbbdda..ea38d88fdac7f 100644 --- a/src/sinks/websocket_server/sink.rs +++ b/src/sinks/websocket_server/sink.rs @@ -37,7 +37,6 @@ use tokio_tungstenite::tungstenite::{ handshake::server::{ErrorResponse, Request, Response}, }; use tokio_util::codec::Encoder as _; -use tracing::Instrument; use url::Url; use uuid::Uuid; use vector_lib::{ @@ -129,20 +128,17 @@ impl WebSocketListenerSink { let open_gauge = OpenGauge::new(); while let Ok(stream) = listener.accept().await { - tokio::spawn( - Self::handle_connection( - auth.clone(), - message_buffering.clone(), - subprotocol.clone(), - Arc::clone(&peers), - Arc::clone(&client_checkpoints), - Arc::clone(&buffer), - stream, - extra_tags_config.clone(), - open_gauge.clone(), - ) - .in_current_span(), - ); + crate::spawn_in_current_span(Self::handle_connection( + auth.clone(), + message_buffering.clone(), + subprotocol.clone(), + Arc::clone(&peers), + Arc::clone(&client_checkpoints), + Arc::clone(&buffer), + stream, + extra_tags_config.clone(), + open_gauge.clone(), + )); } } @@ -360,19 +356,16 @@ impl StreamSink for WebSocketListenerSink { ))); let client_checkpoints = Arc::new(Mutex::new(HashMap::default())); - tokio::spawn( - Self::handle_connections( - self.auth, - self.message_buffering.clone(), - self.subprotocol.clone(), - Arc::clone(&peers), - self.extra_tags_config, - Arc::clone(&client_checkpoints), - Arc::clone(&message_buffer), - listener, - ) - .in_current_span(), - ); + crate::spawn_in_current_span(Self::handle_connections( + self.auth, + self.message_buffering.clone(), + self.subprotocol.clone(), + Arc::clone(&peers), + 
self.extra_tags_config, + Arc::clone(&client_checkpoints), + Arc::clone(&message_buffer), + listener, + )); while input.as_mut().peek().await.is_some() { let mut event = input.next().await.unwrap(); diff --git a/src/sources/aws_s3/sqs.rs b/src/sources/aws_s3/sqs.rs index 9ad5b47be61cc..802bdf5d38f0e 100644 --- a/src/sources/aws_s3/sqs.rs +++ b/src/sources/aws_s3/sqs.rs @@ -28,7 +28,6 @@ use smallvec::SmallVec; use snafu::{ResultExt, Snafu}; use tokio::{pin, select}; use tokio_util::codec::FramedRead; -use tracing::Instrument; use vector_lib::{ codecs::decoding::FramingError, config::{LegacyKey, LogNamespace, log_schema}, @@ -358,7 +357,7 @@ impl Ingestor { acknowledgements, ); let fut = process.run(); - let handle = tokio::spawn(fut.in_current_span()); + let handle = crate::spawn_in_current_span(fut); handles.push(handle); } diff --git a/src/sources/aws_sqs/source.rs b/src/sources/aws_sqs/source.rs index f8afdb9365ca9..7dc7c01e735e0 100644 --- a/src/sources/aws_sqs/source.rs +++ b/src/sources/aws_sqs/source.rs @@ -7,7 +7,6 @@ use aws_sdk_sqs::{ use chrono::{DateTime, TimeZone, Utc}; use futures::{FutureExt, StreamExt}; use tokio::{pin, select}; -use tracing_futures::Instrument; use vector_lib::{ config::LogNamespace, finalizer::UnorderedFinalizer, @@ -50,16 +49,13 @@ impl SqsSource { let (finalizer, mut ack_stream) = Finalizer::new(Some(shutdown.clone())); let client = self.client.clone(); let queue_url = self.queue_url.clone(); - tokio::spawn( - async move { - while let Some((status, receipts)) = ack_stream.next().await { - if status == BatchStatus::Delivered { - delete_messages(client.clone(), receipts, queue_url.clone()).await; - } + crate::spawn_in_current_span(async move { + while let Some((status, receipts)) = ack_stream.next().await { + if status == BatchStatus::Delivered { + delete_messages(client.clone(), receipts, queue_url.clone()).await; } } - .in_current_span(), - ); + }); Arc::new(finalizer) }); let events_received = register!(EventsReceived); @@ 
-70,19 +66,16 @@ impl SqsSource { let mut out = out.clone(); let finalizer = finalizer.clone(); let events_received = events_received.clone(); - task_handles.push(tokio::spawn( - async move { - let finalizer = finalizer.as_ref(); - pin!(shutdown); - loop { - select! { - _ = &mut shutdown => break, - _ = source.run_once(&mut out, finalizer, events_received.clone()) => {}, - } + task_handles.push(crate::spawn_in_current_span(async move { + let finalizer = finalizer.as_ref(); + pin!(shutdown); + loop { + select! { + _ = &mut shutdown => break, + _ = source.run_once(&mut out, finalizer, events_received.clone()) => {}, } } - .in_current_span(), - )); + })); } // Wait for all of the processes to finish. If any one of them panics, we resume diff --git a/src/sources/docker_logs/mod.rs b/src/sources/docker_logs/mod.rs index 45268788d45cc..8c3ef9bac102f 100644 --- a/src/sources/docker_logs/mod.rs +++ b/src/sources/docker_logs/mod.rs @@ -22,7 +22,6 @@ use chrono::{DateTime, FixedOffset, Local, ParseError, Utc}; use futures::{Stream, StreamExt}; use serde_with::serde_as; use tokio::sync::mpsc; -use tracing_futures::Instrument; use vector_lib::{ codecs::{BytesDeserializer, BytesDeserializerConfig}, config::{LegacyKey, LogNamespace}, @@ -740,39 +739,36 @@ impl EventStreamBuilder { /// Spawn a task to runs event stream until shutdown. 
fn start(&self, id: ContainerId, backoff: Option) -> ContainerState { let this = self.clone(); - tokio::spawn( - async move { - if let Some(duration) = backoff { - tokio::time::sleep(duration).await; - } + crate::spawn_in_current_span(async move { + if let Some(duration) = backoff { + tokio::time::sleep(duration).await; + } - match this - .core - .docker - .inspect_container(id.as_str(), None::) - .await - { - Ok(details) => match ContainerMetadata::from_details(details) { - Ok(metadata) => { - let info = ContainerLogInfo::new(id, metadata, this.core.now_timestamp); - this.run_event_stream(info).await; - return; - } - Err(error) => emit!(DockerLogsTimestampParseError { - error, - container_id: id.as_str() - }), - }, - Err(error) => emit!(DockerLogsContainerMetadataFetchError { + match this + .core + .docker + .inspect_container(id.as_str(), None::) + .await + { + Ok(details) => match ContainerMetadata::from_details(details) { + Ok(metadata) => { + let info = ContainerLogInfo::new(id, metadata, this.core.now_timestamp); + this.run_event_stream(info).await; + return; + } + Err(error) => emit!(DockerLogsTimestampParseError { error, container_id: id.as_str() }), - } - - this.finish(Err((id, ErrorPersistence::Transient))); + }, + Err(error) => emit!(DockerLogsContainerMetadataFetchError { + error, + container_id: id.as_str() + }), } - .in_current_span(), - ); + + this.finish(Err((id, ErrorPersistence::Transient))); + }); ContainerState::new_running() } @@ -781,7 +777,7 @@ impl EventStreamBuilder { fn restart(&self, container: &mut ContainerState) { if let Some(info) = container.take_info() { let this = self.clone(); - tokio::spawn(this.run_event_stream(info).in_current_span()); + crate::spawn_in_current_span(this.run_event_stream(info)); } } diff --git a/src/sources/exec/mod.rs b/src/sources/exec/mod.rs index b87452ef107d1..48e3c14454d11 100644 --- a/src/sources/exec/mod.rs +++ b/src/sources/exec/mod.rs @@ -724,7 +724,7 @@ fn spawn_reader_thread( sender: 
Sender<((SmallVec<[Event; 1]>, usize), &'static str)>, ) { // Start the green background thread for collecting - drop(tokio::spawn(async move { + drop(crate::spawn_in_current_span(async move { debug!("Start capturing {} command output.", origin); let mut stream = DecoderFramedRead::new(reader, decoder); diff --git a/src/sources/file.rs b/src/sources/file.rs index 93f4e188be97a..d485c6ad6b4b6 100644 --- a/src/sources/file.rs +++ b/src/sources/file.rs @@ -580,7 +580,7 @@ pub fn file_source( // checkpoints until all the acks have come in. let (send_shutdown, shutdown2) = oneshot::channel::<()>(); let checkpoints = checkpointer.view(); - tokio::spawn(async move { + crate::spawn_in_current_span(async move { while let Some((status, entry)) = ack_stream.next().await { if status == BatchStatus::Delivered { checkpoints.update(entry.file_id, entry.offset); diff --git a/src/sources/gcp_pubsub.rs b/src/sources/gcp_pubsub.rs index b01664b7ade63..ddf96d19acf70 100644 --- a/src/sources/gcp_pubsub.rs +++ b/src/sources/gcp_pubsub.rs @@ -453,7 +453,7 @@ impl PubsubSource { // when it has an idle interval it will mark itself as not // busy. 
let busy_flag = Arc::new(AtomicBool::new(false)); - let task = tokio::spawn(self.clone().run(Arc::clone(&busy_flag))); + let task = crate::spawn_in_current_span(self.clone().run(Arc::clone(&busy_flag))); tasks.push(Task { task, busy_flag }); } diff --git a/src/sources/journald.rs b/src/sources/journald.rs index c6177fba1eb6c..177ee5e91663c 100644 --- a/src/sources/journald.rs +++ b/src/sources/journald.rs @@ -519,7 +519,7 @@ impl JournaldSource { let events_received = register!(EventsReceived); // Spawn stderr handler task - let stderr_handler = tokio::spawn(Self::handle_stderr(stderr_stream)); + let stderr_handler = crate::spawn_in_current_span(Self::handle_stderr(stderr_stream)); let batch_size = self.batch_size; let result = loop { @@ -1056,7 +1056,7 @@ impl Finalizer { ) -> Self { if acknowledgements { let (finalizer, mut ack_stream) = OrderedFinalizer::new(Some(shutdown)); - tokio::spawn(async move { + crate::spawn_in_current_span(async move { while let Some((status, cursor)) = ack_stream.next().await { if status == BatchStatus::Delivered { checkpointer.lock().await.set(cursor).await; diff --git a/src/sources/kafka.rs b/src/sources/kafka.rs index cae76d8e02059..f5927b365157b 100644 --- a/src/sources/kafka.rs +++ b/src/sources/kafka.rs @@ -455,7 +455,7 @@ async fn kafka_source( .map_or(config.session_timeout_ms / 2, Duration::from_millis); let consumer_state = ConsumerStateInner::::new(config, decoder, out, log_namespace, span); - tokio::spawn(async move { + crate::spawn_in_current_span(async move { coordinate_kafka_callbacks( consumer, callback_rx, diff --git a/src/sources/kubernetes_logs/mod.rs b/src/sources/kubernetes_logs/mod.rs index fa001972a59d7..04b26b4aa7c09 100644 --- a/src/sources/kubernetes_logs/mod.rs +++ b/src/sources/kubernetes_logs/mod.rs @@ -738,7 +738,7 @@ impl Source { let pod_state = pod_store_w.as_reader(); let pod_cacher = MetaCache::new(); - reflectors.push(tokio::spawn(custom_reflector( + 
reflectors.push(crate::spawn_in_current_span(custom_reflector( pod_store_w, pod_cacher, pod_watcher, @@ -762,7 +762,7 @@ impl Source { ) .backoff(watcher::DefaultBackoff::default()); - reflectors.push(tokio::spawn(custom_reflector( + reflectors.push(crate::spawn_in_current_span(custom_reflector( ns_store_w, MetaCache::new(), ns_watcher, @@ -787,7 +787,7 @@ impl Source { let node_state = node_store_w.as_reader(); let node_cacher = MetaCache::new(); - reflectors.push(tokio::spawn(custom_reflector( + reflectors.push(crate::spawn_in_current_span(custom_reflector( node_store_w, node_cacher, node_watcher, diff --git a/src/sources/postgresql_metrics.rs b/src/sources/postgresql_metrics.rs index 96ae1ab3a1263..2854c50496174 100644 --- a/src/sources/postgresql_metrics.rs +++ b/src/sources/postgresql_metrics.rs @@ -295,7 +295,7 @@ impl PostgresqlClient { endpoint: &self.endpoint, } })?; - tokio::spawn(connection); + crate::spawn_in_current_span(connection); client } None => { @@ -306,7 +306,7 @@ impl PostgresqlClient { .with_context(|_| ConnectionFailedSnafu { endpoint: &self.endpoint, })?; - tokio::spawn(connection); + crate::spawn_in_current_span(connection); client } }; diff --git a/src/sources/splunk_hec/acknowledgements.rs b/src/sources/splunk_hec/acknowledgements.rs index 3f65b7d115f95..c1688acb9945d 100644 --- a/src/sources/splunk_hec/acknowledgements.rs +++ b/src/sources/splunk_hec/acknowledgements.rs @@ -106,7 +106,7 @@ impl IndexerAcknowledgement { let idle_task_channels = Arc::clone(&channels); if config.ack_idle_cleanup { - tokio::spawn(async move { + crate::spawn_in_current_span(async move { let mut interval = interval(Duration::from_secs(max_idle_time)); loop { interval.tick().await; @@ -206,7 +206,7 @@ impl Channel { let ack_ids_status = Arc::new(Mutex::new(RoaringTreemap::new())); let finalizer_ack_ids_status = Arc::clone(&ack_ids_status); let (ack_event_finalizer, mut ack_stream) = UnorderedFinalizer::new(Some(shutdown)); - tokio::spawn(async move { + 
crate::spawn_in_current_span(async move { while let Some((status, ack_id)) = ack_stream.next().await { if status == BatchStatus::Delivered { let mut ack_ids_status = finalizer_ack_ids_status.lock().unwrap(); diff --git a/src/sources/util/framestream.rs b/src/sources/util/framestream.rs index a42bae5815322..9c96b2f3fc4ef 100644 --- a/src/sources/util/framestream.rs +++ b/src/sources/util/framestream.rs @@ -908,7 +908,7 @@ async fn spawn_event_handling_tasks( ) -> JoinHandle<()> { wait_for_task_quota(&active_task_nums, max_frame_handling_tasks).await; - tokio::spawn(async move { + crate::spawn_in_current_span(async move { future::ready({ if let Some(evt) = event_handler.handle_event(received_from, event_data) && event_sink.send_event(evt).await.is_err() diff --git a/src/sources/windows_event_log/mod.rs b/src/sources/windows_event_log/mod.rs index 9fb2cff142ee6..945bc99b54fd9 100644 --- a/src/sources/windows_event_log/mod.rs +++ b/src/sources/windows_event_log/mod.rs @@ -101,7 +101,7 @@ impl Finalizer { OrderedFinalizer::::new(Some(shutdown.clone())); // Spawn background task to process acknowledgments and update checkpoints - tokio::spawn(async move { + crate::spawn_in_current_span(async move { while let Some((status, entry)) = ack_stream.next().await { if status == BatchStatus::Delivered { if let Err(e) = checkpointer.set_batch(entry.bookmarks.clone()).await { @@ -361,7 +361,7 @@ impl WindowsEventLogSource { } }; let shutdown_watcher = shutdown.clone(); - tokio::spawn(async move { + crate::spawn_in_current_span(async move { shutdown_watcher.await; unsafe { let handle = diff --git a/src/topology/builder.rs b/src/topology/builder.rs index 6ed0206446597..9caa9e2f11378 100644 --- a/src/topology/builder.rs +++ b/src/topology/builder.rs @@ -1280,12 +1280,12 @@ impl Runner { let mut t = self.transform.clone(); let mut outputs_buf = self.outputs.new_buf_with_capacity(len); - let task = tokio::spawn(async move { + let task = crate::spawn_in_current_span(async move { for 
events in input_arrays { t.transform_all(events, &mut outputs_buf); } outputs_buf - }.in_current_span()); + }); in_flight.push_back(task); } None => { diff --git a/src/transforms/throttle/rate_limiter.rs b/src/transforms/throttle/rate_limiter.rs index 83f1b8de5384b..39a9cb63a4a63 100644 --- a/src/transforms/throttle/rate_limiter.rs +++ b/src/transforms/throttle/rate_limiter.rs @@ -25,7 +25,7 @@ where let rate_limiter = Arc::new(RateLimiter::dashmap_with_clock(quota, clock)); let rate_limiter_clone = Arc::clone(&rate_limiter); - let flush_handle = tokio::spawn(async move { + let flush_handle = crate::spawn_in_current_span(async move { let mut interval = tokio::time::interval(flush_keys_interval); loop { interval.tick().await; diff --git a/tests/vector_api/harness.rs b/tests/vector_api/harness.rs index ba79b521bba22..e9fc94daa562f 100644 --- a/tests/vector_api/harness.rs +++ b/tests/vector_api/harness.rs @@ -97,7 +97,10 @@ impl TestHarness { let mut cmd = Command::cargo_bin("vector").map_err(|e| format!("Failed to get cargo bin: {e}"))?; - cmd.arg("-c").arg(&config_path); + cmd.arg("-c") + .arg(&config_path) + .arg("--graceful-shutdown-limit-secs") + .arg("1"); if watch_mode { cmd.arg("-w"); diff --git a/tests/vector_api/top.rs b/tests/vector_api/top.rs index 79e94a094785c..931ccf10162c8 100644 --- a/tests/vector_api/top.rs +++ b/tests/vector_api/top.rs @@ -410,3 +410,72 @@ async fn multi_output_transform_reports_per_output_sent_events() { "Never received non-empty output_totals for splitter transform within timeout" ); } + +/// Regression test: `component_errors_total` emitted from a spawned task must carry +/// the component's tracing span so that the metric is labelled with `component_id`. +/// +/// The gcp_pubsub source immediately tries to connect to a non-existent endpoint from +/// inside a `tokio::spawn`-ed per-stream task. 
Before the `spawn_in_current_span` fix +/// the task ran without the component span, so the counter had no `component_id` tag +/// and never appeared in the API's `ErrorsTotal` stream. With the fix the tag is +/// present and the stream delivers an entry for `component_id = "gcp"`. +#[tokio::test] +async fn gcp_pubsub_spawned_task_errors_carry_component_span() { + // Point the source at an unreachable local endpoint so the per-stream task + // immediately gets ECONNREFUSED, emitting GcpPubsubConnectError + // (component_errors_total) on every retry. + let mut runner = TestHarness::new(indoc! {" + sources: + gcp: + type: gcp_pubsub + project: test-project + subscription: test-subscription + endpoint: http://127.0.0.1:1 + skip_authentication: true + + sinks: + blackhole: + type: blackhole + inputs: ['gcp'] + "}) + .await + .expect("Failed to start Vector"); + + // Stream ErrorsTotal every 200 ms; look for a non-zero entry attributed to "gcp". + let mut stream = runner + .api_client() + .stream_component_metrics(MetricName::ErrorsTotal, 200) + .await + .expect("Failed to open errors_total stream"); + + let deadline = tokio::time::Instant::now() + EVENT_PROCESSING_TIMEOUT; + let mut found = false; + + while tokio::time::Instant::now() < deadline { + match tokio::time::timeout(std::time::Duration::from_millis(500), stream.next()).await { + Ok(Some(Ok(msg))) => { + if msg.component_id == "gcp" + && let Some(Value::Total(total)) = msg.value + { + assert!( + total.value > 0, + "Expected a positive component_errors_total for 'gcp', got {}", + total.value + ); + found = true; + break; + } + } + Ok(Some(Err(e))) => panic!("Stream error: {e}"), + Ok(None) => panic!("Stream ended unexpectedly"), + Err(_) => continue, // poll timeout, keep looping + } + } + + assert!( + found, + "component_errors_total was never attributed to component_id='gcp' within the timeout. 
\ + This means the spawned per-stream task ran without the component's tracing span, \ + so its metrics counters had no component_id tag and were invisible to the API." + ); +}