Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,14 @@ opentelemetry-proto = { git = "https://github.com/open-telemetry/opentelemetry-r
prometheus = { version = "0.13.4", default-features = false, features = ["process"] }
prometheus-parse = "0.2.5"
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter", "time"] }
tracing-subscriber = { version = "0.3", features = ["env-filter", "time", "registry"] }
tracing-opentelemetry = "0.32"
tracing-actix-web = "0.7"

# OpenTelemetry tracing
opentelemetry = { git = "https://github.com/open-telemetry/opentelemetry-rust/", rev = "b096b70b2ffe9beb65a716cf47d5e5db80a9e930" }
opentelemetry_sdk = { git = "https://github.com/open-telemetry/opentelemetry-rust/", rev = "b096b70b2ffe9beb65a716cf47d5e5db80a9e930", features = ["rt-tokio"] }
opentelemetry-otlp = { git = "https://github.com/open-telemetry/opentelemetry-rust/", rev = "b096b70b2ffe9beb65a716cf47d5e5db80a9e930", features = ["grpc-tonic", "http-proto", "http-json"] }

# Time and Date
chrono = "0.4"
Expand Down Expand Up @@ -201,3 +208,7 @@ kafka = [
inherits = "release"
lto = "fat"
codegen-units = 1

[patch.crates-io]
opentelemetry = { git = "https://github.com/open-telemetry/opentelemetry-rust/", rev = "b096b70b2ffe9beb65a716cf47d5e5db80a9e930" }
opentelemetry_sdk = { git = "https://github.com/open-telemetry/opentelemetry-rust/", rev = "b096b70b2ffe9beb65a716cf47d5e5db80a9e930" }
4 changes: 4 additions & 0 deletions src/handlers/http/cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1970,6 +1970,10 @@ pub async fn mark_querier_available(domain_name: &str) {
}
}

#[tracing::instrument(
name = "send_query_request",
skip(auth_token, query_request, tenant_id)
)]
pub async fn send_query_request(
auth_token: Option<HeaderMap>,
query_request: &Query,
Expand Down
2 changes: 1 addition & 1 deletion src/handlers/http/modal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ pub trait ParseableServer {
.wrap(prometheus.clone())
.configure(|config| Self::configure_routes(config))
.wrap(from_fn(health_check::check_shutdown_middleware))
.wrap(actix_web::middleware::Logger::default())
.wrap(tracing_actix_web::TracingLogger::default())
.wrap(actix_web::middleware::Compress::default())
.wrap(cross_origin_config())
};
Expand Down
43 changes: 32 additions & 11 deletions src/handlers/http/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ use std::pin::Pin;
use std::sync::Arc;
use std::time::Instant;
use tokio::task::JoinSet;
use tracing::{error, warn};
use tracing::{Instrument, error, warn};

use crate::event::{DEFAULT_TIMESTAMP_KEY, commit_schema};
use crate::metrics::{QUERY_EXECUTE_TIME, increment_query_calls_by_date};
Expand Down Expand Up @@ -79,6 +79,8 @@ pub struct Query {
/// A function to execute the query and fetch QueryResponse
/// This won't look in the cache
/// TODO: Improve this function and make this a part of the query API
#[tracing::instrument(name = "get_records_and_fields", skip(query_request, creds, tenant_id))]
#[allow(clippy::type_complexity)]
pub async fn get_records_and_fields(
query_request: &Query,
creds: &SessionKey,
Expand Down Expand Up @@ -115,6 +117,7 @@ pub async fn get_records_and_fields(
Ok((Some(records), Some(fields)))
}

#[tracing::instrument(name = "query", skip(req, query_request), fields(otel.kind = "server", query.sql = %query_request.query, query.streaming = query_request.streaming))]
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Do not emit raw SQL text into request spans.

Line 120 records full query text (query.sql), which can leak sensitive literals/identifiers into logs and OTLP backends.

🔒 Safer span fields
-#[tracing::instrument(name = "query", skip(req, query_request), fields(otel.kind = "server", query.sql = %query_request.query, query.streaming = query_request.streaming))]
+#[tracing::instrument(
+    name = "query",
+    skip(req, query_request),
+    fields(
+        otel.kind = "server",
+        query.streaming = query_request.streaming,
+        query.length = query_request.query.len()
+    )
+)]
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
#[tracing::instrument(name = "query", skip(req, query_request), fields(otel.kind = "server", query.sql = %query_request.query, query.streaming = query_request.streaming))]
#[tracing::instrument(
name = "query",
skip(req, query_request),
fields(
otel.kind = "server",
query.streaming = query_request.streaming,
query.length = query_request.query.len()
)
)]
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/handlers/http/query.rs` at line 120, The tracing span currently emits the
full SQL via the attribute on the "query" handler (the tracing::instrument on
the function using query_request), which can leak sensitive data; remove the raw
query text from the span fields and instead record a non-sensitive identifier
such as a hash/fingerprint or metadata (e.g., query_hash, param_count, or a
boolean for streaming) derived from query_request.query; update the tracing
attribute and any span field population to use that safe field (compute the
hash/fingerprint in the handler using query_request.query and expose only that
value in the span, or omit the SQL entirely and keep query.streaming if needed).

pub async fn query(req: HttpRequest, query_request: Query) -> Result<HttpResponse, QueryError> {
let mut session_state = QUERY_SESSION.get_ctx().state();
let time_range =
Expand Down Expand Up @@ -179,6 +182,7 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<HttpRespons
///
/// # Returns
/// - `HttpResponse` with the count result as JSON, including fields if requested.
#[tracing::instrument(name = "handle_count_query", skip(query_request, time), fields(table = %table_name))]
async fn handle_count_query(
query_request: &Query,
table_name: &str,
Expand Down Expand Up @@ -230,6 +234,10 @@ async fn handle_count_query(
///
/// # Returns
/// - `HttpResponse` with the full query result as a JSON object.
#[tracing::instrument(
name = "handle_non_streaming_query",
skip(query, query_request, time, table_name, tenant_id)
)]
async fn handle_non_streaming_query(
query: LogicalQuery,
table_name: Vec<String>,
Expand Down Expand Up @@ -283,6 +291,10 @@ async fn handle_non_streaming_query(
///
/// # Returns
/// - `HttpResponse` streaming the query results as NDJSON, optionally prefixed with the fields array.
#[tracing::instrument(
name = "handle_streaming_query",
skip(query, query_request, time, table_name, tenant_id)
)]
async fn handle_streaming_query(
query: LogicalQuery,
table_name: Vec<String>,
Expand Down Expand Up @@ -367,6 +379,7 @@ fn create_batch_processor(
}
}

#[tracing::instrument(name = "get_counts", skip(req, counts_request), fields(otel.kind = "server"))]
pub async fn get_counts(
req: HttpRequest,
counts_request: Json<CountsRequest>,
Expand Down Expand Up @@ -453,6 +466,7 @@ pub async fn update_schema_when_distributed(
/// Create streams for querier if they do not exist
/// get list of streams from memory and storage
/// create streams for memory from storage if they do not exist
#[tracing::instrument(name = "create_streams_for_distributed", skip_all, fields(stream_count = streams.len()))]
pub async fn create_streams_for_distributed(
streams: Vec<String>,
tenant_id: &Option<String>,
Expand All @@ -461,19 +475,25 @@ pub async fn create_streams_for_distributed(
return Ok(());
}
let mut join_set = JoinSet::new();
let parent_span = tracing::Span::current();
for stream_name in streams {
let id = tenant_id.to_owned();
join_set.spawn(async move {
let result = PARSEABLE
.create_stream_and_schema_from_storage(&stream_name, &id)
.await;

if let Err(e) = &result {
warn!("Failed to create stream '{}': {}", stream_name, e);
let task_span =
tracing::info_span!(parent: &parent_span, "create_stream_task", stream = %stream_name);
join_set.spawn(
async move {
let result = PARSEABLE
.create_stream_and_schema_from_storage(&stream_name, &id)
.await;

if let Err(e) = &result {
warn!("Failed to create stream '{}': {}", stream_name, e);
}

(stream_name, result)
}

(stream_name, result)
});
.instrument(task_span),
);
}

while let Some(result) = join_set.join_next().await {
Expand Down Expand Up @@ -516,6 +536,7 @@ impl FromRequest for Query {
}
}

#[tracing::instrument(name = "into_query", skip(query, session_state, time_range))]
pub async fn into_query(
query: &Query,
session_state: &SessionState,
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ mod static_schema;
mod stats;
pub mod storage;
pub mod sync;
pub mod telemetry;
pub mod tenants;
pub mod users;
pub mod utils;
Expand Down
22 changes: 19 additions & 3 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@ use std::process::exit;
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
use opentelemetry::trace::TracerProvider as _;
use opentelemetry_sdk::trace::SdkTracerProvider;
#[cfg(feature = "kafka")]
use parseable::connectors;
use parseable::{
IngestServer, ParseableServer, QueryServer, Server, banner, metrics, option::Mode,
parseable::PARSEABLE, rbac, storage,
parseable::PARSEABLE, rbac, storage, telemetry,
};
use tokio::signal::ctrl_c;
use tokio::sync::oneshot;
Expand All @@ -33,7 +35,7 @@ use tracing_subscriber::{EnvFilter, Registry, fmt};

#[actix_web::main]
async fn main() -> anyhow::Result<()> {
init_logger();
let tracer_provider = init_logger();
// Install the rustls crypto provider before any TLS operations.
// This is required for rustls 0.23+ which needs an explicit crypto provider.
// If the installation fails, log a warning but continue execution.
Expand Down Expand Up @@ -95,10 +97,14 @@ async fn main() -> anyhow::Result<()> {
parseable_server.await?;
}

if let Some(provider) = tracer_provider {
let _ = provider.shutdown();
}

Ok(())
}

pub fn init_logger() {
pub fn init_logger() -> Option<SdkTracerProvider> {
let filter_layer = EnvFilter::try_from_default_env().unwrap_or_else(|_| {
let default_level = if cfg!(debug_assertions) {
Level::DEBUG
Expand All @@ -116,10 +122,20 @@ pub fn init_logger() {
.with_target(true)
.compact();

let otel_provider = telemetry::init_otel_tracer();

let otel_layer = otel_provider.as_ref().map(|provider| {
let tracer = provider.tracer("parseable");
tracing_opentelemetry::layer().with_tracer(tracer)
});

Registry::default()
.with(filter_layer)
.with(fmt_layer)
.with(otel_layer)
.init();

otel_provider
}

#[cfg(windows)]
Expand Down
103 changes: 48 additions & 55 deletions src/query/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ use std::sync::{Arc, RwLock};
use std::task::{Context, Poll};
use sysinfo::System;
use tokio::runtime::Runtime;
use tracing::Instrument;
use tracing_opentelemetry::OpenTelemetrySpanExt;

use self::error::ExecuteError;
use self::stream_schema_provider::GlobalSchemaProvider;
Expand All @@ -76,6 +78,25 @@ use crate::utils::time::TimeRange;
// pub static QUERY_SESSION: Lazy<SessionContext> =
// Lazy::new(|| Query::create_session_context(PARSEABLE.storage()));

pub type RB = Either<
Vec<RecordBatch>,
Pin<
Box<
RecordBatchStreamAdapter<
select_all::SelectAll<
Pin<
Box<
dyn RecordBatchStream<
Item = Result<RecordBatch, datafusion::error::DataFusionError>,
> + Send,
>,
>,
>,
>,
>,
>,
>;

pub static QUERY_SESSION_STATE: Lazy<SessionState> =
Lazy::new(|| Query::create_session_state(PARSEABLE.storage()));

Expand Down Expand Up @@ -133,40 +154,35 @@ impl InMemorySessionContext {

/// This function executes a query on the dedicated runtime, ensuring that the query is not isolated to a single thread/CPU
/// at a time and has access to the entire thread pool, enabling better concurrent processing, and thus quicker results.
#[tracing::instrument(name = "query.execute", skip_all, fields(streaming = is_streaming))]
pub async fn execute(
query: Query,
is_streaming: bool,
tenant_id: &Option<String>,
) -> Result<
(
Either<
Vec<RecordBatch>,
Pin<
Box<
RecordBatchStreamAdapter<
select_all::SelectAll<
Pin<
Box<
dyn RecordBatchStream<
Item = Result<
RecordBatch,
datafusion::error::DataFusionError,
>,
> + Send,
>,
>,
>,
>,
>,
>,
>,
Vec<String>,
),
ExecuteError,
> {
) -> Result<(RB, Vec<String>), ExecuteError> {
let id = tenant_id.clone();

// W3C TraceContext propagation across QUERY_RUNTIME (separate OS-thread runtime).
// tracing::Span alone does NOT carry OTel context across OS threads.
let mut trace_ctx = std::collections::HashMap::new();
let cx = tracing::Span::current().context();
opentelemetry::global::get_text_map_propagator(|propagator| {
propagator.inject_context(&cx, &mut trace_ctx);
});

QUERY_RUNTIME
.spawn(async move { query.execute(is_streaming, &id).await })
.spawn(async move {
// Extract the propagated context on the QUERY_RUNTIME thread
let parent_cx = opentelemetry::global::get_text_map_propagator(|propagator| {
propagator.extract(&trace_ctx)
});
let span = tracing::info_span!("query.runtime_execute", streaming = is_streaming);
let _ = span.set_parent(parent_cx);

async move { query.execute(is_streaming, &id).await }
.instrument(span)
.await
})
.await
.expect("The Join should have been successful")
}
Expand Down Expand Up @@ -272,37 +288,12 @@ impl Query {
/// this function returns the result of the query
/// if streaming is true, it returns a stream
/// if streaming is false, it returns a vector of record batches
#[tracing::instrument(name = "query.datafusion_execute", skip_all, fields(streaming = is_streaming))]
pub async fn execute(
&self,
is_streaming: bool,
tenant_id: &Option<String>,
) -> Result<
(
Either<
Vec<RecordBatch>,
Pin<
Box<
RecordBatchStreamAdapter<
select_all::SelectAll<
Pin<
Box<
dyn RecordBatchStream<
Item = Result<
RecordBatch,
datafusion::error::DataFusionError,
>,
> + Send,
>,
>,
>,
>,
>,
>,
>,
Vec<String>,
),
ExecuteError,
> {
) -> Result<(RB, Vec<String>), ExecuteError> {
let df = QUERY_SESSION
.get_ctx()
.execute_logical_plan(self.final_logical_plan(tenant_id))
Expand Down Expand Up @@ -526,6 +517,7 @@ impl CountsRequest {
/// This function is supposed to read manifest files for the given stream,
/// get the sum of `num_rows` between the `startTime` and `endTime`,
/// divide that by number of bins and return in a manner acceptable for the console
#[tracing::instrument(name = "get_bin_density", skip_all, fields(stream = %self.stream))]
pub async fn get_bin_density(
&self,
tenant_id: &Option<String>,
Expand Down Expand Up @@ -731,6 +723,7 @@ pub fn resolve_stream_names(sql: &str) -> Result<Vec<String>, anyhow::Error> {
Ok(tables)
}

#[tracing::instrument(name = "get_manifest_list", skip(time_range, tenant_id), fields(stream = %stream_name))]
pub async fn get_manifest_list(
stream_name: &str,
time_range: &TimeRange,
Expand Down
1 change: 1 addition & 0 deletions src/storage/field_stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -525,6 +525,7 @@ pub struct QueryRow {
/// API handler to get the field stats for a dataset
/// If `fields` is empty, stats for all fields will be returned
/// If `fields` is provided, stats for those fields will be returned
#[tracing::instrument(name = "get_dataset_stats", skip(req, dataset_stats_request), fields(otel.kind = "server"))]
pub async fn get_dataset_stats(
req: HttpRequest,
dataset_stats_request: Json<DataSetStatsRequest>,
Expand Down
Loading
Loading