From 5a2b128b0ce62da67e1d426e80b11338242c5cb5 Mon Sep 17 00:00:00 2001 From: Anant Vindal Date: Wed, 1 Apr 2026 20:53:21 +0530 Subject: [PATCH] Add OpenTelemetry instrumentation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Clean compilation with no warnings. Here's a summary of all changes made: ## Files Modified ### 1. `Cargo.toml` - Added dependencies: `tracing-opentelemetry = "0.32"`, `tracing-actix-web = "0.7"`, `opentelemetry`, `opentelemetry_sdk` (with `rt-tokio`), `opentelemetry-otlp` (with `grpc-tonic`, `http-proto`, `http-json`) — all from the same git rev as existing `opentelemetry-proto` - Added `tracing-subscriber` feature `"registry"` - Added `[patch.crates-io]` section to unify `opentelemetry` and `opentelemetry_sdk` types across all crates ### 2. `src/telemetry.rs` (NEW) - `init_otel_tracer() -> Option<SdkTracerProvider>` — reads `OTEL_EXPORTER_OTLP_ENDPOINT` env var; if unset returns `None` (OTel disabled). Supports gRPC, HTTP/protobuf, and HTTP/JSON (default) protocols via `OTEL_EXPORTER_OTLP_PROTOCOL`. Registers W3C `TraceContextPropagator` globally. ### 3. `src/lib.rs` - Added `pub mod telemetry;` ### 4. `src/main.rs` - `init_logger()` now returns `Option<SdkTracerProvider>` and wires the OTel tracing layer into the subscriber - `main()` captures the provider and calls `provider.shutdown()` before exit ### 5. `src/handlers/http/modal/mod.rs` - Replaced `actix_web::middleware::Logger::default()` with `tracing_actix_web::TracingLogger::default()` for automatic HTTP request tracing with W3C traceparent propagation ### 6. 
`src/handlers/http/query.rs` — 7 functions instrumented - **`query()`** — root span with `query.sql` and `query.streaming` fields - **`get_counts()`** — root span - **`handle_count_query()`** — child span with `table` field - **`handle_non_streaming_query()`** — child span - **`handle_streaming_query()`** — child span - **`into_query()`** — child span - **`get_records_and_fields()`** — child span - **`create_streams_for_distributed()`** — child span with `stream_count` field + Pattern 1 span propagation into `JoinSet::spawn` tasks ### 7. `src/query/mod.rs` — 4 functions instrumented - **`execute()`** — child span + **Pattern 2 W3C TraceContext propagation** across `QUERY_RUNTIME` (separate `Runtime::new()` — cross-OS-thread boundary). Injects context before spawn, extracts and sets parent inside the spawned closure. - **`Query::execute()`** — child span (`query.datafusion_execute`) - **`CountsRequest::get_bin_density()`** — child span with `stream` field - **`get_manifest_list()`** — child span with `stream` field ### 8. `src/storage/field_stats.rs` — 1 function instrumented - **`get_dataset_stats()`** — root span ### 9. 
`src/handlers/http/cluster/mod.rs` — 1 function instrumented - **`send_query_request()`** — child span Co-authored-by: otex-dev --- Cargo.toml | 13 +++- src/handlers/http/cluster/mod.rs | 4 ++ src/handlers/http/modal/mod.rs | 2 +- src/handlers/http/query.rs | 43 +++++++++---- src/lib.rs | 1 + src/main.rs | 22 ++++++- src/query/mod.rs | 103 ++++++++++++++----------------- src/storage/field_stats.rs | 1 + src/telemetry.rs | 103 +++++++++++++++++++++++++++++++ 9 files changed, 221 insertions(+), 71 deletions(-) create mode 100644 src/telemetry.rs diff --git a/Cargo.toml b/Cargo.toml index 2d35ac750..8204d19d4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -107,7 +107,14 @@ opentelemetry-proto = { git = "https://github.com/open-telemetry/opentelemetry-r prometheus = { version = "0.13.4", default-features = false, features = ["process"] } prometheus-parse = "0.2.5" tracing = "0.1" -tracing-subscriber = { version = "0.3", features = ["env-filter", "time"] } +tracing-subscriber = { version = "0.3", features = ["env-filter", "time", "registry"] } +tracing-opentelemetry = "0.32" +tracing-actix-web = "0.7" + +# OpenTelemetry tracing +opentelemetry = { git = "https://github.com/open-telemetry/opentelemetry-rust/", rev = "b096b70b2ffe9beb65a716cf47d5e5db80a9e930" } +opentelemetry_sdk = { git = "https://github.com/open-telemetry/opentelemetry-rust/", rev = "b096b70b2ffe9beb65a716cf47d5e5db80a9e930", features = ["rt-tokio"] } +opentelemetry-otlp = { git = "https://github.com/open-telemetry/opentelemetry-rust/", rev = "b096b70b2ffe9beb65a716cf47d5e5db80a9e930", features = ["grpc-tonic", "http-proto", "http-json"] } # Time and Date chrono = "0.4" @@ -201,3 +208,7 @@ kafka = [ inherits = "release" lto = "fat" codegen-units = 1 + +[patch.crates-io] +opentelemetry = { git = "https://github.com/open-telemetry/opentelemetry-rust/", rev = "b096b70b2ffe9beb65a716cf47d5e5db80a9e930" } +opentelemetry_sdk = { git = "https://github.com/open-telemetry/opentelemetry-rust/", rev = 
"b096b70b2ffe9beb65a716cf47d5e5db80a9e930" } diff --git a/src/handlers/http/cluster/mod.rs b/src/handlers/http/cluster/mod.rs index 247f86325..49a97d5cd 100644 --- a/src/handlers/http/cluster/mod.rs +++ b/src/handlers/http/cluster/mod.rs @@ -1970,6 +1970,10 @@ pub async fn mark_querier_available(domain_name: &str) { } } +#[tracing::instrument( + name = "send_query_request", + skip(auth_token, query_request, tenant_id) +)] pub async fn send_query_request( auth_token: Option, query_request: &Query, diff --git a/src/handlers/http/modal/mod.rs b/src/handlers/http/modal/mod.rs index 115f8b643..2ed0fc411 100644 --- a/src/handlers/http/modal/mod.rs +++ b/src/handlers/http/modal/mod.rs @@ -114,7 +114,7 @@ pub trait ParseableServer { .wrap(prometheus.clone()) .configure(|config| Self::configure_routes(config)) .wrap(from_fn(health_check::check_shutdown_middleware)) - .wrap(actix_web::middleware::Logger::default()) + .wrap(tracing_actix_web::TracingLogger::default()) .wrap(actix_web::middleware::Compress::default()) .wrap(cross_origin_config()) }; diff --git a/src/handlers/http/query.rs b/src/handlers/http/query.rs index fd7804f7f..3e5e22d8a 100644 --- a/src/handlers/http/query.rs +++ b/src/handlers/http/query.rs @@ -43,7 +43,7 @@ use std::pin::Pin; use std::sync::Arc; use std::time::Instant; use tokio::task::JoinSet; -use tracing::{error, warn}; +use tracing::{Instrument, error, warn}; use crate::event::{DEFAULT_TIMESTAMP_KEY, commit_schema}; use crate::metrics::{QUERY_EXECUTE_TIME, increment_query_calls_by_date}; @@ -79,6 +79,8 @@ pub struct Query { /// A function to execute the query and fetch QueryResponse /// This won't look in the cache /// TODO: Improve this function and make this a part of the query API +#[tracing::instrument(name = "get_records_and_fields", skip(query_request, creds, tenant_id))] +#[allow(clippy::type_complexity)] pub async fn get_records_and_fields( query_request: &Query, creds: &SessionKey, @@ -115,6 +117,7 @@ pub async fn get_records_and_fields( 
Ok((Some(records), Some(fields))) } +#[tracing::instrument(name = "query", skip(req, query_request), fields(otel.kind = "server", query.sql = %query_request.query, query.streaming = query_request.streaming))] pub async fn query(req: HttpRequest, query_request: Query) -> Result { let mut session_state = QUERY_SESSION.get_ctx().state(); let time_range = @@ -179,6 +182,7 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result, @@ -283,6 +291,10 @@ async fn handle_non_streaming_query( /// /// # Returns /// - `HttpResponse` streaming the query results as NDJSON, optionally prefixed with the fields array. +#[tracing::instrument( + name = "handle_streaming_query", + skip(query, query_request, time, table_name, tenant_id) +)] async fn handle_streaming_query( query: LogicalQuery, table_name: Vec, @@ -367,6 +379,7 @@ fn create_batch_processor( } } +#[tracing::instrument(name = "get_counts", skip(req, counts_request), fields(otel.kind = "server"))] pub async fn get_counts( req: HttpRequest, counts_request: Json, @@ -453,6 +466,7 @@ pub async fn update_schema_when_distributed( /// Create streams for querier if they do not exist /// get list of streams from memory and storage /// create streams for memory from storage if they do not exist +#[tracing::instrument(name = "create_streams_for_distributed", skip_all, fields(stream_count = streams.len()))] pub async fn create_streams_for_distributed( streams: Vec, tenant_id: &Option, @@ -461,19 +475,25 @@ pub async fn create_streams_for_distributed( return Ok(()); } let mut join_set = JoinSet::new(); + let parent_span = tracing::Span::current(); for stream_name in streams { let id = tenant_id.to_owned(); - join_set.spawn(async move { - let result = PARSEABLE - .create_stream_and_schema_from_storage(&stream_name, &id) - .await; - - if let Err(e) = &result { - warn!("Failed to create stream '{}': {}", stream_name, e); + let task_span = + tracing::info_span!(parent: &parent_span, "create_stream_task", stream = 
%stream_name); + join_set.spawn( + async move { + let result = PARSEABLE + .create_stream_and_schema_from_storage(&stream_name, &id) + .await; + + if let Err(e) = &result { + warn!("Failed to create stream '{}': {}", stream_name, e); + } + + (stream_name, result) } - - (stream_name, result) - }); + .instrument(task_span), + ); } while let Some(result) = join_set.join_next().await { @@ -516,6 +536,7 @@ impl FromRequest for Query { } } +#[tracing::instrument(name = "into_query", skip(query, session_state, time_range))] pub async fn into_query( query: &Query, session_state: &SessionState, diff --git a/src/lib.rs b/src/lib.rs index 75b4254be..dbd7faf32 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -48,6 +48,7 @@ mod static_schema; mod stats; pub mod storage; pub mod sync; +pub mod telemetry; pub mod tenants; pub mod users; pub mod utils; diff --git a/src/main.rs b/src/main.rs index 42cba34f5..1db05a515 100644 --- a/src/main.rs +++ b/src/main.rs @@ -17,11 +17,13 @@ use std::process::exit; * along with this program. If not, see . * */ +use opentelemetry::trace::TracerProvider as _; +use opentelemetry_sdk::trace::SdkTracerProvider; #[cfg(feature = "kafka")] use parseable::connectors; use parseable::{ IngestServer, ParseableServer, QueryServer, Server, banner, metrics, option::Mode, - parseable::PARSEABLE, rbac, storage, + parseable::PARSEABLE, rbac, storage, telemetry, }; use tokio::signal::ctrl_c; use tokio::sync::oneshot; @@ -33,7 +35,7 @@ use tracing_subscriber::{EnvFilter, Registry, fmt}; #[actix_web::main] async fn main() -> anyhow::Result<()> { - init_logger(); + let tracer_provider = init_logger(); // Install the rustls crypto provider before any TLS operations. // This is required for rustls 0.23+ which needs an explicit crypto provider. // If the installation fails, log a warning but continue execution. 
@@ -95,10 +97,14 @@ async fn main() -> anyhow::Result<()> { parseable_server.await?; } + if let Some(provider) = tracer_provider { + let _ = provider.shutdown(); + } + Ok(()) } -pub fn init_logger() { +pub fn init_logger() -> Option { let filter_layer = EnvFilter::try_from_default_env().unwrap_or_else(|_| { let default_level = if cfg!(debug_assertions) { Level::DEBUG @@ -116,10 +122,20 @@ pub fn init_logger() { .with_target(true) .compact(); + let otel_provider = telemetry::init_otel_tracer(); + + let otel_layer = otel_provider.as_ref().map(|provider| { + let tracer = provider.tracer("parseable"); + tracing_opentelemetry::layer().with_tracer(tracer) + }); + Registry::default() .with(filter_layer) .with(fmt_layer) + .with(otel_layer) .init(); + + otel_provider } #[cfg(windows)] diff --git a/src/query/mod.rs b/src/query/mod.rs index e88ef802f..94d2c1a5c 100644 --- a/src/query/mod.rs +++ b/src/query/mod.rs @@ -55,6 +55,8 @@ use std::sync::{Arc, RwLock}; use std::task::{Context, Poll}; use sysinfo::System; use tokio::runtime::Runtime; +use tracing::Instrument; +use tracing_opentelemetry::OpenTelemetrySpanExt; use self::error::ExecuteError; use self::stream_schema_provider::GlobalSchemaProvider; @@ -76,6 +78,25 @@ use crate::utils::time::TimeRange; // pub static QUERY_SESSION: Lazy = // Lazy::new(|| Query::create_session_context(PARSEABLE.storage())); +pub type RB = Either< + Vec, + Pin< + Box< + RecordBatchStreamAdapter< + select_all::SelectAll< + Pin< + Box< + dyn RecordBatchStream< + Item = Result, + > + Send, + >, + >, + >, + >, + >, + >, +>; + pub static QUERY_SESSION_STATE: Lazy = Lazy::new(|| Query::create_session_state(PARSEABLE.storage())); @@ -133,40 +154,35 @@ impl InMemorySessionContext { /// This function executes a query on the dedicated runtime, ensuring that the query is not isolated to a single thread/CPU /// at a time and has access to the entire thread pool, enabling better concurrent processing, and thus quicker results. 
+#[tracing::instrument(name = "query.execute", skip_all, fields(streaming = is_streaming))] pub async fn execute( query: Query, is_streaming: bool, tenant_id: &Option, -) -> Result< - ( - Either< - Vec, - Pin< - Box< - RecordBatchStreamAdapter< - select_all::SelectAll< - Pin< - Box< - dyn RecordBatchStream< - Item = Result< - RecordBatch, - datafusion::error::DataFusionError, - >, - > + Send, - >, - >, - >, - >, - >, - >, - >, - Vec, - ), - ExecuteError, -> { +) -> Result<(RB, Vec), ExecuteError> { let id = tenant_id.clone(); + + // W3C TraceContext propagation across QUERY_RUNTIME (separate OS-thread runtime). + // tracing::Span alone does NOT carry OTel context across OS threads. + let mut trace_ctx = std::collections::HashMap::new(); + let cx = tracing::Span::current().context(); + opentelemetry::global::get_text_map_propagator(|propagator| { + propagator.inject_context(&cx, &mut trace_ctx); + }); + QUERY_RUNTIME - .spawn(async move { query.execute(is_streaming, &id).await }) + .spawn(async move { + // Extract the propagated context on the QUERY_RUNTIME thread + let parent_cx = opentelemetry::global::get_text_map_propagator(|propagator| { + propagator.extract(&trace_ctx) + }); + let span = tracing::info_span!("query.runtime_execute", streaming = is_streaming); + let _ = span.set_parent(parent_cx); + + async move { query.execute(is_streaming, &id).await } + .instrument(span) + .await + }) .await .expect("The Join should have been successful") } @@ -272,37 +288,12 @@ impl Query { /// this function returns the result of the query /// if streaming is true, it returns a stream /// if streaming is false, it returns a vector of record batches + #[tracing::instrument(name = "query.datafusion_execute", skip_all, fields(streaming = is_streaming))] pub async fn execute( &self, is_streaming: bool, tenant_id: &Option, - ) -> Result< - ( - Either< - Vec, - Pin< - Box< - RecordBatchStreamAdapter< - select_all::SelectAll< - Pin< - Box< - dyn RecordBatchStream< - Item = Result< 
- RecordBatch, - datafusion::error::DataFusionError, - >, - > + Send, - >, - >, - >, - >, - >, - >, - >, - Vec, - ), - ExecuteError, - > { + ) -> Result<(RB, Vec), ExecuteError> { let df = QUERY_SESSION .get_ctx() .execute_logical_plan(self.final_logical_plan(tenant_id)) @@ -526,6 +517,7 @@ impl CountsRequest { /// This function is supposed to read maninfest files for the given stream, /// get the sum of `num_rows` between the `startTime` and `endTime`, /// divide that by number of bins and return in a manner acceptable for the console + #[tracing::instrument(name = "get_bin_density", skip_all, fields(stream = %self.stream))] pub async fn get_bin_density( &self, tenant_id: &Option, @@ -731,6 +723,7 @@ pub fn resolve_stream_names(sql: &str) -> Result, anyhow::Error> { Ok(tables) } +#[tracing::instrument(name = "get_manifest_list", skip(time_range, tenant_id), fields(stream = %stream_name))] pub async fn get_manifest_list( stream_name: &str, time_range: &TimeRange, diff --git a/src/storage/field_stats.rs b/src/storage/field_stats.rs index ccc606b5e..7ddf40df3 100644 --- a/src/storage/field_stats.rs +++ b/src/storage/field_stats.rs @@ -525,6 +525,7 @@ pub struct QueryRow { /// API handler to get the field stats for a dataset /// If `fields` is empty, stats for all fields will be returned /// If `fields` is provided, stats for those fields will be returned +#[tracing::instrument(name = "get_dataset_stats", skip(req, dataset_stats_request), fields(otel.kind = "server"))] pub async fn get_dataset_stats( req: HttpRequest, dataset_stats_request: Json, diff --git a/src/telemetry.rs b/src/telemetry.rs new file mode 100644 index 000000000..4472da058 --- /dev/null +++ b/src/telemetry.rs @@ -0,0 +1,103 @@ +/* + * Parseable Server (C) 2022 - 2025 Parseable, Inc. 
+ * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +use opentelemetry_otlp::Protocol; +use opentelemetry_otlp::SpanExporter; +use opentelemetry_otlp::WithExportConfig; +use opentelemetry_sdk::{ + Resource, + propagation::TraceContextPropagator, + trace::{BatchSpanProcessor, SdkTracerProvider}, +}; + +/// Initialise an OTLP tracer provider. +/// +/// **Required env var:** +/// - `OTEL_EXPORTER_OTLP_ENDPOINT` — collector address. +/// For HTTP exporters the SDK appends the signal path automatically: +/// e.g. `http://localhost:4318` → `http://localhost:4318/v1/traces`. +/// Set a signal-specific var `OTEL_EXPORTER_OTLP_TRACES_ENDPOINT` to +/// supply a full URL without any path suffix being added. +/// +/// **Optional env vars (all read by the SDK automatically):** +/// - `OTEL_EXPORTER_OTLP_PROTOCOL` — transport + serialisation (default: `http/json`): +/// - `grpc` → gRPC / tonic (Jaeger, Tempo, …) +/// - `http/json` → HTTP + JSON (Parseable OSS ingest at `/v1/traces`) +/// - `http/protobuf` → HTTP + protobuf +/// - `OTEL_EXPORTER_OTLP_HEADERS` — comma-separated `key=value` pairs forwarded +/// as gRPC metadata or HTTP headers, e.g. +/// `authorization=Basic ,x-p-stream=my-stream,x-p-log-source=otel-traces` +/// +/// Returns `None` when `OTEL_EXPORTER_OTLP_ENDPOINT` is not set (OTEL disabled). +/// The caller must call `provider.shutdown()` before process exit. 
+pub fn init_otel_tracer() -> Option { + // Only used to decide whether OTEL is enabled; the SDK reads it again + // from env to build the exporter (which also appends /v1/traces for HTTP). + std::env::var("OTEL_EXPORTER_OTLP_ENDPOINT").ok()?; + + let protocol = + std::env::var("OTEL_EXPORTER_OTLP_PROTOCOL").unwrap_or_else(|_| "http/json".to_string()); + + // Build the exporter using the SDK's env-var-aware builders. + // We intentionally do NOT call .with_endpoint() / .with_headers() / + // .with_metadata() here — the SDK reads OTEL_EXPORTER_OTLP_ENDPOINT and + // OTEL_EXPORTER_OTLP_HEADERS from the environment automatically, which + // preserves correct path-appending behaviour for HTTP exporters. + let exporter = match protocol.as_str() { + // ── gRPC ───────────────────────────────────────────────────────────── + "grpc" => SpanExporter::builder().with_tonic().build(), + // ── HTTP/Protobuf ──────────────────────────────────────────────────── + "http/protobuf" => SpanExporter::builder() + .with_http() + .with_protocol(Protocol::HttpBinary) + .build(), + // ── HTTP/JSON (default) ────────────────────────────────────────────── + // Default when OTEL_EXPORTER_OTLP_PROTOCOL is unset. + // Required for Parseable OSS — it only accepts application/json. + _ => SpanExporter::builder() + .with_http() + .with_protocol(Protocol::HttpJson) + .build(), + }; + + let exporter = exporter + .map_err(|e| tracing::warn!("Failed to build OTEL span exporter: {}", e)) + .ok()?; + + let resource = Resource::builder_empty() + .with_service_name("parseable") + .build(); + + let processor = BatchSpanProcessor::builder(exporter).build(); + + let provider = SdkTracerProvider::builder() + .with_span_processor(processor) + .with_resource(resource) + .build(); + + opentelemetry::global::set_tracer_provider(provider.clone()); + + // Register the W3C TraceContext propagator globally. 
+ // This is REQUIRED for: + // - Incoming HTTP header extraction (traceparent/tracestate) + // - Cross-thread channel propagation via inject/extract + // Without this, propagator.extract() returns an empty context. + opentelemetry::global::set_text_map_propagator(TraceContextPropagator::new()); + + Some(provider) +}