Skip to content

Commit 5a2b128

Browse files
parmesantotex-dev
authored and committed
Add OpenTelemetry instrumentation
Clean compilation with no warnings. Here's a summary of all changes made: ## Files Modified ### 1. `Cargo.toml` - Added dependencies: `tracing-opentelemetry = "0.32"`, `tracing-actix-web = "0.7"`, `opentelemetry`, `opentelemetry_sdk` (with `rt-tokio`), `opentelemetry-otlp` (with `grpc-tonic`, `http-proto`, `http-json`) — all from the same git rev as existing `opentelemetry-proto` - Added `tracing-subscriber` feature `"registry"` - Added `[patch.crates-io]` section to unify `opentelemetry` and `opentelemetry_sdk` types across all crates ### 2. `src/telemetry.rs` (NEW) - `init_otel_tracer() -> Option<SdkTracerProvider>` — reads `OTEL_EXPORTER_OTLP_ENDPOINT` env var; if unset returns `None` (OTel disabled). Supports gRPC, HTTP/protobuf, and HTTP/JSON (default) protocols via `OTEL_EXPORTER_OTLP_PROTOCOL`. Registers W3C `TraceContextPropagator` globally. ### 3. `src/lib.rs` - Added `pub mod telemetry;` ### 4. `src/main.rs` - `init_logger()` now returns `Option<SdkTracerProvider>` and wires the OTel tracing layer into the subscriber - `main()` captures the provider and calls `provider.shutdown()` before exit ### 5. `src/handlers/http/modal/mod.rs` - Replaced `actix_web::middleware::Logger::default()` with `tracing_actix_web::TracingLogger::default()` for automatic HTTP request tracing with W3C traceparent propagation ### 6. `src/handlers/http/query.rs` — 7 functions instrumented - **`query()`** — root span with `query.sql` and `query.streaming` fields - **`get_counts()`** — root span - **`handle_count_query()`** — child span with `table` field - **`handle_non_streaming_query()`** — child span - **`handle_streaming_query()`** — child span - **`into_query()`** — child span - **`get_records_and_fields()`** — child span - **`create_streams_for_distributed()`** — child span with `stream_count` field + Pattern 1 span propagation into `JoinSet::spawn` tasks ### 7. 
`src/query/mod.rs` — 4 functions instrumented - **`execute()`** — child span + **Pattern 2 W3C TraceContext propagation** across `QUERY_RUNTIME` (separate `Runtime::new()` — cross-OS-thread boundary). Injects context before spawn, extracts and sets parent inside the spawned closure. - **`Query::execute()`** — child span (`query.datafusion_execute`) - **`CountsRequest::get_bin_density()`** — child span with `stream` field - **`get_manifest_list()`** — child span with `stream` field ### 8. `src/storage/field_stats.rs` — 1 function instrumented - **`get_dataset_stats()`** — root span ### 9. `src/handlers/http/cluster/mod.rs` — 1 function instrumented - **`send_query_request()`** — child span Co-authored-by: otex-dev <dev@otex.dev>
1 parent 187a4e6 commit 5a2b128

9 files changed

Lines changed: 221 additions & 71 deletions

File tree

