diff --git a/rust_snuba/Cargo.lock b/rust_snuba/Cargo.lock index 1bdbacce26..4ce1d21e81 100644 --- a/rust_snuba/Cargo.lock +++ b/rust_snuba/Cargo.lock @@ -556,15 +556,6 @@ version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dc0b364ead1874514c8c2855ab558056ebfeb775653e7ae45ff72f28f8f3166c" -[[package]] -name = "bstr" -version = "1.12.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "63044e1ae8e69f3b5a92c736ca6269b8d12fa7efe39bf34ddb06d102cf0e2cab" -dependencies = [ - "memchr", -] - [[package]] name = "bumpalo" version = "3.16.0" @@ -664,12 +655,6 @@ dependencies = [ "half", ] -[[package]] -name = "cityhash-rs" -version = "1.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "93a719913643003b84bd13022b4b7e703c09342cd03b679c4641c7d2e50dc34d" - [[package]] name = "clap" version = "4.5.4" @@ -710,44 +695,6 @@ version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "98cc8fbded0c607b7ba9dd60cd98df59af97e84d24e49c8557331cfc26d301ce" -[[package]] -name = "clickhouse" -version = "0.13.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9a9a81a1dffadd762ee662635ce409232258ce9beebd7cc0fa227df0b5e7efc0" -dependencies = [ - "bstr", - "bytes", - "cityhash-rs", - "clickhouse-derive", - "futures", - "futures-channel", - "http-body-util", - "hyper 1.6.0", - "hyper-util", - "lz4_flex", - "replace_with", - "sealed", - "serde", - "static_assertions", - "thiserror 1.0.58", - "tokio", - "url", - "uuid", -] - -[[package]] -name = "clickhouse-derive" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d70f3e2893f7d3e017eeacdc9a708fbc29a10488e3ebca21f9df6a5d2b616dbb" -dependencies = [ - "proc-macro2", - "quote", - "serde_derive_internals 0.29.1", - "syn 2.0.117", -] - [[package]] name = "cmake" version = "0.1.57" @@ -2400,12 +2347,6 @@ dependencies = [ "value-bag", ] -[[package]] -name = "lz4_flex" -version = "0.11.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "373f5eceeeab7925e0c1098212f2fbc4d416adec9d35051a6ab251e824c1854a" - [[package]] name = "matchers" version = "0.2.0" @@ -3354,12 +3295,6 @@ dependencies = [ "memchr", ] -[[package]] -name = "replace_with" -version = "0.1.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "51743d3e274e2b18df81c4dc6caf8a5b8e15dbe799e0dca05c7617380094e884" - [[package]] name = "reqwest" version = "0.11.27" @@ -3449,7 +3384,6 @@ dependencies = [ "bytes", "cadence", "chrono", - "clickhouse", "criterion", "ctrlc", "data-encoding", @@ -3631,7 +3565,7 @@ checksum = "c767fd6fa65d9ccf9cf026122c1b555f2ef9a4f0cea69da4d7dbc3e258d30967" dependencies = [ "proc-macro2", "quote", - "serde_derive_internals 0.26.0", + "serde_derive_internals", "syn 1.0.109", ] @@ -3641,17 +3575,6 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" -[[package]] -name = "sealed" -version = "0.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "22f968c5ea23d555e670b449c1c5e7b2fc399fdaec1d304a17cd48e288abc107" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.117", -] - [[package]] name = "security-framework" version = "2.10.0" @@ -3983,17 +3906,6 @@ dependencies = [ "syn 1.0.109", ] -[[package]] -name = "serde_derive_internals" -version = "0.29.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "18d26a20a969b9e3fdf2fc2d9f21eda6c40e2de84c9408bb5d3b05d499aae711" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.117", -] - [[package]] name = "serde_json" version = "1.0.149" @@ -4219,12 +4131,6 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3" -[[package]] -name = "static_assertions" -version = "1.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" - [[package]] name = "statsdproxy" version = "0.4.1" diff --git a/rust_snuba/Cargo.toml b/rust_snuba/Cargo.toml index 70e647016f..62518f4f3d 100644 --- a/rust_snuba/Cargo.toml +++ b/rust_snuba/Cargo.toml @@ -26,7 +26,6 @@ adler = "1.0.2" anyhow = { version = "1.0.69", features = ["backtrace"] } cadence = "1.0.0" chrono = { version = "0.4.26", features = ["serde"] } -clickhouse = { version = "0.13", features = ["uuid"] } ctrlc = { version = "3.2.5", features = ["termination"] } data-encoding = "2.5.0" futures = "0.3.21" diff --git a/rust_snuba/src/factory_v2.rs b/rust_snuba/src/factory_v2.rs index 457924e37f..9f8d4824b1 100644 --- a/rust_snuba/src/factory_v2.rs +++ b/rust_snuba/src/factory_v2.rs @@ -26,18 +26,16 @@ use crate::processors::{self, get_cogs_label}; use crate::strategies::accountant::RecordCogs; use crate::strategies::blq_router::BLQRouter; -use crate::strategies::clickhouse::row_binary_writer::ClickhouseRowBinaryWriterStep; -use crate::strategies::clickhouse::writer_v2::ClickhouseWriterStep; +use crate::strategies::clickhouse::writer_v2::{ClickhouseWriterStep, InsertFormat}; use crate::strategies::commit_log::ProduceCommitLog; use crate::strategies::healthcheck::HealthCheck as SnubaHealthCheck; use crate::strategies::join_timeout::SetJoinTimeout; use crate::strategies::processor::{ - get_schema, make_rust_processor, make_rust_processor_row_binary, - make_rust_processor_with_replacements, validate_schema, + get_schema, make_rust_processor, make_rust_processor_with_replacements, validate_schema, }; use crate::strategies::python::PythonTransformStep; use crate::strategies::replacements::ProduceReplacements; -use crate::types::{BytesInsertBatch, CogsData, RowData, TypedInsertBatch}; +use crate::types::{BytesInsertBatch, CogsData, RowData}; pub struct ConsumerStrategyFactoryV2 { pub storage_config: config::StorageConfig, @@ -90,22 +88,31 @@ impl ProcessingStrategyFactory for ConsumerStrategyFactoryV2 { } fn create(&self) -> Box> { - if self.use_row_binary { - tracing::info!("Using row_binary pipeline"); - return match self + let (insert_format, process_fn_override, insert_columns): ( + InsertFormat, + Option, + Option<&'static [&'static str]>, + ) = if self.use_row_binary { + tracing::info!("Using RowBinary wire format"); + let processor_name = self .storage_config .message_processor .python_class_name - .as_str() - { - "EAPItemsProcessor" => self.create_row_binary_pipeline( + .as_str(); + let (func, columns): ( + crate::processors::ProcessingFunction, + &'static [&'static str], + ) = match processor_name { + "EAPItemsProcessor" => ( crate::processors::eap_items::process_message_row_binary, + crate::processors::eap_items::EAPItemRow::COLUMN_NAMES, ), name => panic!("RowBinary not supported for processor: {name}"), }; - } - - // ---- Existing JSONEachRow path (unchanged below this line) ---- + (InsertFormat::RowBinary, Some(func), Some(columns)) + } else { + (InsertFormat::JsonEachRow, None, None) + }; // Commit offsets let next_step = CommitOffsets::new(Duration::from_secs(1)); @@ -151,6 +158,8 @@ impl ProcessingStrategyFactory for ConsumerStrategyFactoryV2 { false, &self.clickhouse_concurrency, self.storage_config.name.clone(), + insert_format, + insert_columns, ); let accumulator = Arc::new( @@ -186,9 +195,16 @@ impl ProcessingStrategyFactory for ConsumerStrategyFactoryV2 { ) .flush_empty_batches(true); + // RowBinary can only be emitted by the Rust processor (the Python path + // always returns JSONEachRow bytes). If the storage opted into + // RowBinary, force the Rust processor branch — otherwise we'd POST + // JSON bytes under `FORMAT RowBinary` and ClickHouse would reject the + // batch. The previous early-return for RowBinary did this implicitly. + let use_rust_processor = self.use_rust_processor || self.use_row_binary; + // Transform messages let next_step = match ( - self.use_rust_processor, + use_rust_processor, processors::get_processing_function( &self.storage_config.message_processor.python_class_name, ), @@ -223,6 +239,10 @@ impl ProcessingStrategyFactory for ConsumerStrategyFactoryV2 { ) } (true, Some(processors::ProcessingFunctionType::ProcessingFunction(func))) => { + // For storages opted into RowBinary, swap the registered JSON + // processor for its RowBinary sibling. Same signature, same + // pipeline; only the encoding inside the processor differs. + let func = process_fn_override.unwrap_or(func); make_rust_processor( next_step, func, @@ -308,151 +328,6 @@ impl ProcessingStrategyFactory for ConsumerStrategyFactoryV2 { } } -impl ConsumerStrategyFactoryV2 { - fn create_row_binary_pipeline< - T: clickhouse::Row - + serde::Serialize - + Clone - + Send - + Sync - + crate::types::EstimatedSize - + 'static, - >( - &self, - func: fn( - KafkaPayload, - crate::types::KafkaMessageMetadata, - &config::ProcessorConfig, - ) -> anyhow::Result>, - ) -> Box> { - // Commit offsets - let next_step = CommitOffsets::new(Duration::from_secs(1)); - - let next_step: Box>> = - if let Some((ref producer, destination)) = self.commit_log_producer { - Box::new(ProduceCommitLog::new( - next_step, - producer.clone(), - destination, - self.physical_topic_name, - self.physical_consumer_group.clone(), - &self.commitlog_concurrency, - false, - )) - } else { - Box::new(next_step) - }; - - let cogs_label = get_cogs_label(&self.storage_config.message_processor.python_class_name); - - let next_step: Box>> = - match (self.env_config.record_cogs, cogs_label) { - (true, Some(resource_id)) => Box::new(RecordCogs::new( - next_step, - resource_id, - self.accountant_topic_config.broker_config.clone(), - &self.accountant_topic_config.physical_topic_name, - )), - _ => Box::new(next_step), - }; - - let next_step = SetJoinTimeout::new( - next_step, - Some(Duration::from_millis(self.join_timeout_ms.unwrap_or(0))), - ); - - let next_step = ClickhouseRowBinaryWriterStep::::new( - next_step, - self.storage_config.clickhouse_cluster.clone(), - self.storage_config.clickhouse_table_name.clone(), - &self.clickhouse_concurrency, - self.storage_config.name.clone(), - ); - - let accumulator = Arc::new( - |batch: BytesInsertBatch>, small_batch: Message>>| { - Ok(batch.merge(small_batch.into_payload())) - }, - ); - - type BatchSizeFn = fn(&BytesInsertBatch>) -> usize; - let compute_batch_size: BatchSizeFn = match self.max_batch_size_calculation { - config::BatchSizeCalculation::Bytes => |batch| batch.num_bytes(), - config::BatchSizeCalculation::Rows => |batch| batch.len(), - }; - - let next_step = Reduce::new( - next_step, - accumulator, - Arc::new(move || { - BytesInsertBatch::>::new( - Vec::new(), - None, - None, - None, - Default::default(), - CogsData::default(), - ) - }), - self.max_batch_size, - self.max_batch_time, - compute_batch_size, - ) - .flush_empty_batches(true); - - let next_step = make_rust_processor_row_binary( - next_step, - func, - &self.logical_topic_name, - self.enforce_schema, - &self.processing_concurrency, - config::ProcessorConfig { - env_config: self.env_config.clone(), - storage_name: self.storage_config.name.clone(), - }, - self.stop_at_timestamp, - ); - - let next_step = SetJoinTimeout::new( - next_step, - Some(Duration::from_millis(self.join_timeout_ms.unwrap_or(0))), - ); - - let next_step: Box> = - if let (Some(blq_producer_config), Some(blq_topic)) = - (&self.blq_producer_config, self.blq_topic) - { - tracing::info!( - "Routing stale messages to the backlog-queue topic {:?} \ - (thresholds configured via sentry-options)", - self.blq_topic, - ); - Box::new(BLQRouter::new( - next_step, - blq_producer_config.clone(), - blq_topic, - )) - } else { - tracing::info!("Not using a backlog-queue",); - Box::new(next_step) - }; - - if let Some(path) = &self.health_check_file { - if self.health_check == "snuba" { - tracing::info!( - "Using Snuba HealthCheck for consumer group: {}", - self.physical_consumer_group - ); - Box::new(SnubaHealthCheck::new(next_step, path)) - } else { - Box::new(HealthCheck::new(next_step, path)) - } - } else { - Box::new(next_step) - } - } -} - #[derive(Clone)] struct SchemaValidator { schema: Option>, diff --git a/rust_snuba/src/lib.rs b/rust_snuba/src/lib.rs index 3dc72ff750..7fd2bedde6 100644 --- a/rust_snuba/src/lib.rs +++ b/rust_snuba/src/lib.rs @@ -39,7 +39,7 @@ pub use metrics::statsd::StatsDBackend; pub use processors::{ProcessingFunction, ProcessingFunctionType, PROCESSORS}; pub use strategies::noop::Noop; pub use strategies::python::PythonTransformStep; -pub use types::{EstimatedSize, KafkaMessageMetadata}; +pub use types::KafkaMessageMetadata; #[cfg(test)] mod testutils; diff --git a/rust_snuba/src/processors/eap_items.rs b/rust_snuba/src/processors/eap_items.rs index e2f5f09aa4..727f98ed3b 100644 --- a/rust_snuba/src/processors/eap_items.rs +++ b/rust_snuba/src/processors/eap_items.rs @@ -19,10 +19,9 @@ use crate::processors::utils::{ record_invalid_timestamp_metric, }; use crate::runtime_config::get_str_config; +use crate::strategies::clickhouse::rowbinary; use crate::types::CogsData; -use crate::types::{ - item_type_name, InsertBatch, ItemTypeMetrics, KafkaMessageMetadata, TypedInsertBatch, -}; +use crate::types::{item_type_name, InsertBatch, ItemTypeMetrics, KafkaMessageMetadata}; /// Runtime config key prefix. Per-storage key /// `eap_items_dlq_grace_period_min:`: a non-negative integer @@ -202,20 +201,58 @@ pub fn process_message_row_binary( msg: KafkaPayload, _metadata: KafkaMessageMetadata, config: &ProcessorConfig, -) -> anyhow::Result> { +) -> anyhow::Result { let processed = process_eap_item(msg, config)?; if processed.should_skip { - return Ok(TypedInsertBatch::from_rows(vec![], None)); + return Ok(InsertBatch::skip()); } - let mut batch = TypedInsertBatch::from_rows( - vec![EAPItemRow::try_from(processed.eap_item)?], - processed.origin_timestamp, - ); - batch.item_type_metrics = Some(processed.item_type_metrics); + let row = EAPItemRow::try_from(processed.eap_item)?; + + // Encode the row to RowBinary bytes inline so the wide typed struct (~80 + // Vec<(String, _)> buckets) drops here instead of riding the pipeline to + // the writer step. The batch downstream sees only a compact Vec. + let mut encoded_rows = Vec::new(); + rowbinary::serialize_into(&mut encoded_rows, &row)?; + + let mut batch = InsertBatch::from_encoded_rows(encoded_rows, 1, processed.origin_timestamp); batch.cogs_data = Some(processed.cogs_data); + batch.item_type_metrics = Some(processed.item_type_metrics); Ok(batch) } +/// Test-only: returns the typed `EAPItemRow` (plus the metadata fields the +/// pipeline carries) without the bytes serialization step. Tests that +/// inspect individual columns use this; the production path goes through +/// `process_message_row_binary` and never holds the typed struct beyond +/// `process_eap_item`. +#[cfg(test)] +pub(crate) struct EAPItemRowBatch { + pub rows: Vec, + pub cogs_data: Option, + pub item_type_metrics: Option, +} + +#[cfg(test)] +pub(crate) fn process_message_row_binary_typed( + msg: KafkaPayload, + _metadata: KafkaMessageMetadata, + config: &ProcessorConfig, +) -> anyhow::Result { + let processed = process_eap_item(msg, config)?; + if processed.should_skip { + return Ok(EAPItemRowBatch { + rows: vec![], + cogs_data: None, + item_type_metrics: None, + }); + } + Ok(EAPItemRowBatch { + rows: vec![EAPItemRow::try_from(processed.eap_item)?], + cogs_data: Some(processed.cogs_data), + item_type_metrics: Some(processed.item_type_metrics), + }) +} + #[derive(Debug, Default, Serialize)] struct EAPItem { organization_id: u64, @@ -421,13 +458,13 @@ impl AttributeMap { } seq_attrs! { -#[derive(Debug, Clone, clickhouse::Row, Serialize)] +#[derive(Debug, Clone, Serialize)] pub struct EAPItemRow { organization_id: u64, project_id: u64, item_type: u8, timestamp: u32, - #[serde(with = "clickhouse::serde::uuid")] + #[serde(with = "crate::strategies::clickhouse::rowbinary::uuid")] trace_id: Uuid, item_id: u128, @@ -451,28 +488,44 @@ pub struct EAPItemRow { } } -fn vec_string_pair_size(v: &[(String, B)]) -> usize { - let heap: usize = v.iter().map(|(s, _)| s.len()).sum(); - std::mem::size_of_val(v) + heap -} - -fn vec_string_string_pair_size(v: &[(String, String)]) -> usize { - let heap: usize = v.iter().map(|(k, v)| k.len() + v.len()).sum(); - std::mem::size_of_val(v) + heap -} - seq_attrs! { -impl crate::types::EstimatedSize for EAPItemRow { - fn estimated_size(&self) -> usize { - std::mem::size_of::() - + vec_string_pair_size(&self.attributes_bool) - + vec_string_pair_size(&self.attributes_int) - + self.attributes_array.len() - #( - + vec_string_string_pair_size(&self.attributes_string_~N) - + vec_string_pair_size(&self.attributes_float_~N) - )* - } +impl EAPItemRow { + /// Column names in struct (= wire) order. We MUST pass this list to + /// ClickHouse on insert (`INSERT INTO t (col1, col2, ...) FORMAT RowBinary`) + /// because the on-disk column order in `eap_items_1_local` differs from + /// the struct order: + /// + /// * `client_sample_rate` and `server_sample_rate` were added with + /// identical `AFTER sampling_factor` in migration 0048, so the table + /// ends up with the pair reversed (server before client). + /// * The struct interleaves `attributes_string_N, attributes_float_N` for + /// each `N` (per the `seq_attrs!` expansion), while the initial table + /// put all `attributes_string_*` first, then all `attributes_float_*`. + /// + /// Without an explicit column list, ClickHouse falls back to the table's + /// positional order and misreads bytes (the integration test hits a + /// `CANNOT_READ_ALL_DATA` deep inside the maps section). + pub(crate) const COLUMN_NAMES: &'static [&'static str] = &[ + "organization_id", + "project_id", + "item_type", + "timestamp", + "trace_id", + "item_id", + "sampling_weight", + "sampling_factor", + "client_sample_rate", + "server_sample_rate", + "retention_days", + "downsampled_retention_days", + "attributes_bool", + "attributes_int", + #( + concat!("attributes_string_", stringify!(N)), + concat!("attributes_float_", stringify!(N)), + )* + "attributes_array", + ]; } } @@ -794,7 +847,7 @@ mod tests { ); // RowBinary path - let rb_batch = process_message_row_binary( + let rb_batch = process_message_row_binary_typed( KafkaPayload::new(None, None, Some(payload_bytes)), meta, &ProcessorConfig::default(), @@ -844,7 +897,7 @@ mod tests { .contains_key("sentry._internal.received_at")); // RowBinary path - let rb_batch = process_message_row_binary( + let rb_batch = process_message_row_binary_typed( KafkaPayload::new(None, None, Some(payload_bytes)), meta, &ProcessorConfig::default(), @@ -916,6 +969,30 @@ mod tests { assert!(!should_dlq_for_prior_partition(event_ts, now, 45)); } + /// The column list we ship with `INSERT INTO ... FORMAT RowBinary` must + /// match the struct's field order exactly — otherwise ClickHouse misreads + /// bytes (e.g., a `Map(String, String)` worth of data lands in a column + /// declared `Map(String, Float64)`). Lock the order down here so anyone + /// adding/reordering fields on `EAPItemRow` also updates this list. + #[test] + fn test_column_names_match_struct_layout() { + let names = EAPItemRow::COLUMN_NAMES; + // 12 scalars + attributes_bool + attributes_int + 80 buckets + attributes_array + assert_eq!(names.len(), 95); + assert_eq!(names[0], "organization_id"); + assert_eq!(names[7], "sampling_factor"); + assert_eq!(names[8], "client_sample_rate"); + assert_eq!(names[9], "server_sample_rate"); + // Bucket pairs are interleaved (string_N then float_N) per the + // `seq_attrs!` expansion on EAPItemRow. + assert_eq!(names[14], "attributes_string_0"); + assert_eq!(names[15], "attributes_float_0"); + assert_eq!(names[16], "attributes_string_1"); + assert_eq!(names[92], "attributes_string_39"); + assert_eq!(names[93], "attributes_float_39"); + assert_eq!(names[94], "attributes_array"); + } + #[test] fn test_should_not_dlq_event_at_boundary() { // event_ts == boundary belongs to the new week — must not DLQ. @@ -963,7 +1040,7 @@ mod tests { offset: 1, timestamp: DateTime::from(SystemTime::now()), }; - let batch = process_message_row_binary(payload, meta, &ProcessorConfig::default()) + let batch = process_message_row_binary_typed(payload, meta, &ProcessorConfig::default()) .expect("The message should be processed"); assert_eq!(batch.rows.len(), 1); @@ -991,7 +1068,7 @@ mod tests { offset: 1, timestamp: DateTime::from(SystemTime::now()), }; - let batch = process_message_row_binary(payload, meta, &ProcessorConfig::default()) + let batch = process_message_row_binary_typed(payload, meta, &ProcessorConfig::default()) .expect("The message should be processed"); let row = &batch.rows[0]; @@ -1013,7 +1090,7 @@ mod tests { offset: 1, timestamp: DateTime::from(SystemTime::now()), }; - let batch = process_message_row_binary(payload, meta, &ProcessorConfig::default()) + let batch = process_message_row_binary_typed(payload, meta, &ProcessorConfig::default()) .expect("The message should be processed"); let row = &batch.rows[0]; @@ -1037,7 +1114,7 @@ mod tests { timestamp: DateTime::from(SystemTime::now()), }; - let batch = process_message_row_binary(payload, meta, &ProcessorConfig::default()) + let batch = process_message_row_binary_typed(payload, meta, &ProcessorConfig::default()) .expect("The message should be processed"); assert!(batch.item_type_metrics.is_some()); @@ -1066,7 +1143,7 @@ mod tests { timestamp: DateTime::from(SystemTime::now()), }; - let batch = process_message_row_binary(payload, meta, &ProcessorConfig::default()) + let batch = process_message_row_binary_typed(payload, meta, &ProcessorConfig::default()) .expect("The message should be processed"); assert!(batch.cogs_data.is_some()); @@ -1112,7 +1189,7 @@ mod tests { timestamp: DateTime::from(SystemTime::now()), }; - let batch = process_message_row_binary(payload, meta, &ProcessorConfig::default()) + let batch = process_message_row_binary_typed(payload, meta, &ProcessorConfig::default()) .expect("The message should be processed"); let row = &batch.rows[0]; @@ -1151,7 +1228,7 @@ mod tests { timestamp: DateTime::from(SystemTime::now()), }; - let batch = process_message_row_binary(payload, meta, &ProcessorConfig::default()) + let batch = process_message_row_binary_typed(payload, meta, &ProcessorConfig::default()) .expect("The message should be processed"); let row = &batch.rows[0]; @@ -1229,8 +1306,9 @@ mod tests { // Process through RowBinary path let rb_payload = KafkaPayload::new(None, None, Some(payload_bytes)); - let rb_batch = process_message_row_binary(rb_payload, meta, &ProcessorConfig::default()) - .expect("RowBinary path should succeed"); + let rb_batch = + process_message_row_binary_typed(rb_payload, meta, &ProcessorConfig::default()) + .expect("RowBinary path should succeed"); // Parse the JSON output let json_str = @@ -1435,6 +1513,12 @@ mod tests { ); } + /// End-to-end test of the production RowBinary path against a live + /// ClickHouse instance: process_message_row_binary produces bytes via the + /// vendored serializer, those bytes are POSTed verbatim with + /// `FORMAT RowBinary`, and the row is read back via `FORMAT JSON`. This + /// is the cross-boundary check that our wire format matches what + /// ClickHouse expects — pure unit tests can't catch that. #[tokio::test] async fn test_row_binary_clickhouse_insert() { let host = std::env::var("CLICKHOUSE_HOST").unwrap_or("127.0.0.1".to_string()); @@ -1443,12 +1527,9 @@ mod tests { .parse() .unwrap(); let database = std::env::var("CLICKHOUSE_DATABASE").unwrap_or("default".to_string()); + let base_url = format!("http://{host}:{http_port}"); - let client = clickhouse::Client::default() - .with_url(format!("http://{host}:{http_port}")) - .with_database(&database) - .with_option("input_format_binary_read_json_as_string", "1") - .with_option("insert_deduplicate", "0"); + let http = reqwest::Client::new(); // Use a unique organization_id to avoid conflicts with other test data let unique_org_id: u64 = 999_999_000 + (rand::random::() % 1000) as u64; @@ -1495,56 +1576,107 @@ mod tests { timestamp: DateTime::from(SystemTime::now()), }; + // Production path: encodes the row to RowBinary bytes inside the processor. let batch = process_message_row_binary(payload, meta, &ProcessorConfig::default()) .expect("The message should be processed"); - - assert_eq!(batch.rows.len(), 1); - let row = batch.rows[0].clone(); - - // Insert via RowBinary (same code path as production) - let mut insert = client - .insert("eap_items_1_local") - .expect("Failed to create insert"); - insert.write(&row).await.expect("Failed to write row"); - insert.end().await.expect("Failed to end insert"); - - // Read it back using organization_id (primary key prefix) for reliable lookup - let count: u64 = client - .query(&format!( - "SELECT count() FROM eap_items_1_local WHERE organization_id = {unique_org_id}" - )) - .fetch_one() + assert_eq!(batch.rows.num_rows, 1); + + // Insert: POST the pre-encoded bytes with FORMAT RowBinary. We must + // pass the column list — the struct's wire order does NOT match the + // table's on-disk column order (see EAPItemRow::COLUMN_NAMES). + let insert_query = format!( + "INSERT INTO eap_items_1_local ({}) FORMAT RowBinary", + EAPItemRow::COLUMN_NAMES.join(", "), + ); + let insert_resp = http + .post(&base_url) + .header("X-ClickHouse-Database", &database) + .query(&[ + ("query", insert_query.as_str()), + ("input_format_binary_read_json_as_string", "1"), + ("insert_deduplicate", "0"), + ]) + .body(batch.rows.encoded_rows.clone()) + .send() .await - .expect("Failed to count rows"); + .expect("Insert request failed to send"); assert!( - count > 0, - "No rows found after insert for org_id={unique_org_id}" + insert_resp.status().is_success(), + "Insert failed: {}", + insert_resp.text().await.unwrap_or_default() ); - let result = client - .query(&format!( - "SELECT organization_id, project_id, item_type, sampling_weight \ - FROM eap_items_1_local \ - WHERE organization_id = {unique_org_id} \ - LIMIT 1" - )) - .fetch_one::<(u64, u64, u8, u64)>() - .await - .expect("Failed to read back inserted row"); - - assert_eq!(result.0, unique_org_id); // organization_id - assert_eq!(result.1, 1); // project_id - assert_eq!(result.2, TraceItemType::Span as u8); // item_type - assert_eq!(result.3, 1); // sampling_weight - - // Clean up - client - .query(&format!( - "ALTER TABLE eap_items_1_local DELETE WHERE organization_id = {unique_org_id}" - )) - .execute() + // Read it back via FORMAT JSON. We use organization_id (primary key prefix) + // for a deterministic lookup. ClickHouse's FORMAT JSON renders 64-bit + // ints as strings to avoid JS-precision loss, so we parse them back. + // + // GET (not POST) for the read-back: ClickHouse 25.x rejects bodyless + // POSTs with HTTP 411 because reqwest doesn't emit a Content-Length + // header by default when there's nothing to send. + let select_resp = http + .get(&base_url) + .header("X-ClickHouse-Database", &database) + .query(&[( + "query", + format!( + "SELECT organization_id, project_id, item_type, sampling_weight \ + FROM eap_items_1_local \ + WHERE organization_id = {unique_org_id} \ + LIMIT 1 FORMAT JSON" + ), + )]) + .send() .await - .ok(); + .expect("Select request failed to send"); + let select_status = select_resp.status(); + let body_text = select_resp.text().await.expect("response body"); + assert!( + select_status.is_success(), + "Select failed: status={select_status}, body={body_text}" + ); + let body: serde_json::Value = serde_json::from_str(&body_text).expect("JSON response"); + let data = body["data"].as_array().expect("data array"); + assert_eq!(data.len(), 1, "no rows found for org_id={unique_org_id}"); + let row = &data[0]; + assert_eq!( + row["organization_id"] + .as_str() + .unwrap() + .parse::() + .unwrap(), + unique_org_id + ); + assert_eq!( + row["project_id"].as_str().unwrap().parse::().unwrap(), + 1 + ); + assert_eq!( + row["item_type"].as_u64().unwrap() as u8, + TraceItemType::Span as u8 + ); + assert_eq!( + row["sampling_weight"] + .as_str() + .unwrap() + .parse::() + .unwrap(), + 1 + ); + + // Clean up. POST with an empty body (rather than no body) so reqwest + // emits Content-Length: 0 and ClickHouse 25.x doesn't reject with 411. + let _ = http + .post(&base_url) + .header("X-ClickHouse-Database", &database) + .query(&[( + "query", + format!( + "ALTER TABLE eap_items_1_local DELETE WHERE organization_id = {unique_org_id}" + ), + )]) + .body("") + .send() + .await; } #[test] diff --git a/rust_snuba/src/strategies/clickhouse/mod.rs b/rust_snuba/src/strategies/clickhouse/mod.rs index 29f0fd60ce..45a3d85276 100644 --- a/rust_snuba/src/strategies/clickhouse/mod.rs +++ b/rust_snuba/src/strategies/clickhouse/mod.rs @@ -1,2 +1,2 @@ -pub mod row_binary_writer; +pub mod rowbinary; pub mod writer_v2; diff --git a/rust_snuba/src/strategies/clickhouse/row_binary_writer.rs b/rust_snuba/src/strategies/clickhouse/row_binary_writer.rs deleted file mode 100644 index 35c00b32c3..0000000000 --- a/rust_snuba/src/strategies/clickhouse/row_binary_writer.rs +++ /dev/null @@ -1,211 +0,0 @@ -use std::time::{Duration, SystemTime}; - -use sentry_arroyo::processing::strategies::run_task_in_threads::{ - ConcurrencyConfig, RunTaskError, RunTaskFunc, RunTaskInThreads, TaskRunner, -}; -use sentry_arroyo::processing::strategies::{ - CommitRequest, ProcessingStrategy, StrategyError, SubmitError, -}; -use sentry_arroyo::types::Message; -use sentry_arroyo::{counter, timer}; - -use crate::config::ClickhouseConfig; -use crate::runtime_config::get_load_balancing_config; -use crate::types::BytesInsertBatch; - -struct RowBinaryTaskRunner { - client: clickhouse::Client, - storage_name: String, - table: String, - _phantom: std::marker::PhantomData, -} - -impl Clone for RowBinaryTaskRunner { - fn clone(&self) -> Self { - Self { - client: self.client.clone(), - storage_name: self.storage_name.clone(), - table: self.table.clone(), - _phantom: std::marker::PhantomData, - } - } -} - -impl RowBinaryTaskRunner -where - // Send + Sync + 'static required because T is used in async tasks dispatched - // across threads via RunTaskInThreads. - T: clickhouse::Row + serde::Serialize + Send + Sync + 'static, -{ - fn new(config: &ClickhouseConfig, table: String, storage_name: String) -> Self { - let scheme = if config.secure { "https" } else { "http" }; - let client = clickhouse::Client::default() - .with_url(format!("{}://{}:{}", scheme, config.host, config.http_port)) - .with_user(&config.user) - .with_password(&config.password) - .with_database(&config.database) - // Wait for data to be written to all shards before returning success. - // Without this, inserts return immediately after writing to the local - // node and data is forwarded asynchronously, risking data loss on failure. - .with_option("insert_distributed_sync", "1") - .with_option("input_format_binary_read_json_as_string", "1"); - - Self { - client, - storage_name, - table, - _phantom: std::marker::PhantomData, - } - } -} - -const INITIAL_BACKOFF_MS: f64 = 500.0; -const MAX_RETRIES: usize = 4; -const JITTER_FACTOR: f64 = 0.2; - -impl TaskRunner>, BytesInsertBatch<()>, anyhow::Error> - for RowBinaryTaskRunner -where - T: clickhouse::Row + serde::Serialize + Send + Sync + 'static, -{ - fn get_task( - &self, - message: Message>>, - ) -> RunTaskFunc, anyhow::Error> { - let client = self.client.clone(); - let table = self.table.clone(); - let lb_config = get_load_balancing_config(&self.storage_name); - - Box::pin(async move { - let (empty_message, insert_batch) = message.take(); - let batch_len = insert_batch.len(); - let num_bytes = insert_batch.num_bytes(); - let (rows, empty_batch) = insert_batch.take(); - - let write_start = SystemTime::now(); - - if rows.is_empty() { - tracing::debug!("skipping write of empty payload ({} rows)", batch_len); - } else { - tracing::debug!("performing row binary write"); - - for attempt in 0..=MAX_RETRIES { - match write_rows(&client, &table, &rows, &lb_config).await { - Ok(()) => break, - Err(e) => { - if attempt == MAX_RETRIES { - counter!("rust_consumer.clickhouse_insert_error", 1, "status" => "row_binary_error", "retried" => "false"); - return Err(RunTaskError::Other(anyhow::anyhow!( - "error writing to clickhouse via RowBinary after {} attempts: {}", - MAX_RETRIES + 1, - e - ))); - } - counter!("rust_consumer.clickhouse_insert_error", 1, "status" => "row_binary_error", "retried" => "true"); - tracing::warn!( - "ClickHouse RowBinary write failed (attempt {}/{}): {}", - attempt + 1, - MAX_RETRIES + 1, - e - ); - - let backoff_ms = - INITIAL_BACKOFF_MS * (2_u64.pow(attempt as u32) as f64); - let jitter = - rand::random::() * JITTER_FACTOR - JITTER_FACTOR / 2.0; - let delay = - Duration::from_millis((backoff_ms * (1.0 + jitter)).round() as u64); - tokio::time::sleep(delay).await; - } - } - } - - tracing::info!("Inserted {} rows via RowBinary", batch_len); - let write_finish = SystemTime::now(); - if let Ok(elapsed) = write_finish.duration_since(write_start) { - timer!("insertions.batch_write_ms", elapsed); - } - } - - counter!("insertions.batch_write_bytes", num_bytes as i64); - counter!("insertions.batch_write_msgs", batch_len as i64); - empty_batch.record_message_latency(); - empty_batch.emit_item_type_metrics(); - - Ok(empty_message.replace(empty_batch)) - }) - } -} - -async fn write_rows( - client: &clickhouse::Client, - table: &str, - rows: &[T], - lb_config: &crate::runtime_config::LoadBalancingConfig, -) -> Result<(), clickhouse::error::Error> { - let mut insert = client - .insert(table)? - .with_option("load_balancing", &lb_config.load_balancing); - if let Some(offset) = &lb_config.first_offset { - insert = insert.with_option("load_balancing_first_offset", offset); - } - for row in rows { - insert.write(row).await?; - } - insert.end().await?; - Ok(()) -} - -pub struct ClickhouseRowBinaryWriterStep { - inner: RunTaskInThreads>, BytesInsertBatch<()>, anyhow::Error, N>, -} - -impl ClickhouseRowBinaryWriterStep -where - T: clickhouse::Row + serde::Serialize + Send + Sync + 'static, - N: ProcessingStrategy> + 'static, -{ - pub fn new( - next_step: N, - cluster_config: ClickhouseConfig, - table: String, - concurrency: &ConcurrencyConfig, - storage_name: String, - ) -> Self { - let task_runner = RowBinaryTaskRunner::::new(&cluster_config, table, storage_name); - - let inner = RunTaskInThreads::new( - next_step, - task_runner, - concurrency, - Some("clickhouse_row_binary"), - ); - - ClickhouseRowBinaryWriterStep { inner } - } -} - -impl ProcessingStrategy>> for ClickhouseRowBinaryWriterStep -where - T: clickhouse::Row + serde::Serialize + Send + Sync + 'static, - N: ProcessingStrategy>, -{ - fn poll(&mut self) -> Result, StrategyError> { - self.inner.poll() - } - - fn submit( - &mut self, - message: Message>>, - ) -> Result<(), SubmitError>>> { - self.inner.submit(message) - } - - fn terminate(&mut self) { - self.inner.terminate(); - } - - fn join(&mut self, timeout: Option) -> Result, StrategyError> { - self.inner.join(timeout) - } -} diff --git a/rust_snuba/src/strategies/clickhouse/rowbinary/mod.rs b/rust_snuba/src/strategies/clickhouse/rowbinary/mod.rs new file mode 100644 index 0000000000..880bf3aafd --- /dev/null +++ b/rust_snuba/src/strategies/clickhouse/rowbinary/mod.rs @@ -0,0 +1,36 @@ +//! Minimal RowBinary serializer for inserts into ClickHouse. +//! +//! Vendored from the `clickhouse` crate (Apache-2.0 / MIT) and reduced to the +//! shape `rust_snuba` actually emits: fixed-width integers, f32/f64, bool, str, +//! raw byte slices (UUID / FixedString), length-prefixed sequences, tuples and +//! structs. We do this in-process so the typed rows can be serialized to bytes +//! inside the processor and dropped immediately, instead of riding the pipeline +//! all the way to the writer step. + +mod ser; + +// `Error` is part of the module's public surface (it's the error type of +// `serialize_into`). It isn't referenced by name anywhere in the crate today +// — callers propagate via `?` into `anyhow::Error` — but keep the re-export +// so consumers can match on the variant if they ever need to. +#[allow(unused_imports)] +pub use ser::{serialize_into, Error}; + +pub mod uuid { + //! Serde adapter for `#[serde(with = "...")]` on `Uuid` fields. ClickHouse + //! stores UUID as two little-endian u64 halves, so the canonical 16-byte + //! UUID has each half reversed before being written. + + use serde::Serializer; + use uuid::Uuid; + + pub fn serialize(uuid: &Uuid, serializer: S) -> Result + where + S: Serializer, + { + let mut bytes = *uuid.as_bytes(); + bytes[..8].reverse(); + bytes[8..].reverse(); + serializer.serialize_bytes(&bytes) + } +} diff --git a/rust_snuba/src/strategies/clickhouse/rowbinary/ser.rs b/rust_snuba/src/strategies/clickhouse/rowbinary/ser.rs new file mode 100644 index 0000000000..e17fab74d7 --- /dev/null +++ b/rust_snuba/src/strategies/clickhouse/rowbinary/ser.rs @@ -0,0 +1,418 @@ +use std::fmt; + +use serde::ser::{ + self, Impossible, Serialize, SerializeMap, SerializeSeq, SerializeStruct, SerializeTuple, + SerializeTupleStruct, +}; + +/// Append the RowBinary encoding of `value` to `buf`. +pub fn serialize_into(buf: &mut Vec, value: &T) -> Result<(), Error> +where + T: Serialize + ?Sized, +{ + let mut serializer = Serializer { buf }; + value.serialize(&mut serializer) +} + +#[derive(Debug)] +pub enum Error { + Message(String), + /// Sequences and maps must report a known length so we can emit the + /// LEB128 length prefix up front. + SequenceLengthRequired, + Unsupported(&'static str), +} + +impl fmt::Display for Error { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Error::Message(s) => f.write_str(s), + Error::SequenceLengthRequired => { + f.write_str("rowbinary: sequences and maps must have a known length") + } + Error::Unsupported(what) => write!(f, "rowbinary: unsupported serde type: {what}"), + } + } +} + +impl std::error::Error for Error {} + +impl ser::Error for Error { + fn custom(msg: T) -> Self { + Error::Message(msg.to_string()) + } +} + +struct Serializer<'a> { + buf: &'a mut Vec, +} + +fn write_uvarint(buf: &mut Vec, mut v: u64) { + while v >= 0x80 { + buf.push((v as u8) | 0x80); + v >>= 7; + } + buf.push(v as u8); +} + +macro_rules! impl_le { + ($($name:ident => $ty:ty),+ $(,)?) => {$( + #[inline] + fn $name(self, v: $ty) -> Result<(), Error> { + self.buf.extend_from_slice(&v.to_le_bytes()); + Ok(()) + } + )+}; +} + +impl<'a, 'b> ser::Serializer for &'a mut Serializer<'b> { + type Ok = (); + type Error = Error; + + type SerializeSeq = Self; + type SerializeTuple = Self; + type SerializeTupleStruct = Self; + type SerializeMap = Self; + type SerializeStruct = Self; + type SerializeTupleVariant = Impossible<(), Error>; + type SerializeStructVariant = Impossible<(), Error>; + + #[inline] + fn serialize_bool(self, v: bool) -> Result<(), Error> { + self.buf.push(v as u8); + Ok(()) + } + + #[inline] + fn serialize_u8(self, v: u8) -> Result<(), Error> { + self.buf.push(v); + Ok(()) + } + + #[inline] + fn serialize_i8(self, v: i8) -> Result<(), Error> { + self.buf.push(v as u8); + Ok(()) + } + + impl_le! { + serialize_i16 => i16, + serialize_i32 => i32, + serialize_i64 => i64, + serialize_i128 => i128, + serialize_u16 => u16, + serialize_u32 => u32, + serialize_u64 => u64, + serialize_u128 => u128, + serialize_f32 => f32, + serialize_f64 => f64, + } + + fn serialize_char(self, v: char) -> Result<(), Error> { + let mut tmp = [0u8; 4]; + self.serialize_str(v.encode_utf8(&mut tmp)) + } + + fn serialize_str(self, v: &str) -> Result<(), Error> { + write_uvarint(self.buf, v.len() as u64); + self.buf.extend_from_slice(v.as_bytes()); + Ok(()) + } + + /// Raw byte write — no length prefix. Matches RowBinary's fixed-width + /// encoding for UUID and FixedString(N). Variable-length byte sequences + /// should be modeled as `Vec` and hit `serialize_seq`. + fn serialize_bytes(self, v: &[u8]) -> Result<(), Error> { + self.buf.extend_from_slice(v); + Ok(()) + } + + fn serialize_none(self) -> Result<(), Error> { + self.buf.push(1); + Ok(()) + } + + fn serialize_some(self, value: &T) -> Result<(), Error> + where + T: ?Sized + Serialize, + { + self.buf.push(0); + value.serialize(self) + } + + fn serialize_unit(self) -> Result<(), Error> { + Ok(()) + } + + fn serialize_unit_struct(self, _name: &'static str) -> Result<(), Error> { + Ok(()) + } + + fn serialize_unit_variant( + self, + _name: &'static str, + _index: u32, + _variant: &'static str, + ) -> Result<(), Error> { + Err(Error::Unsupported("enum variant")) + } + + fn serialize_newtype_struct(self, _name: &'static str, value: &T) -> Result<(), Error> + where + T: ?Sized + Serialize, + { + value.serialize(self) + } + + fn serialize_newtype_variant( + self, + _name: &'static str, + _index: u32, + _variant: &'static str, + _value: &T, + ) -> Result<(), Error> + where + T: ?Sized + Serialize, + { + Err(Error::Unsupported("enum variant")) + } + + fn serialize_seq(self, len: Option) -> Result { + let len = len.ok_or(Error::SequenceLengthRequired)?; + write_uvarint(self.buf, len as u64); + Ok(self) + } + + fn serialize_tuple(self, _len: usize) -> Result { + Ok(self) + } + + fn serialize_tuple_struct( + self, + _name: &'static str, + _len: usize, + ) -> Result { + Ok(self) + } + + fn serialize_tuple_variant( + self, + _name: &'static str, + _index: u32, + _variant: &'static str, + _len: usize, + ) -> Result { + Err(Error::Unsupported("enum variant")) + } + + fn serialize_map(self, len: Option) -> Result { + let len = len.ok_or(Error::SequenceLengthRequired)?; + write_uvarint(self.buf, len as u64); + Ok(self) + } + + fn serialize_struct( + self, + _name: &'static str, + _len: usize, + ) -> Result { + Ok(self) + } + + fn serialize_struct_variant( + self, + _name: &'static str, + _index: u32, + _variant: &'static str, + _len: usize, + ) -> Result { + Err(Error::Unsupported("enum variant")) + } +} + +impl<'a, 'b> SerializeSeq for &'a mut Serializer<'b> { + type Ok = (); + type Error = Error; + fn serialize_element(&mut self, value: &T) -> Result<(), Error> + where + T: ?Sized + Serialize, + { + value.serialize(&mut **self) + } + fn end(self) -> Result<(), Error> { + Ok(()) + } +} + +impl<'a, 'b> SerializeTuple for &'a mut Serializer<'b> { + type Ok = (); + type Error = Error; + fn serialize_element(&mut self, value: &T) -> Result<(), Error> + where + T: ?Sized + Serialize, + { + value.serialize(&mut **self) + } + fn end(self) -> Result<(), Error> { + Ok(()) + } +} + +impl<'a, 'b> SerializeTupleStruct for &'a mut Serializer<'b> { + type Ok = (); + type Error = Error; + fn serialize_field(&mut self, value: &T) -> Result<(), Error> + where + T: ?Sized + Serialize, + { + value.serialize(&mut **self) + } + fn end(self) -> Result<(), Error> { + Ok(()) + } +} + +impl<'a, 'b> SerializeMap for &'a mut Serializer<'b> { + type Ok = (); + type Error = Error; + fn serialize_key(&mut self, key: &T) -> Result<(), Error> + where + T: ?Sized + Serialize, + { + key.serialize(&mut **self) + } + fn serialize_value(&mut self, value: &T) -> Result<(), Error> + where + T: ?Sized + Serialize, + { + value.serialize(&mut **self) + } + fn end(self) -> Result<(), Error> { + Ok(()) + } +} + +impl<'a, 'b> SerializeStruct for &'a mut Serializer<'b> { + type Ok = (); + type Error = Error; + fn serialize_field(&mut self, _key: &'static str, value: &T) -> Result<(), Error> + where + T: ?Sized + Serialize, + { + value.serialize(&mut **self) + } + fn end(self) -> Result<(), Error> { + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use serde::Serialize; + + fn encode(v: &T) -> Vec { + let mut buf = Vec::new(); + serialize_into(&mut buf, v).unwrap(); + buf + } + + #[test] + fn primitives_are_little_endian() { + assert_eq!(encode(&0x01u8), vec![0x01]); + assert_eq!(encode(&0x0102u16), vec![0x02, 0x01]); + assert_eq!(encode(&0x01020304u32), vec![0x04, 0x03, 0x02, 0x01]); + assert_eq!( + encode(&0x0102030405060708u64), + vec![0x08, 0x07, 0x06, 0x05, 0x04, 0x03, 0x02, 0x01], + ); + assert_eq!(encode(&true), vec![1]); + assert_eq!(encode(&false), vec![0]); + } + + #[test] + fn strings_get_uvarint_length_prefix() { + assert_eq!(encode(&""), vec![0]); + assert_eq!(encode(&"abc"), vec![3, b'a', b'b', b'c']); + // 130-char string crosses the single-byte LEB128 boundary (0x80). + let s = "x".repeat(130); + let out = encode(&s); + assert_eq!(&out[..2], &[0x82, 0x01]); // 130 = 0b1_0000010 → 0x82 0x01 + assert_eq!(out.len(), 132); + } + + #[test] + fn bytes_are_raw_no_prefix() { + // serialize_bytes is the FixedString path — no length prefix. + // (We use this for UUIDs.) + struct Raw(Vec); + impl Serialize for Raw { + fn serialize(&self, s: S) -> Result { + s.serialize_bytes(&self.0) + } + } + let out = encode(&Raw(vec![0xaa, 0xbb, 0xcc])); + assert_eq!(out, vec![0xaa, 0xbb, 0xcc]); + } + + #[test] + fn sequences_prefix_then_elements() { + let v: Vec = vec![1, 2, 3]; + // 3 elements, then each u16 LE. + assert_eq!(encode(&v), vec![3, 1, 0, 2, 0, 3, 0]); + } + + #[test] + fn map_like_vec_of_pairs_matches_clickhouse_map_shape() { + // Map(String, UInt8) is wire-equivalent to Array(Tuple(String, UInt8)): + // uvarint length, then concatenated (string-prefix, key, u8) tuples. + let v: Vec<(String, u8)> = vec![("a".into(), 7), ("bb".into(), 9)]; + assert_eq!( + encode(&v), + vec![ + 2, // len + 1, b'a', 7, // ("a", 7) + 2, b'b', b'b', 9, // ("bb", 9) + ], + ); + } + + #[test] + fn uuid_adapter_emits_16_bytes_with_halves_reversed() { + use uuid::Uuid; + // 11223344-5566-7788-99aa-bbccddeeff00 + let id = Uuid::from_bytes([ + 0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77, 0x88, 0x99, 0xaa, 0xbb, 0xcc, 0xdd, 0xee, + 0xff, 0x00, + ]); + #[derive(Serialize)] + struct W { + #[serde(with = "super::super::uuid")] + id: Uuid, + } + let out = encode(&W { id }); + assert_eq!( + out, + vec![ + 0x88, 0x77, 0x66, 0x55, 0x44, 0x33, 0x22, 0x11, // first half reversed + 0x00, 0xff, 0xee, 0xdd, 0xcc, 0xbb, 0xaa, 0x99, // second half reversed + ], + ); + } + + #[test] + fn struct_fields_serialize_in_declaration_order() { + #[derive(Serialize)] + struct S { + a: u8, + b: u16, + c: bool, + } + let out = encode(&S { + a: 1, + b: 2, + c: true, + }); + assert_eq!(out, vec![1, 2, 0, 1]); + } +} diff --git a/rust_snuba/src/strategies/clickhouse/writer_v2.rs b/rust_snuba/src/strategies/clickhouse/writer_v2.rs index 3f48d4e97c..14cd0e0055 100644 --- a/rust_snuba/src/strategies/clickhouse/writer_v2.rs +++ b/rust_snuba/src/strategies/clickhouse/writer_v2.rs @@ -74,10 +74,34 @@ pub struct ClickhouseWriterStep { inner: RunTaskInThreads, BytesInsertBatch<()>, anyhow::Error, N>, } +/// Wire format the writer step posts to ClickHouse with. Picking the right +/// value here is the only thing the consumer needs to know — the pipeline +/// shape, batching, and retry behavior are identical across formats. +#[derive(Clone, Copy, Debug)] +pub enum InsertFormat { + JsonEachRow, + RowBinary, +} + +impl InsertFormat { + fn as_str(self) -> &'static str { + match self { + InsertFormat::JsonEachRow => "JSONEachRow", + InsertFormat::RowBinary => "RowBinary", + } + } +} + impl ClickhouseWriterStep where N: ProcessingStrategy> + 'static, { + /// `columns`: if `Some`, expand into the SQL as + /// `INSERT INTO {table} (col1, col2, ...) FORMAT ...`. Required for + /// `RowBinary`, which would otherwise fall back to the table's positional + /// column order — a footgun whenever wire order and table order diverge. + /// For `JSONEachRow`, pass `None` to preserve historical behavior. + #[allow(clippy::too_many_arguments)] pub fn new( next_step: N, cluster_config: ClickhouseConfig, @@ -85,6 +109,8 @@ where skip_write: bool, concurrency: &ConcurrencyConfig, storage_name: String, + format: InsertFormat, + columns: Option<&'static [&'static str]>, ) -> Self { let inner = RunTaskInThreads::new( next_step, @@ -93,6 +119,8 @@ where &cluster_config.clone(), &table, storage_name, + format, + columns, )), skip_write, ), @@ -154,7 +182,13 @@ pub struct ClickhouseClient { } impl ClickhouseClient { - pub fn new(config: &ClickhouseConfig, table: &str, storage_name: String) -> ClickhouseClient { + pub fn new( + config: &ClickhouseConfig, + table: &str, + storage_name: String, + format: InsertFormat, + columns: Option<&[&str]>, + ) -> ClickhouseClient { let mut headers = HeaderMap::with_capacity(6); headers.insert(CONNECTION, HeaderValue::from_static("keep-alive")); headers.insert(ACCEPT_ENCODING, HeaderValue::from_static("gzip,deflate")); @@ -175,8 +209,20 @@ impl ClickhouseClient { let host = &config.host; let port = &config.http_port; - let base_url = format!("{scheme}://{host}:{port}?insert_distributed_sync=1"); - let query = format!("INSERT INTO {table} FORMAT JSONEachRow"); + let mut base_url = format!("{scheme}://{host}:{port}?insert_distributed_sync=1"); + if matches!(format, InsertFormat::RowBinary) { + // RowBinary cannot represent JSON values natively; tell ClickHouse + // to treat any binary string targeting a JSON column as JSON text. + base_url.push_str("&input_format_binary_read_json_as_string=1"); + } + let columns_clause = match columns { + Some(cols) => format!(" ({})", cols.join(", ")), + None => String::new(), + }; + let query = format!( + "INSERT INTO {table}{columns_clause} FORMAT {fmt}", + fmt = format.as_str(), + ); ClickhouseClient { client: Client::new(), @@ -322,7 +368,13 @@ mod tests { crate::testutils::initialize_python(); let config = make_test_config(); println!("config: {config:?}"); - let client = ClickhouseClient::new(&config, "querylog_local", "test_storage".to_string()); + let client = ClickhouseClient::new( + &config, + "querylog_local", + "test_storage".to_string(), + InsertFormat::JsonEachRow, + None, + ); let url = client.build_url(); assert!(url.contains("load_balancing=in_order")); @@ -337,7 +389,13 @@ mod tests { fn test_url_with_runtime_config_override() { crate::testutils::initialize_python(); let config = make_test_config(); - let client = ClickhouseClient::new(&config, "test_table", "writer_v2_lb_test".to_string()); + let client = ClickhouseClient::new( + &config, + "test_table", + "writer_v2_lb_test".to_string(), + InsertFormat::JsonEachRow, + None, + ); // Default: in_order let url = client.build_url(); @@ -367,6 +425,8 @@ mod tests { &config, "test_table", "writer_v2_block_size_test".to_string(), + InsertFormat::JsonEachRow, + None, ); // Default (key absent): no suffix. @@ -382,8 +442,13 @@ mod tests { assert!(url.contains("&max_insert_block_size=2000000")); // A different storage isn't affected. - let other_client = - ClickhouseClient::new(&config, "test_table", "writer_v2_other_storage".to_string()); + let other_client = ClickhouseClient::new( + &config, + "test_table", + "writer_v2_other_storage".to_string(), + InsertFormat::JsonEachRow, + None, + ); let url = other_client.build_url(); assert!(!url.contains("max_insert_block_size")); @@ -419,7 +484,13 @@ mod tests { database: "default".to_string(), }; - let client = ClickhouseClient::new(&config, "test_table", "test_storage".to_string()); + let client = ClickhouseClient::new( + &config, + "test_table", + "test_storage".to_string(), + InsertFormat::JsonEachRow, + None, + ); let start_time = Instant::now(); let result = client diff --git a/rust_snuba/src/strategies/processor.rs b/rust_snuba/src/strategies/processor.rs index d3f055912c..29059e08c3 100644 --- a/rust_snuba/src/strategies/processor.rs +++ b/rust_snuba/src/strategies/processor.rs @@ -16,8 +16,8 @@ use sentry_kafka_schemas::{Schema, SchemaError, SchemaType}; use crate::config::ProcessorConfig; use crate::processors::{ProcessingFunction, ProcessingFunctionWithReplacements}; use crate::types::{ - BytesInsertBatch, CommitLogEntry, CommitLogOffsets, EstimatedSize, InsertBatch, - InsertOrReplacement, KafkaMessageMetadata, RowData, TypedInsertBatch, + BytesInsertBatch, CommitLogEntry, CommitLogOffsets, InsertBatch, InsertOrReplacement, + KafkaMessageMetadata, RowData, }; use tokio::time::Instant; @@ -179,86 +179,6 @@ pub fn make_rust_processor_with_replacements( )) } -pub fn make_rust_processor_row_binary( - next_step: impl ProcessingStrategy>> + 'static, - func: fn( - KafkaPayload, - KafkaMessageMetadata, - &ProcessorConfig, - ) -> anyhow::Result>, - schema_name: &str, - enforce_schema: bool, - concurrency: &ConcurrencyConfig, - processor_config: ProcessorConfig, - stop_at_timestamp: Option, -) -> Box> { - let schema = get_schema(schema_name, enforce_schema); - - fn result_to_next_msg( - transformed: TypedInsertBatch, - partition: Partition, - offset: u64, - timestamp: DateTime, - stop_at_timestamp: Option, - ) -> anyhow::Result>>> { - // If a stop timestamp is set (used for backfills / replays), skip - // processing messages whose timestamp exceeds the cutoff by returning - // an empty batch that still commits the offset. - if let Some(stop) = stop_at_timestamp { - if stop < timestamp.timestamp() { - let payload = BytesInsertBatch::from_rows(Vec::new()); - return Ok(Message::new_broker_message( - payload, partition, offset, timestamp, - )); - } - } - - let num_bytes: usize = transformed.rows.iter().map(|r| r.estimated_size()).sum(); - let mut payload = BytesInsertBatch::from_rows(transformed.rows) - .with_num_bytes(num_bytes) - .with_message_timestamp(timestamp) - .with_commit_log_offsets(CommitLogOffsets(BTreeMap::from([( - partition.index, - CommitLogEntry { - offset: offset + 1, - orig_message_ts: timestamp, - received_p99: transformed.origin_timestamp.into_iter().collect(), - }, - )]))) - .with_cogs_data(transformed.cogs_data.unwrap_or_default()); - - if let Some(ts) = transformed.origin_timestamp { - payload = payload.with_origin_timestamp(ts); - } - if let Some(ts) = transformed.sentry_received_timestamp { - payload = payload.with_sentry_received_timestamp(ts); - } - if let Some(metrics) = transformed.item_type_metrics { - payload = payload.with_item_type_metrics(metrics); - } - - Ok(Message::new_broker_message( - payload, partition, offset, timestamp, - )) - } - - let task_runner = MessageProcessor { - schema, - enforce_schema, - func, - result_to_next_msg: result_to_next_msg::, - processor_config, - stop_at_timestamp, - }; - - Box::new(RunTaskInThreads::new( - next_step, - task_runner, - concurrency, - Some("process_message"), - )) -} - pub fn get_schema(schema_name: &str, enforce_schema: bool) -> Option> { match sentry_kafka_schemas::get_schema(schema_name, None) { Ok(s) => Some(Arc::new(s)), diff --git a/rust_snuba/src/types.rs b/rust_snuba/src/types.rs index 38c4893f44..b84b0e5fe7 100644 --- a/rust_snuba/src/types.rs +++ b/rust_snuba/src/types.rs @@ -9,12 +9,6 @@ use sentry_arroyo::timer; use sentry_protos::snuba::v1::TraceItemType; use serde::{Deserialize, Serialize}; -/// Trait for row types that can estimate their in-memory byte size. -/// Used by byte-based batch size calculation in the Reduce step. -pub trait EstimatedSize { - fn estimated_size(&self) -> usize; -} - #[derive(Clone, Debug, PartialEq)] pub struct CommitLogEntry { pub offset: u64, @@ -226,6 +220,27 @@ impl InsertBatch { }) } + /// Constructor for processors that have already serialized their rows into + /// the wire format (e.g. the RowBinary path encodes each `EAPItemRow` + /// inside the processor instead of carrying the typed struct downstream). + /// `num_rows` is the count those bytes represent. + pub fn from_encoded_rows( + encoded_rows: Vec, + num_rows: usize, + origin_timestamp: Option>, + ) -> Self { + Self { + rows: RowData { + encoded_rows, + num_rows, + }, + origin_timestamp, + sentry_received_timestamp: None, + cogs_data: None, + item_type_metrics: None, + } + } + /// In case the processing function wants to skip the message, we return an empty batch. /// But instead of having the caller send an empty batch, lets make an explicit api for /// skipping. This way we can change the implementation later if we want to. Skipping ensures @@ -456,48 +471,6 @@ impl BytesInsertBatch { } } -impl BytesInsertBatch> { - pub fn len(&self) -> usize { - self.rows.len() - } - - pub fn merge(mut self, other: Self) -> Self { - self.rows.extend(other.rows); - self.num_bytes += other.num_bytes; - self.commit_log_offsets.merge(other.commit_log_offsets); - self.message_timestamp.merge(other.message_timestamp); - self.origin_timestamp.merge(other.origin_timestamp); - self.sentry_received_timestamp - .merge(other.sentry_received_timestamp); - self.cogs_data.merge(other.cogs_data); - self.item_type_metrics.merge(other.item_type_metrics); - self - } -} - -/// The return value of message processors that produce typed rows for RowBinary insertion. -/// A single Kafka message may produce multiple rows, hence Vec. -#[derive(Clone, Debug)] -pub struct TypedInsertBatch { - pub rows: Vec, - pub origin_timestamp: Option>, - pub sentry_received_timestamp: Option>, - pub cogs_data: Option, - pub item_type_metrics: Option, -} - -impl TypedInsertBatch { - pub fn from_rows(rows: Vec, origin_timestamp: Option>) -> Self { - Self { - rows, - origin_timestamp, - sentry_received_timestamp: None, - cogs_data: None, - item_type_metrics: None, - } - } -} - #[derive(Clone, Debug, Deserialize, Serialize, Default, PartialEq)] pub struct RowData { pub encoded_rows: Vec,