Cargo.toml

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,14 @@ opentelemetry-proto = { git = "https://github.com/open-telemetry/opentelemetry-r
107107
prometheus = { version = "0.13.4", default-features = false, features = ["process"] }
108108
prometheus-parse = "0.2.5"
109109
tracing = "0.1"
110-
tracing-subscriber = { version = "0.3", features = ["env-filter", "time"] }
110+
tracing-subscriber = { version = "0.3", features = ["env-filter", "time", "registry"] }
111+
tracing-opentelemetry = "0.32"
112+
tracing-actix-web = "0.7"
113+
114+
# OpenTelemetry tracing
115+
opentelemetry = { git = "https://github.com/open-telemetry/opentelemetry-rust/", rev = "b096b70b2ffe9beb65a716cf47d5e5db80a9e930" }
116+
opentelemetry_sdk = { git = "https://github.com/open-telemetry/opentelemetry-rust/", rev = "b096b70b2ffe9beb65a716cf47d5e5db80a9e930", features = ["rt-tokio"] }
117+
opentelemetry-otlp = { git = "https://github.com/open-telemetry/opentelemetry-rust/", rev = "b096b70b2ffe9beb65a716cf47d5e5db80a9e930", features = ["grpc-tonic", "http-proto", "http-json"] }
111118

112119
# Time and Date
113120
chrono = "0.4"
@@ -201,3 +208,7 @@ kafka = [
201208
inherits = "release"
202209
lto = "fat"
203210
codegen-units = 1
211+
212+
[patch.crates-io]
213+
opentelemetry = { git = "https://github.com/open-telemetry/opentelemetry-rust/", rev = "b096b70b2ffe9beb65a716cf47d5e5db80a9e930" }
214+
opentelemetry_sdk = { git = "https://github.com/open-telemetry/opentelemetry-rust/", rev = "b096b70b2ffe9beb65a716cf47d5e5db80a9e930" }

src/handlers/http/cluster/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1970,6 +1970,10 @@ pub async fn mark_querier_available(domain_name: &str) {
19701970
}
19711971
}
19721972

1973+
#[tracing::instrument(
1974+
name = "send_query_request",
1975+
skip(auth_token, query_request, tenant_id)
1976+
)]
19731977
pub async fn send_query_request(
19741978
auth_token: Option<HeaderMap>,
19751979
query_request: &Query,

src/handlers/http/modal/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ pub trait ParseableServer {
114114
.wrap(prometheus.clone())
115115
.configure(|config| Self::configure_routes(config))
116116
.wrap(from_fn(health_check::check_shutdown_middleware))
117-
.wrap(actix_web::middleware::Logger::default())
117+
.wrap(tracing_actix_web::TracingLogger::default())
118118
.wrap(actix_web::middleware::Compress::default())
119119
.wrap(cross_origin_config())
120120
};

src/handlers/http/query.rs

Lines changed: 32 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ use std::pin::Pin;
4343
use std::sync::Arc;
4444
use std::time::Instant;
4545
use tokio::task::JoinSet;
46-
use tracing::{error, warn};
46+
use tracing::{Instrument, error, warn};
4747

4848
use crate::event::{DEFAULT_TIMESTAMP_KEY, commit_schema};
4949
use crate::metrics::{QUERY_EXECUTE_TIME, increment_query_calls_by_date};
@@ -79,6 +79,8 @@ pub struct Query {
7979
/// A function to execute the query and fetch QueryResponse
8080
/// This won't look in the cache
8181
/// TODO: Improve this function and make this a part of the query API
82+
#[tracing::instrument(name = "get_records_and_fields", skip(query_request, creds, tenant_id))]
83+
#[allow(clippy::type_complexity)]
8284
pub async fn get_records_and_fields(
8385
query_request: &Query,
8486
creds: &SessionKey,
@@ -115,6 +117,7 @@ pub async fn get_records_and_fields(
115117
Ok((Some(records), Some(fields)))
116118
}
117119

120+
#[tracing::instrument(name = "query", skip(req, query_request), fields(otel.kind = "server", query.sql = %query_request.query, query.streaming = query_request.streaming))]
118121
pub async fn query(req: HttpRequest, query_request: Query) -> Result<HttpResponse, QueryError> {
119122
let mut session_state = QUERY_SESSION.get_ctx().state();
120123
let time_range =
@@ -179,6 +182,7 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<HttpRespons
179182
///
180183
/// # Returns
181184
/// - `HttpResponse` with the count result as JSON, including fields if requested.
185+
#[tracing::instrument(name = "handle_count_query", skip(query_request, time), fields(table = %table_name))]
182186
async fn handle_count_query(
183187
query_request: &Query,
184188
table_name: &str,
@@ -230,6 +234,10 @@ async fn handle_count_query(
230234
///
231235
/// # Returns
232236
/// - `HttpResponse` with the full query result as a JSON object.
237+
#[tracing::instrument(
238+
name = "handle_non_streaming_query",
239+
skip(query, query_request, time, table_name, tenant_id)
240+
)]
233241
async fn handle_non_streaming_query(
234242
query: LogicalQuery,
235243
table_name: Vec<String>,
@@ -283,6 +291,10 @@ async fn handle_non_streaming_query(
283291
///
284292
/// # Returns
285293
/// - `HttpResponse` streaming the query results as NDJSON, optionally prefixed with the fields array.
294+
#[tracing::instrument(
295+
name = "handle_streaming_query",
296+
skip(query, query_request, time, table_name, tenant_id)
297+
)]
286298
async fn handle_streaming_query(
287299
query: LogicalQuery,
288300
table_name: Vec<String>,
@@ -367,6 +379,7 @@ fn create_batch_processor(
367379
}
368380
}
369381

382+
#[tracing::instrument(name = "get_counts", skip(req, counts_request), fields(otel.kind = "server"))]
370383
pub async fn get_counts(
371384
req: HttpRequest,
372385
counts_request: Json<CountsRequest>,
@@ -453,6 +466,7 @@ pub async fn update_schema_when_distributed(
453466
/// Create streams for querier if they do not exist
454467
/// get list of streams from memory and storage
455468
/// create streams for memory from storage if they do not exist
469+
#[tracing::instrument(name = "create_streams_for_distributed", skip_all, fields(stream_count = streams.len()))]
456470
pub async fn create_streams_for_distributed(
457471
streams: Vec<String>,
458472
tenant_id: &Option<String>,
@@ -461,19 +475,25 @@ pub async fn create_streams_for_distributed(
461475
return Ok(());
462476
}
463477
let mut join_set = JoinSet::new();
478+
let parent_span = tracing::Span::current();
464479
for stream_name in streams {
465480
let id = tenant_id.to_owned();
466-
join_set.spawn(async move {
467-
let result = PARSEABLE
468-
.create_stream_and_schema_from_storage(&stream_name, &id)
469-
.await;
470-
471-
if let Err(e) = &result {
472-
warn!("Failed to create stream '{}': {}", stream_name, e);
481+
let task_span =
482+
tracing::info_span!(parent: &parent_span, "create_stream_task", stream = %stream_name);
483+
join_set.spawn(
484+
async move {
485+
let result = PARSEABLE
486+
.create_stream_and_schema_from_storage(&stream_name, &id)
487+
.await;
488+
489+
if let Err(e) = &result {
490+
warn!("Failed to create stream '{}': {}", stream_name, e);
491+
}
492+
493+
(stream_name, result)
473494
}
474-
475-
(stream_name, result)
476-
});
495+
.instrument(task_span),
496+
);
477497
}
478498

479499
while let Some(result) = join_set.join_next().await {
@@ -516,6 +536,7 @@ impl FromRequest for Query {
516536
}
517537
}
518538

539+
#[tracing::instrument(name = "into_query", skip(query, session_state, time_range))]
519540
pub async fn into_query(
520541
query: &Query,
521542
session_state: &SessionState,

src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ mod static_schema;
4848
mod stats;
4949
pub mod storage;
5050
pub mod sync;
51+
pub mod telemetry;
5152
pub mod tenants;
5253
pub mod users;
5354
pub mod utils;

src/main.rs

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,13 @@ use std::process::exit;
1717
* along with this program. If not, see <http://www.gnu.org/licenses/>.
1818
*
1919
*/
20+
use opentelemetry::trace::TracerProvider as _;
21+
use opentelemetry_sdk::trace::SdkTracerProvider;
2022
#[cfg(feature = "kafka")]
2123
use parseable::connectors;
2224
use parseable::{
2325
IngestServer, ParseableServer, QueryServer, Server, banner, metrics, option::Mode,
24-
parseable::PARSEABLE, rbac, storage,
26+
parseable::PARSEABLE, rbac, storage, telemetry,
2527
};
2628
use tokio::signal::ctrl_c;
2729
use tokio::sync::oneshot;
@@ -33,7 +35,7 @@ use tracing_subscriber::{EnvFilter, Registry, fmt};
3335

3436
#[actix_web::main]
3537
async fn main() -> anyhow::Result<()> {
36-
init_logger();
38+
let tracer_provider = init_logger();
3739
// Install the rustls crypto provider before any TLS operations.
3840
// This is required for rustls 0.23+ which needs an explicit crypto provider.
3941
// If the installation fails, log a warning but continue execution.
@@ -95,10 +97,14 @@ async fn main() -> anyhow::Result<()> {
9597
parseable_server.await?;
9698
}
9799

100+
if let Some(provider) = tracer_provider {
101+
let _ = provider.shutdown();
102+
}
103+
98104
Ok(())
99105
}
100106

101-
pub fn init_logger() {
107+
pub fn init_logger() -> Option<SdkTracerProvider> {
102108
let filter_layer = EnvFilter::try_from_default_env().unwrap_or_else(|_| {
103109
let default_level = if cfg!(debug_assertions) {
104110
Level::DEBUG
@@ -116,10 +122,20 @@ pub fn init_logger() {
116122
.with_target(true)
117123
.compact();
118124

125+
let otel_provider = telemetry::init_otel_tracer();
126+
127+
let otel_layer = otel_provider.as_ref().map(|provider| {
128+
let tracer = provider.tracer("parseable");
129+
tracing_opentelemetry::layer().with_tracer(tracer)
130+
});
131+
119132
Registry::default()
120133
.with(filter_layer)
121134
.with(fmt_layer)
135+
.with(otel_layer)
122136
.init();
137+
138+
otel_provider
123139
}
124140

125141
#[cfg(windows)]

src/query/mod.rs

Lines changed: 48 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,8 @@ use std::sync::{Arc, RwLock};
5555
use std::task::{Context, Poll};
5656
use sysinfo::System;
5757
use tokio::runtime::Runtime;
58+
use tracing::Instrument;
59+
use tracing_opentelemetry::OpenTelemetrySpanExt;
5860

5961
use self::error::ExecuteError;
6062
use self::stream_schema_provider::GlobalSchemaProvider;
@@ -76,6 +78,25 @@ use crate::utils::time::TimeRange;
7678
// pub static QUERY_SESSION: Lazy<SessionContext> =
7779
// Lazy::new(|| Query::create_session_context(PARSEABLE.storage()));
7880

81+
pub type RB = Either<
82+
Vec<RecordBatch>,
83+
Pin<
84+
Box<
85+
RecordBatchStreamAdapter<
86+
select_all::SelectAll<
87+
Pin<
88+
Box<
89+
dyn RecordBatchStream<
90+
Item = Result<RecordBatch, datafusion::error::DataFusionError>,
91+
> + Send,
92+
>,
93+
>,
94+
>,
95+
>,
96+
>,
97+
>,
98+
>;
99+
79100
pub static QUERY_SESSION_STATE: Lazy<SessionState> =
80101
Lazy::new(|| Query::create_session_state(PARSEABLE.storage()));
81102

@@ -133,40 +154,35 @@ impl InMemorySessionContext {
133154

134155
/// This function executes a query on the dedicated runtime, ensuring that the query is not isolated to a single thread/CPU
135156
/// at a time and has access to the entire thread pool, enabling better concurrent processing, and thus quicker results.
157+
#[tracing::instrument(name = "query.execute", skip_all, fields(streaming = is_streaming))]
136158
pub async fn execute(
137159
query: Query,
138160
is_streaming: bool,
139161
tenant_id: &Option<String>,
140-
) -> Result<
141-
(
142-
Either<
143-
Vec<RecordBatch>,
144-
Pin<
145-
Box<
146-
RecordBatchStreamAdapter<
147-
select_all::SelectAll<
148-
Pin<
149-
Box<
150-
dyn RecordBatchStream<
151-
Item = Result<
152-
RecordBatch,
153-
datafusion::error::DataFusionError,
154-
>,
155-
> + Send,
156-
>,
157-
>,
158-
>,
159-
>,
160-
>,
161-
>,
162-
>,
163-
Vec<String>,
164-
),
165-
ExecuteError,
166-
> {
162+
) -> Result<(RB, Vec<String>), ExecuteError> {
167163
let id = tenant_id.clone();
164+
165+
// W3C TraceContext propagation across QUERY_RUNTIME (separate OS-thread runtime).
166+
// tracing::Span alone does NOT carry OTel context across OS threads.
167+
let mut trace_ctx = std::collections::HashMap::new();
168+
let cx = tracing::Span::current().context();
169+
opentelemetry::global::get_text_map_propagator(|propagator| {
170+
propagator.inject_context(&cx, &mut trace_ctx);
171+
});
172+
168173
QUERY_RUNTIME
169-
.spawn(async move { query.execute(is_streaming, &id).await })
174+
.spawn(async move {
175+
// Extract the propagated context on the QUERY_RUNTIME thread
176+
let parent_cx = opentelemetry::global::get_text_map_propagator(|propagator| {
177+
propagator.extract(&trace_ctx)
178+
});
179+
let span = tracing::info_span!("query.runtime_execute", streaming = is_streaming);
180+
let _ = span.set_parent(parent_cx);
181+
182+
async move { query.execute(is_streaming, &id).await }
183+
.instrument(span)
184+
.await
185+
})
170186
.await
171187
.expect("The Join should have been successful")
172188
}
@@ -272,37 +288,12 @@ impl Query {
272288
/// this function returns the result of the query
273289
/// if streaming is true, it returns a stream
274290
/// if streaming is false, it returns a vector of record batches
291+
#[tracing::instrument(name = "query.datafusion_execute", skip_all, fields(streaming = is_streaming))]
275292
pub async fn execute(
276293
&self,
277294
is_streaming: bool,
278295
tenant_id: &Option<String>,
279-
) -> Result<
280-
(
281-
Either<
282-
Vec<RecordBatch>,
283-
Pin<
284-
Box<
285-
RecordBatchStreamAdapter<
286-
select_all::SelectAll<
287-
Pin<
288-
Box<
289-
dyn RecordBatchStream<
290-
Item = Result<
291-
RecordBatch,
292-
datafusion::error::DataFusionError,
293-
>,
294-
> + Send,
295-
>,
296-
>,
297-
>,
298-
>,
299-
>,
300-
>,
301-
>,
302-
Vec<String>,
303-
),
304-
ExecuteError,
305-
> {
296+
) -> Result<(RB, Vec<String>), ExecuteError> {
306297
let df = QUERY_SESSION
307298
.get_ctx()
308299
.execute_logical_plan(self.final_logical_plan(tenant_id))
@@ -526,6 +517,7 @@ impl CountsRequest {
526517
/// This function is supposed to read maninfest files for the given stream,
527518
/// get the sum of `num_rows` between the `startTime` and `endTime`,
528519
/// divide that by number of bins and return in a manner acceptable for the console
520+
#[tracing::instrument(name = "get_bin_density", skip_all, fields(stream = %self.stream))]
529521
pub async fn get_bin_density(
530522
&self,
531523
tenant_id: &Option<String>,
@@ -731,6 +723,7 @@ pub fn resolve_stream_names(sql: &str) -> Result<Vec<String>, anyhow::Error> {
731723
Ok(tables)
732724
}
733725

726+
#[tracing::instrument(name = "get_manifest_list", skip(time_range, tenant_id), fields(stream = %stream_name))]
734727
pub async fn get_manifest_list(
735728
stream_name: &str,
736729
time_range: &TimeRange,

src/storage/field_stats.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -525,6 +525,7 @@ pub struct QueryRow {
525525
/// API handler to get the field stats for a dataset
526526
/// If `fields` is empty, stats for all fields will be returned
527527
/// If `fields` is provided, stats for those fields will be returned
528+
#[tracing::instrument(name = "get_dataset_stats", skip(req, dataset_stats_request), fields(otel.kind = "server"))]
528529
pub async fn get_dataset_stats(
529530
req: HttpRequest,
530531
dataset_stats_request: Json<DataSetStatsRequest>,

0 commit comments

Comments (0)