From 5c262e103379bfc03850f6af8e9c05fb9ec66d35 Mon Sep 17 00:00:00 2001 From: Pierre Massat Date: Fri, 29 May 2026 12:17:30 -0700 Subject: [PATCH 1/7] perf(eap-items): Encode RowBinary inserts in the processor The RowBinary consumer kept Vec<EAPItemRow> alive in every batch all the way from the processor to the writer. Each row has ~80 attribute-bucket Vecs plus per-bucket String allocations, so consumers ran with 2-3x the in-flight memory of the JSONEachRow path. The clickhouse-rs Insert builder also double-buffered the encoded bytes during writes. Serialize each EAPItemRow to RowBinary bytes inside the processor and carry only the resulting RowData downstream. The typed struct drops as soon as the bytes exist; the writer just POSTs them with INSERT INTO ... FORMAT RowBinary, mirroring the JSON path. Vendor a small (~280 line) RowBinary serde Serializer plus the UUID adapter from clickhouse-rs, since the upstream serializer is pub(crate). With that in place, drop the clickhouse-rs dependency entirely - the integration test now talks to ClickHouse via reqwest + FORMAT JSON. The RowBinary and JSONEachRow pipelines now share the same Reduce -> Writer shape; use_row_binary just selects the processor function and the FORMAT clause. TypedInsertBatch, EstimatedSize, the BytesInsertBatch<Vec<T>> impl, make_rust_processor_row_binary, and ClickhouseRowBinaryWriterStep are all gone. 
Co-Authored-By: Claude Opus 4.7 (1M context) Agent transcript: https://claudescope.sentry.dev/share/39S080clqOozGCknLHK2EbqiXwoO9gjlPnmgL_y0M0Q --- rust_snuba/Cargo.lock | 96 +--- rust_snuba/Cargo.toml | 1 - rust_snuba/src/factory_v2.rs | 183 +------- rust_snuba/src/lib.rs | 2 +- rust_snuba/src/processors/eap_items.rs | 249 +++++++---- rust_snuba/src/strategies/clickhouse/mod.rs | 2 +- .../clickhouse/row_binary_writer.rs | 211 --------- .../strategies/clickhouse/rowbinary/mod.rs | 36 ++ .../strategies/clickhouse/rowbinary/ser.rs | 418 ++++++++++++++++++ .../src/strategies/clickhouse/writer_v2.rs | 66 ++- rust_snuba/src/strategies/processor.rs | 84 +--- rust_snuba/src/types.rs | 48 -- 12 files changed, 692 insertions(+), 704 deletions(-) delete mode 100644 rust_snuba/src/strategies/clickhouse/row_binary_writer.rs create mode 100644 rust_snuba/src/strategies/clickhouse/rowbinary/mod.rs create mode 100644 rust_snuba/src/strategies/clickhouse/rowbinary/ser.rs diff --git a/rust_snuba/Cargo.lock b/rust_snuba/Cargo.lock index 1bdbacce268..4ce1d21e813 100644 --- a/rust_snuba/Cargo.lock +++ b/rust_snuba/Cargo.lock @@ -556,15 +556,6 @@ version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dc0b364ead1874514c8c2855ab558056ebfeb775653e7ae45ff72f28f8f3166c" -[[package]] -name = "bstr" -version = "1.12.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "63044e1ae8e69f3b5a92c736ca6269b8d12fa7efe39bf34ddb06d102cf0e2cab" -dependencies = [ - "memchr", -] - [[package]] name = "bumpalo" version = "3.16.0" @@ -664,12 +655,6 @@ dependencies = [ "half", ] -[[package]] -name = "cityhash-rs" -version = "1.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "93a719913643003b84bd13022b4b7e703c09342cd03b679c4641c7d2e50dc34d" - [[package]] name = "clap" version = "4.5.4" @@ -710,44 +695,6 @@ version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"98cc8fbded0c607b7ba9dd60cd98df59af97e84d24e49c8557331cfc26d301ce" -[[package]] -name = "clickhouse" -version = "0.13.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9a9a81a1dffadd762ee662635ce409232258ce9beebd7cc0fa227df0b5e7efc0" -dependencies = [ - "bstr", - "bytes", - "cityhash-rs", - "clickhouse-derive", - "futures", - "futures-channel", - "http-body-util", - "hyper 1.6.0", - "hyper-util", - "lz4_flex", - "replace_with", - "sealed", - "serde", - "static_assertions", - "thiserror 1.0.58", - "tokio", - "url", - "uuid", -] - -[[package]] -name = "clickhouse-derive" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d70f3e2893f7d3e017eeacdc9a708fbc29a10488e3ebca21f9df6a5d2b616dbb" -dependencies = [ - "proc-macro2", - "quote", - "serde_derive_internals 0.29.1", - "syn 2.0.117", -] - [[package]] name = "cmake" version = "0.1.57" @@ -2400,12 +2347,6 @@ dependencies = [ "value-bag", ] -[[package]] -name = "lz4_flex" -version = "0.11.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "373f5eceeeab7925e0c1098212f2fbc4d416adec9d35051a6ab251e824c1854a" - [[package]] name = "matchers" version = "0.2.0" @@ -3354,12 +3295,6 @@ dependencies = [ "memchr", ] -[[package]] -name = "replace_with" -version = "0.1.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "51743d3e274e2b18df81c4dc6caf8a5b8e15dbe799e0dca05c7617380094e884" - [[package]] name = "reqwest" version = "0.11.27" @@ -3449,7 +3384,6 @@ dependencies = [ "bytes", "cadence", "chrono", - "clickhouse", "criterion", "ctrlc", "data-encoding", @@ -3631,7 +3565,7 @@ checksum = "c767fd6fa65d9ccf9cf026122c1b555f2ef9a4f0cea69da4d7dbc3e258d30967" dependencies = [ "proc-macro2", "quote", - "serde_derive_internals 0.26.0", + "serde_derive_internals", "syn 1.0.109", ] @@ -3641,17 +3575,6 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" -[[package]] -name = "sealed" -version = "0.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "22f968c5ea23d555e670b449c1c5e7b2fc399fdaec1d304a17cd48e288abc107" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.117", -] - [[package]] name = "security-framework" version = "2.10.0" @@ -3983,17 +3906,6 @@ dependencies = [ "syn 1.0.109", ] -[[package]] -name = "serde_derive_internals" -version = "0.29.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "18d26a20a969b9e3fdf2fc2d9f21eda6c40e2de84c9408bb5d3b05d499aae711" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.117", -] - [[package]] name = "serde_json" version = "1.0.149" @@ -4219,12 +4131,6 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3" -[[package]] -name = "static_assertions" -version = "1.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" - [[package]] name = "statsdproxy" version = "0.4.1" diff --git a/rust_snuba/Cargo.toml b/rust_snuba/Cargo.toml index 70e647016f5..62518f4f3de 100644 --- a/rust_snuba/Cargo.toml +++ b/rust_snuba/Cargo.toml @@ -26,7 +26,6 @@ adler = "1.0.2" anyhow = { version = "1.0.69", features = ["backtrace"] } cadence = "1.0.0" chrono = { version = "0.4.26", features = ["serde"] } -clickhouse = { version = "0.13", features = ["uuid"] } ctrlc = { version = "3.2.5", features = ["termination"] } data-encoding = "2.5.0" futures = "0.3.21" diff --git a/rust_snuba/src/factory_v2.rs b/rust_snuba/src/factory_v2.rs index 457924e37fc..f1b39834af8 100644 --- a/rust_snuba/src/factory_v2.rs +++ b/rust_snuba/src/factory_v2.rs @@ -26,18 +26,16 @@ use crate::processors::{self, get_cogs_label}; use crate::strategies::accountant::RecordCogs; use 
crate::strategies::blq_router::BLQRouter; -use crate::strategies::clickhouse::row_binary_writer::ClickhouseRowBinaryWriterStep; -use crate::strategies::clickhouse::writer_v2::ClickhouseWriterStep; +use crate::strategies::clickhouse::writer_v2::{ClickhouseWriterStep, InsertFormat}; use crate::strategies::commit_log::ProduceCommitLog; use crate::strategies::healthcheck::HealthCheck as SnubaHealthCheck; use crate::strategies::join_timeout::SetJoinTimeout; use crate::strategies::processor::{ - get_schema, make_rust_processor, make_rust_processor_row_binary, - make_rust_processor_with_replacements, validate_schema, + get_schema, make_rust_processor, make_rust_processor_with_replacements, validate_schema, }; use crate::strategies::python::PythonTransformStep; use crate::strategies::replacements::ProduceReplacements; -use crate::types::{BytesInsertBatch, CogsData, RowData, TypedInsertBatch}; +use crate::types::{BytesInsertBatch, CogsData, RowData}; pub struct ConsumerStrategyFactoryV2 { pub storage_config: config::StorageConfig, @@ -90,22 +88,25 @@ impl ProcessingStrategyFactory for ConsumerStrategyFactoryV2 { } fn create(&self) -> Box> { - if self.use_row_binary { - tracing::info!("Using row_binary pipeline"); - return match self + // Both JSON and RowBinary now flow through the same pipeline. The + // processor function is the only thing that varies between them: it + // encodes each row to the right wire format up front and the + // pipeline carries the resulting bytes the rest of the way. 
+ let (insert_format, process_fn_override) = if self.use_row_binary { + tracing::info!("Using RowBinary wire format"); + let processor_name = self .storage_config .message_processor .python_class_name - .as_str() - { - "EAPItemsProcessor" => self.create_row_binary_pipeline( - crate::processors::eap_items::process_message_row_binary, - ), + .as_str(); + let func: crate::processors::ProcessingFunction = match processor_name { + "EAPItemsProcessor" => crate::processors::eap_items::process_message_row_binary, name => panic!("RowBinary not supported for processor: {name}"), }; - } - - // ---- Existing JSONEachRow path (unchanged below this line) ---- + (InsertFormat::RowBinary, Some(func)) + } else { + (InsertFormat::JsonEachRow, None) + }; // Commit offsets let next_step = CommitOffsets::new(Duration::from_secs(1)); @@ -151,6 +152,7 @@ impl ProcessingStrategyFactory for ConsumerStrategyFactoryV2 { false, &self.clickhouse_concurrency, self.storage_config.name.clone(), + insert_format, ); let accumulator = Arc::new( @@ -223,6 +225,10 @@ impl ProcessingStrategyFactory for ConsumerStrategyFactoryV2 { ) } (true, Some(processors::ProcessingFunctionType::ProcessingFunction(func))) => { + // For storages opted into RowBinary, swap the registered JSON + // processor for its RowBinary sibling. Same signature, same + // pipeline; only the encoding inside the processor differs. 
+ let func = process_fn_override.unwrap_or(func); make_rust_processor( next_step, func, @@ -308,151 +314,6 @@ impl ProcessingStrategyFactory for ConsumerStrategyFactoryV2 { } } -impl ConsumerStrategyFactoryV2 { - fn create_row_binary_pipeline< - T: clickhouse::Row - + serde::Serialize - + Clone - + Send - + Sync - + crate::types::EstimatedSize - + 'static, - >( - &self, - func: fn( - KafkaPayload, - crate::types::KafkaMessageMetadata, - &config::ProcessorConfig, - ) -> anyhow::Result>, - ) -> Box> { - // Commit offsets - let next_step = CommitOffsets::new(Duration::from_secs(1)); - - let next_step: Box>> = - if let Some((ref producer, destination)) = self.commit_log_producer { - Box::new(ProduceCommitLog::new( - next_step, - producer.clone(), - destination, - self.physical_topic_name, - self.physical_consumer_group.clone(), - &self.commitlog_concurrency, - false, - )) - } else { - Box::new(next_step) - }; - - let cogs_label = get_cogs_label(&self.storage_config.message_processor.python_class_name); - - let next_step: Box>> = - match (self.env_config.record_cogs, cogs_label) { - (true, Some(resource_id)) => Box::new(RecordCogs::new( - next_step, - resource_id, - self.accountant_topic_config.broker_config.clone(), - &self.accountant_topic_config.physical_topic_name, - )), - _ => Box::new(next_step), - }; - - let next_step = SetJoinTimeout::new( - next_step, - Some(Duration::from_millis(self.join_timeout_ms.unwrap_or(0))), - ); - - let next_step = ClickhouseRowBinaryWriterStep::::new( - next_step, - self.storage_config.clickhouse_cluster.clone(), - self.storage_config.clickhouse_table_name.clone(), - &self.clickhouse_concurrency, - self.storage_config.name.clone(), - ); - - let accumulator = Arc::new( - |batch: BytesInsertBatch>, small_batch: Message>>| { - Ok(batch.merge(small_batch.into_payload())) - }, - ); - - type BatchSizeFn = fn(&BytesInsertBatch>) -> usize; - let compute_batch_size: BatchSizeFn = match self.max_batch_size_calculation { - 
config::BatchSizeCalculation::Bytes => |batch| batch.num_bytes(), - config::BatchSizeCalculation::Rows => |batch| batch.len(), - }; - - let next_step = Reduce::new( - next_step, - accumulator, - Arc::new(move || { - BytesInsertBatch::>::new( - Vec::new(), - None, - None, - None, - Default::default(), - CogsData::default(), - ) - }), - self.max_batch_size, - self.max_batch_time, - compute_batch_size, - ) - .flush_empty_batches(true); - - let next_step = make_rust_processor_row_binary( - next_step, - func, - &self.logical_topic_name, - self.enforce_schema, - &self.processing_concurrency, - config::ProcessorConfig { - env_config: self.env_config.clone(), - storage_name: self.storage_config.name.clone(), - }, - self.stop_at_timestamp, - ); - - let next_step = SetJoinTimeout::new( - next_step, - Some(Duration::from_millis(self.join_timeout_ms.unwrap_or(0))), - ); - - let next_step: Box> = - if let (Some(blq_producer_config), Some(blq_topic)) = - (&self.blq_producer_config, self.blq_topic) - { - tracing::info!( - "Routing stale messages to the backlog-queue topic {:?} \ - (thresholds configured via sentry-options)", - self.blq_topic, - ); - Box::new(BLQRouter::new( - next_step, - blq_producer_config.clone(), - blq_topic, - )) - } else { - tracing::info!("Not using a backlog-queue",); - Box::new(next_step) - }; - - if let Some(path) = &self.health_check_file { - if self.health_check == "snuba" { - tracing::info!( - "Using Snuba HealthCheck for consumer group: {}", - self.physical_consumer_group - ); - Box::new(SnubaHealthCheck::new(next_step, path)) - } else { - Box::new(HealthCheck::new(next_step, path)) - } - } else { - Box::new(next_step) - } - } -} - #[derive(Clone)] struct SchemaValidator { schema: Option>, diff --git a/rust_snuba/src/lib.rs b/rust_snuba/src/lib.rs index 3dc72ff7500..7fd2bedde6f 100644 --- a/rust_snuba/src/lib.rs +++ b/rust_snuba/src/lib.rs @@ -39,7 +39,7 @@ pub use metrics::statsd::StatsDBackend; pub use processors::{ProcessingFunction, 
ProcessingFunctionType, PROCESSORS}; pub use strategies::noop::Noop; pub use strategies::python::PythonTransformStep; -pub use types::{EstimatedSize, KafkaMessageMetadata}; +pub use types::KafkaMessageMetadata; #[cfg(test)] mod testutils; diff --git a/rust_snuba/src/processors/eap_items.rs b/rust_snuba/src/processors/eap_items.rs index e2f5f09aa45..5a3651b6723 100644 --- a/rust_snuba/src/processors/eap_items.rs +++ b/rust_snuba/src/processors/eap_items.rs @@ -19,10 +19,9 @@ use crate::processors::utils::{ record_invalid_timestamp_metric, }; use crate::runtime_config::get_str_config; +use crate::strategies::clickhouse::rowbinary; use crate::types::CogsData; -use crate::types::{ - item_type_name, InsertBatch, ItemTypeMetrics, KafkaMessageMetadata, TypedInsertBatch, -}; +use crate::types::{item_type_name, InsertBatch, ItemTypeMetrics, KafkaMessageMetadata, RowData}; /// Runtime config key prefix. Per-storage key /// `eap_items_dlq_grace_period_min:`: a non-negative integer @@ -202,18 +201,62 @@ pub fn process_message_row_binary( msg: KafkaPayload, _metadata: KafkaMessageMetadata, config: &ProcessorConfig, -) -> anyhow::Result> { +) -> anyhow::Result { let processed = process_eap_item(msg, config)?; if processed.should_skip { - return Ok(TypedInsertBatch::from_rows(vec![], None)); + return Ok(InsertBatch::skip()); } - let mut batch = TypedInsertBatch::from_rows( - vec![EAPItemRow::try_from(processed.eap_item)?], - processed.origin_timestamp, - ); - batch.item_type_metrics = Some(processed.item_type_metrics); - batch.cogs_data = Some(processed.cogs_data); - Ok(batch) + let row = EAPItemRow::try_from(processed.eap_item)?; + + // Encode the row to RowBinary bytes inline so the wide typed struct (~80 + // Vec<(String, _)> buckets) drops here instead of riding the pipeline to + // the writer step. The batch downstream sees only a compact Vec. 
+ let mut encoded_rows = Vec::new(); + rowbinary::serialize_into(&mut encoded_rows, &row)?; + + Ok(InsertBatch { + rows: RowData { + encoded_rows, + num_rows: 1, + }, + origin_timestamp: processed.origin_timestamp, + sentry_received_timestamp: None, + cogs_data: Some(processed.cogs_data), + item_type_metrics: Some(processed.item_type_metrics), + }) +} + +/// Test-only: returns the typed `EAPItemRow` (plus the metadata fields the +/// pipeline carries) without the bytes serialization step. Tests that +/// inspect individual columns use this; the production path goes through +/// `process_message_row_binary` and never holds the typed struct beyond +/// `process_eap_item`. +#[cfg(test)] +pub(crate) struct EAPItemRowBatch { + pub rows: Vec, + pub cogs_data: Option, + pub item_type_metrics: Option, +} + +#[cfg(test)] +pub(crate) fn process_message_row_binary_typed( + msg: KafkaPayload, + _metadata: KafkaMessageMetadata, + config: &ProcessorConfig, +) -> anyhow::Result { + let processed = process_eap_item(msg, config)?; + if processed.should_skip { + return Ok(EAPItemRowBatch { + rows: vec![], + cogs_data: None, + item_type_metrics: None, + }); + } + Ok(EAPItemRowBatch { + rows: vec![EAPItemRow::try_from(processed.eap_item)?], + cogs_data: Some(processed.cogs_data), + item_type_metrics: Some(processed.item_type_metrics), + }) } #[derive(Debug, Default, Serialize)] @@ -421,13 +464,13 @@ impl AttributeMap { } seq_attrs! 
{ -#[derive(Debug, Clone, clickhouse::Row, Serialize)] +#[derive(Debug, Clone, Serialize)] pub struct EAPItemRow { organization_id: u64, project_id: u64, item_type: u8, timestamp: u32, - #[serde(with = "clickhouse::serde::uuid")] + #[serde(with = "crate::strategies::clickhouse::rowbinary::uuid")] trace_id: Uuid, item_id: u128, @@ -451,31 +494,6 @@ pub struct EAPItemRow { } } -fn vec_string_pair_size(v: &[(String, B)]) -> usize { - let heap: usize = v.iter().map(|(s, _)| s.len()).sum(); - std::mem::size_of_val(v) + heap -} - -fn vec_string_string_pair_size(v: &[(String, String)]) -> usize { - let heap: usize = v.iter().map(|(k, v)| k.len() + v.len()).sum(); - std::mem::size_of_val(v) + heap -} - -seq_attrs! { -impl crate::types::EstimatedSize for EAPItemRow { - fn estimated_size(&self) -> usize { - std::mem::size_of::() - + vec_string_pair_size(&self.attributes_bool) - + vec_string_pair_size(&self.attributes_int) - + self.attributes_array.len() - #( - + vec_string_string_pair_size(&self.attributes_string_~N) - + vec_string_pair_size(&self.attributes_float_~N) - )* - } -} -} - impl TryFrom for EAPItemRow { type Error = anyhow::Error; @@ -794,7 +812,7 @@ mod tests { ); // RowBinary path - let rb_batch = process_message_row_binary( + let rb_batch = process_message_row_binary_typed( KafkaPayload::new(None, None, Some(payload_bytes)), meta, &ProcessorConfig::default(), @@ -844,7 +862,7 @@ mod tests { .contains_key("sentry._internal.received_at")); // RowBinary path - let rb_batch = process_message_row_binary( + let rb_batch = process_message_row_binary_typed( KafkaPayload::new(None, None, Some(payload_bytes)), meta, &ProcessorConfig::default(), @@ -963,7 +981,7 @@ mod tests { offset: 1, timestamp: DateTime::from(SystemTime::now()), }; - let batch = process_message_row_binary(payload, meta, &ProcessorConfig::default()) + let batch = process_message_row_binary_typed(payload, meta, &ProcessorConfig::default()) .expect("The message should be processed"); 
assert_eq!(batch.rows.len(), 1); @@ -991,7 +1009,7 @@ mod tests { offset: 1, timestamp: DateTime::from(SystemTime::now()), }; - let batch = process_message_row_binary(payload, meta, &ProcessorConfig::default()) + let batch = process_message_row_binary_typed(payload, meta, &ProcessorConfig::default()) .expect("The message should be processed"); let row = &batch.rows[0]; @@ -1013,7 +1031,7 @@ mod tests { offset: 1, timestamp: DateTime::from(SystemTime::now()), }; - let batch = process_message_row_binary(payload, meta, &ProcessorConfig::default()) + let batch = process_message_row_binary_typed(payload, meta, &ProcessorConfig::default()) .expect("The message should be processed"); let row = &batch.rows[0]; @@ -1037,7 +1055,7 @@ mod tests { timestamp: DateTime::from(SystemTime::now()), }; - let batch = process_message_row_binary(payload, meta, &ProcessorConfig::default()) + let batch = process_message_row_binary_typed(payload, meta, &ProcessorConfig::default()) .expect("The message should be processed"); assert!(batch.item_type_metrics.is_some()); @@ -1066,7 +1084,7 @@ mod tests { timestamp: DateTime::from(SystemTime::now()), }; - let batch = process_message_row_binary(payload, meta, &ProcessorConfig::default()) + let batch = process_message_row_binary_typed(payload, meta, &ProcessorConfig::default()) .expect("The message should be processed"); assert!(batch.cogs_data.is_some()); @@ -1112,7 +1130,7 @@ mod tests { timestamp: DateTime::from(SystemTime::now()), }; - let batch = process_message_row_binary(payload, meta, &ProcessorConfig::default()) + let batch = process_message_row_binary_typed(payload, meta, &ProcessorConfig::default()) .expect("The message should be processed"); let row = &batch.rows[0]; @@ -1151,7 +1169,7 @@ mod tests { timestamp: DateTime::from(SystemTime::now()), }; - let batch = process_message_row_binary(payload, meta, &ProcessorConfig::default()) + let batch = process_message_row_binary_typed(payload, meta, &ProcessorConfig::default()) .expect("The 
message should be processed"); let row = &batch.rows[0]; @@ -1229,8 +1247,9 @@ mod tests { // Process through RowBinary path let rb_payload = KafkaPayload::new(None, None, Some(payload_bytes)); - let rb_batch = process_message_row_binary(rb_payload, meta, &ProcessorConfig::default()) - .expect("RowBinary path should succeed"); + let rb_batch = + process_message_row_binary_typed(rb_payload, meta, &ProcessorConfig::default()) + .expect("RowBinary path should succeed"); // Parse the JSON output let json_str = @@ -1435,6 +1454,12 @@ mod tests { ); } + /// End-to-end test of the production RowBinary path against a live + /// ClickHouse instance: process_message_row_binary produces bytes via the + /// vendored serializer, those bytes are POSTed verbatim with + /// `FORMAT RowBinary`, and the row is read back via `FORMAT JSON`. This + /// is the cross-boundary check that our wire format matches what + /// ClickHouse expects — pure unit tests can't catch that. #[tokio::test] async fn test_row_binary_clickhouse_insert() { let host = std::env::var("CLICKHOUSE_HOST").unwrap_or("127.0.0.1".to_string()); @@ -1443,12 +1468,9 @@ mod tests { .parse() .unwrap(); let database = std::env::var("CLICKHOUSE_DATABASE").unwrap_or("default".to_string()); + let base_url = format!("http://{host}:{http_port}"); - let client = clickhouse::Client::default() - .with_url(format!("http://{host}:{http_port}")) - .with_database(&database) - .with_option("input_format_binary_read_json_as_string", "1") - .with_option("insert_deduplicate", "0"); + let http = reqwest::Client::new(); // Use a unique organization_id to avoid conflicts with other test data let unique_org_id: u64 = 999_999_000 + (rand::random::() % 1000) as u64; @@ -1495,56 +1517,91 @@ mod tests { timestamp: DateTime::from(SystemTime::now()), }; + // Production path: encodes the row to RowBinary bytes inside the processor. 
let batch = process_message_row_binary(payload, meta, &ProcessorConfig::default()) .expect("The message should be processed"); - - assert_eq!(batch.rows.len(), 1); - let row = batch.rows[0].clone(); - - // Insert via RowBinary (same code path as production) - let mut insert = client - .insert("eap_items_1_local") - .expect("Failed to create insert"); - insert.write(&row).await.expect("Failed to write row"); - insert.end().await.expect("Failed to end insert"); - - // Read it back using organization_id (primary key prefix) for reliable lookup - let count: u64 = client - .query(&format!( - "SELECT count() FROM eap_items_1_local WHERE organization_id = {unique_org_id}" - )) - .fetch_one() + assert_eq!(batch.rows.num_rows, 1); + + // Insert: POST the pre-encoded bytes with FORMAT RowBinary. + let insert_resp = http + .post(&base_url) + .header("X-ClickHouse-Database", &database) + .query(&[ + ("query", "INSERT INTO eap_items_1_local FORMAT RowBinary"), + ("input_format_binary_read_json_as_string", "1"), + ("insert_deduplicate", "0"), + ]) + .body(batch.rows.encoded_rows.clone()) + .send() .await - .expect("Failed to count rows"); + .expect("Insert request failed to send"); assert!( - count > 0, - "No rows found after insert for org_id={unique_org_id}" + insert_resp.status().is_success(), + "Insert failed: {}", + insert_resp.text().await.unwrap_or_default() ); - let result = client - .query(&format!( - "SELECT organization_id, project_id, item_type, sampling_weight \ - FROM eap_items_1_local \ - WHERE organization_id = {unique_org_id} \ - LIMIT 1" - )) - .fetch_one::<(u64, u64, u8, u64)>() + // Read it back via FORMAT JSON. We use organization_id (primary key prefix) + // for a deterministic lookup. ClickHouse's FORMAT JSON renders 64-bit + // ints as strings to avoid JS-precision loss, so we parse them back. 
+ let select_resp = http + .post(&base_url) + .header("X-ClickHouse-Database", &database) + .query(&[( + "query", + format!( + "SELECT organization_id, project_id, item_type, sampling_weight \ + FROM eap_items_1_local \ + WHERE organization_id = {unique_org_id} \ + LIMIT 1 FORMAT JSON" + ), + )]) + .send() .await - .expect("Failed to read back inserted row"); - - assert_eq!(result.0, unique_org_id); // organization_id - assert_eq!(result.1, 1); // project_id - assert_eq!(result.2, TraceItemType::Span as u8); // item_type - assert_eq!(result.3, 1); // sampling_weight + .expect("Select request failed to send"); + assert!(select_resp.status().is_success(), "Select failed"); + let body_text = select_resp.text().await.expect("response body"); + let body: serde_json::Value = serde_json::from_str(&body_text).expect("JSON response"); + let data = body["data"].as_array().expect("data array"); + assert_eq!(data.len(), 1, "no rows found for org_id={unique_org_id}"); + let row = &data[0]; + assert_eq!( + row["organization_id"] + .as_str() + .unwrap() + .parse::() + .unwrap(), + unique_org_id + ); + assert_eq!( + row["project_id"].as_str().unwrap().parse::().unwrap(), + 1 + ); + assert_eq!( + row["item_type"].as_u64().unwrap() as u8, + TraceItemType::Span as u8 + ); + assert_eq!( + row["sampling_weight"] + .as_str() + .unwrap() + .parse::() + .unwrap(), + 1 + ); // Clean up - client - .query(&format!( - "ALTER TABLE eap_items_1_local DELETE WHERE organization_id = {unique_org_id}" - )) - .execute() - .await - .ok(); + let _ = http + .post(&base_url) + .header("X-ClickHouse-Database", &database) + .query(&[( + "query", + format!( + "ALTER TABLE eap_items_1_local DELETE WHERE organization_id = {unique_org_id}" + ), + )]) + .send() + .await; } #[test] diff --git a/rust_snuba/src/strategies/clickhouse/mod.rs b/rust_snuba/src/strategies/clickhouse/mod.rs index 29f0fd60ceb..45a3d852766 100644 --- a/rust_snuba/src/strategies/clickhouse/mod.rs +++ 
b/rust_snuba/src/strategies/clickhouse/mod.rs @@ -1,2 +1,2 @@ -pub mod row_binary_writer; +pub mod rowbinary; pub mod writer_v2; diff --git a/rust_snuba/src/strategies/clickhouse/row_binary_writer.rs b/rust_snuba/src/strategies/clickhouse/row_binary_writer.rs deleted file mode 100644 index 35c00b32c39..00000000000 --- a/rust_snuba/src/strategies/clickhouse/row_binary_writer.rs +++ /dev/null @@ -1,211 +0,0 @@ -use std::time::{Duration, SystemTime}; - -use sentry_arroyo::processing::strategies::run_task_in_threads::{ - ConcurrencyConfig, RunTaskError, RunTaskFunc, RunTaskInThreads, TaskRunner, -}; -use sentry_arroyo::processing::strategies::{ - CommitRequest, ProcessingStrategy, StrategyError, SubmitError, -}; -use sentry_arroyo::types::Message; -use sentry_arroyo::{counter, timer}; - -use crate::config::ClickhouseConfig; -use crate::runtime_config::get_load_balancing_config; -use crate::types::BytesInsertBatch; - -struct RowBinaryTaskRunner { - client: clickhouse::Client, - storage_name: String, - table: String, - _phantom: std::marker::PhantomData, -} - -impl Clone for RowBinaryTaskRunner { - fn clone(&self) -> Self { - Self { - client: self.client.clone(), - storage_name: self.storage_name.clone(), - table: self.table.clone(), - _phantom: std::marker::PhantomData, - } - } -} - -impl RowBinaryTaskRunner -where - // Send + Sync + 'static required because T is used in async tasks dispatched - // across threads via RunTaskInThreads. - T: clickhouse::Row + serde::Serialize + Send + Sync + 'static, -{ - fn new(config: &ClickhouseConfig, table: String, storage_name: String) -> Self { - let scheme = if config.secure { "https" } else { "http" }; - let client = clickhouse::Client::default() - .with_url(format!("{}://{}:{}", scheme, config.host, config.http_port)) - .with_user(&config.user) - .with_password(&config.password) - .with_database(&config.database) - // Wait for data to be written to all shards before returning success. 
- // Without this, inserts return immediately after writing to the local - // node and data is forwarded asynchronously, risking data loss on failure. - .with_option("insert_distributed_sync", "1") - .with_option("input_format_binary_read_json_as_string", "1"); - - Self { - client, - storage_name, - table, - _phantom: std::marker::PhantomData, - } - } -} - -const INITIAL_BACKOFF_MS: f64 = 500.0; -const MAX_RETRIES: usize = 4; -const JITTER_FACTOR: f64 = 0.2; - -impl TaskRunner>, BytesInsertBatch<()>, anyhow::Error> - for RowBinaryTaskRunner -where - T: clickhouse::Row + serde::Serialize + Send + Sync + 'static, -{ - fn get_task( - &self, - message: Message>>, - ) -> RunTaskFunc, anyhow::Error> { - let client = self.client.clone(); - let table = self.table.clone(); - let lb_config = get_load_balancing_config(&self.storage_name); - - Box::pin(async move { - let (empty_message, insert_batch) = message.take(); - let batch_len = insert_batch.len(); - let num_bytes = insert_batch.num_bytes(); - let (rows, empty_batch) = insert_batch.take(); - - let write_start = SystemTime::now(); - - if rows.is_empty() { - tracing::debug!("skipping write of empty payload ({} rows)", batch_len); - } else { - tracing::debug!("performing row binary write"); - - for attempt in 0..=MAX_RETRIES { - match write_rows(&client, &table, &rows, &lb_config).await { - Ok(()) => break, - Err(e) => { - if attempt == MAX_RETRIES { - counter!("rust_consumer.clickhouse_insert_error", 1, "status" => "row_binary_error", "retried" => "false"); - return Err(RunTaskError::Other(anyhow::anyhow!( - "error writing to clickhouse via RowBinary after {} attempts: {}", - MAX_RETRIES + 1, - e - ))); - } - counter!("rust_consumer.clickhouse_insert_error", 1, "status" => "row_binary_error", "retried" => "true"); - tracing::warn!( - "ClickHouse RowBinary write failed (attempt {}/{}): {}", - attempt + 1, - MAX_RETRIES + 1, - e - ); - - let backoff_ms = - INITIAL_BACKOFF_MS * (2_u64.pow(attempt as u32) as f64); - let 
jitter = - rand::random::() * JITTER_FACTOR - JITTER_FACTOR / 2.0; - let delay = - Duration::from_millis((backoff_ms * (1.0 + jitter)).round() as u64); - tokio::time::sleep(delay).await; - } - } - } - - tracing::info!("Inserted {} rows via RowBinary", batch_len); - let write_finish = SystemTime::now(); - if let Ok(elapsed) = write_finish.duration_since(write_start) { - timer!("insertions.batch_write_ms", elapsed); - } - } - - counter!("insertions.batch_write_bytes", num_bytes as i64); - counter!("insertions.batch_write_msgs", batch_len as i64); - empty_batch.record_message_latency(); - empty_batch.emit_item_type_metrics(); - - Ok(empty_message.replace(empty_batch)) - }) - } -} - -async fn write_rows( - client: &clickhouse::Client, - table: &str, - rows: &[T], - lb_config: &crate::runtime_config::LoadBalancingConfig, -) -> Result<(), clickhouse::error::Error> { - let mut insert = client - .insert(table)? - .with_option("load_balancing", &lb_config.load_balancing); - if let Some(offset) = &lb_config.first_offset { - insert = insert.with_option("load_balancing_first_offset", offset); - } - for row in rows { - insert.write(row).await?; - } - insert.end().await?; - Ok(()) -} - -pub struct ClickhouseRowBinaryWriterStep { - inner: RunTaskInThreads>, BytesInsertBatch<()>, anyhow::Error, N>, -} - -impl ClickhouseRowBinaryWriterStep -where - T: clickhouse::Row + serde::Serialize + Send + Sync + 'static, - N: ProcessingStrategy> + 'static, -{ - pub fn new( - next_step: N, - cluster_config: ClickhouseConfig, - table: String, - concurrency: &ConcurrencyConfig, - storage_name: String, - ) -> Self { - let task_runner = RowBinaryTaskRunner::::new(&cluster_config, table, storage_name); - - let inner = RunTaskInThreads::new( - next_step, - task_runner, - concurrency, - Some("clickhouse_row_binary"), - ); - - ClickhouseRowBinaryWriterStep { inner } - } -} - -impl ProcessingStrategy>> for ClickhouseRowBinaryWriterStep -where - T: clickhouse::Row + serde::Serialize + Send + Sync + 
'static, - N: ProcessingStrategy>, -{ - fn poll(&mut self) -> Result, StrategyError> { - self.inner.poll() - } - - fn submit( - &mut self, - message: Message>>, - ) -> Result<(), SubmitError>>> { - self.inner.submit(message) - } - - fn terminate(&mut self) { - self.inner.terminate(); - } - - fn join(&mut self, timeout: Option) -> Result, StrategyError> { - self.inner.join(timeout) - } -} diff --git a/rust_snuba/src/strategies/clickhouse/rowbinary/mod.rs b/rust_snuba/src/strategies/clickhouse/rowbinary/mod.rs new file mode 100644 index 00000000000..880bf3aafda --- /dev/null +++ b/rust_snuba/src/strategies/clickhouse/rowbinary/mod.rs @@ -0,0 +1,36 @@ +//! Minimal RowBinary serializer for inserts into ClickHouse. +//! +//! Vendored from the `clickhouse` crate (Apache-2.0 / MIT) and reduced to the +//! shape `rust_snuba` actually emits: fixed-width integers, f32/f64, bool, str, +//! raw byte slices (UUID / FixedString), length-prefixed sequences, tuples and +//! structs. We do this in-process so the typed rows can be serialized to bytes +//! inside the processor and dropped immediately, instead of riding the pipeline +//! all the way to the writer step. + +mod ser; + +// `Error` is part of the module's public surface (it's the error type of +// `serialize_into`). It isn't referenced by name anywhere in the crate today +// — callers propagate via `?` into `anyhow::Error` — but keep the re-export +// so consumers can match on the variant if they ever need to. +#[allow(unused_imports)] +pub use ser::{serialize_into, Error}; + +pub mod uuid { + //! Serde adapter for `#[serde(with = "...")]` on `Uuid` fields. ClickHouse + //! stores UUID as two little-endian u64 halves, so the canonical 16-byte + //! UUID has each half reversed before being written. 
+ + use serde::Serializer; + use uuid::Uuid; + + pub fn serialize(uuid: &Uuid, serializer: S) -> Result + where + S: Serializer, + { + let mut bytes = *uuid.as_bytes(); + bytes[..8].reverse(); + bytes[8..].reverse(); + serializer.serialize_bytes(&bytes) + } +} diff --git a/rust_snuba/src/strategies/clickhouse/rowbinary/ser.rs b/rust_snuba/src/strategies/clickhouse/rowbinary/ser.rs new file mode 100644 index 00000000000..e17fab74d7b --- /dev/null +++ b/rust_snuba/src/strategies/clickhouse/rowbinary/ser.rs @@ -0,0 +1,418 @@ +use std::fmt; + +use serde::ser::{ + self, Impossible, Serialize, SerializeMap, SerializeSeq, SerializeStruct, SerializeTuple, + SerializeTupleStruct, +}; + +/// Append the RowBinary encoding of `value` to `buf`. +pub fn serialize_into(buf: &mut Vec, value: &T) -> Result<(), Error> +where + T: Serialize + ?Sized, +{ + let mut serializer = Serializer { buf }; + value.serialize(&mut serializer) +} + +#[derive(Debug)] +pub enum Error { + Message(String), + /// Sequences and maps must report a known length so we can emit the + /// LEB128 length prefix up front. + SequenceLengthRequired, + Unsupported(&'static str), +} + +impl fmt::Display for Error { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Error::Message(s) => f.write_str(s), + Error::SequenceLengthRequired => { + f.write_str("rowbinary: sequences and maps must have a known length") + } + Error::Unsupported(what) => write!(f, "rowbinary: unsupported serde type: {what}"), + } + } +} + +impl std::error::Error for Error {} + +impl ser::Error for Error { + fn custom(msg: T) -> Self { + Error::Message(msg.to_string()) + } +} + +struct Serializer<'a> { + buf: &'a mut Vec, +} + +fn write_uvarint(buf: &mut Vec, mut v: u64) { + while v >= 0x80 { + buf.push((v as u8) | 0x80); + v >>= 7; + } + buf.push(v as u8); +} + +macro_rules! impl_le { + ($($name:ident => $ty:ty),+ $(,)?) 
=> {$( + #[inline] + fn $name(self, v: $ty) -> Result<(), Error> { + self.buf.extend_from_slice(&v.to_le_bytes()); + Ok(()) + } + )+}; +} + +impl<'a, 'b> ser::Serializer for &'a mut Serializer<'b> { + type Ok = (); + type Error = Error; + + type SerializeSeq = Self; + type SerializeTuple = Self; + type SerializeTupleStruct = Self; + type SerializeMap = Self; + type SerializeStruct = Self; + type SerializeTupleVariant = Impossible<(), Error>; + type SerializeStructVariant = Impossible<(), Error>; + + #[inline] + fn serialize_bool(self, v: bool) -> Result<(), Error> { + self.buf.push(v as u8); + Ok(()) + } + + #[inline] + fn serialize_u8(self, v: u8) -> Result<(), Error> { + self.buf.push(v); + Ok(()) + } + + #[inline] + fn serialize_i8(self, v: i8) -> Result<(), Error> { + self.buf.push(v as u8); + Ok(()) + } + + impl_le! { + serialize_i16 => i16, + serialize_i32 => i32, + serialize_i64 => i64, + serialize_i128 => i128, + serialize_u16 => u16, + serialize_u32 => u32, + serialize_u64 => u64, + serialize_u128 => u128, + serialize_f32 => f32, + serialize_f64 => f64, + } + + fn serialize_char(self, v: char) -> Result<(), Error> { + let mut tmp = [0u8; 4]; + self.serialize_str(v.encode_utf8(&mut tmp)) + } + + fn serialize_str(self, v: &str) -> Result<(), Error> { + write_uvarint(self.buf, v.len() as u64); + self.buf.extend_from_slice(v.as_bytes()); + Ok(()) + } + + /// Raw byte write — no length prefix. Matches RowBinary's fixed-width + /// encoding for UUID and FixedString(N). Variable-length byte sequences + /// should be modeled as `Vec` and hit `serialize_seq`. 
+ fn serialize_bytes(self, v: &[u8]) -> Result<(), Error> { + self.buf.extend_from_slice(v); + Ok(()) + } + + fn serialize_none(self) -> Result<(), Error> { + self.buf.push(1); + Ok(()) + } + + fn serialize_some(self, value: &T) -> Result<(), Error> + where + T: ?Sized + Serialize, + { + self.buf.push(0); + value.serialize(self) + } + + fn serialize_unit(self) -> Result<(), Error> { + Ok(()) + } + + fn serialize_unit_struct(self, _name: &'static str) -> Result<(), Error> { + Ok(()) + } + + fn serialize_unit_variant( + self, + _name: &'static str, + _index: u32, + _variant: &'static str, + ) -> Result<(), Error> { + Err(Error::Unsupported("enum variant")) + } + + fn serialize_newtype_struct(self, _name: &'static str, value: &T) -> Result<(), Error> + where + T: ?Sized + Serialize, + { + value.serialize(self) + } + + fn serialize_newtype_variant( + self, + _name: &'static str, + _index: u32, + _variant: &'static str, + _value: &T, + ) -> Result<(), Error> + where + T: ?Sized + Serialize, + { + Err(Error::Unsupported("enum variant")) + } + + fn serialize_seq(self, len: Option) -> Result { + let len = len.ok_or(Error::SequenceLengthRequired)?; + write_uvarint(self.buf, len as u64); + Ok(self) + } + + fn serialize_tuple(self, _len: usize) -> Result { + Ok(self) + } + + fn serialize_tuple_struct( + self, + _name: &'static str, + _len: usize, + ) -> Result { + Ok(self) + } + + fn serialize_tuple_variant( + self, + _name: &'static str, + _index: u32, + _variant: &'static str, + _len: usize, + ) -> Result { + Err(Error::Unsupported("enum variant")) + } + + fn serialize_map(self, len: Option) -> Result { + let len = len.ok_or(Error::SequenceLengthRequired)?; + write_uvarint(self.buf, len as u64); + Ok(self) + } + + fn serialize_struct( + self, + _name: &'static str, + _len: usize, + ) -> Result { + Ok(self) + } + + fn serialize_struct_variant( + self, + _name: &'static str, + _index: u32, + _variant: &'static str, + _len: usize, + ) -> Result { + Err(Error::Unsupported("enum 
variant")) + } +} + +impl<'a, 'b> SerializeSeq for &'a mut Serializer<'b> { + type Ok = (); + type Error = Error; + fn serialize_element(&mut self, value: &T) -> Result<(), Error> + where + T: ?Sized + Serialize, + { + value.serialize(&mut **self) + } + fn end(self) -> Result<(), Error> { + Ok(()) + } +} + +impl<'a, 'b> SerializeTuple for &'a mut Serializer<'b> { + type Ok = (); + type Error = Error; + fn serialize_element(&mut self, value: &T) -> Result<(), Error> + where + T: ?Sized + Serialize, + { + value.serialize(&mut **self) + } + fn end(self) -> Result<(), Error> { + Ok(()) + } +} + +impl<'a, 'b> SerializeTupleStruct for &'a mut Serializer<'b> { + type Ok = (); + type Error = Error; + fn serialize_field(&mut self, value: &T) -> Result<(), Error> + where + T: ?Sized + Serialize, + { + value.serialize(&mut **self) + } + fn end(self) -> Result<(), Error> { + Ok(()) + } +} + +impl<'a, 'b> SerializeMap for &'a mut Serializer<'b> { + type Ok = (); + type Error = Error; + fn serialize_key(&mut self, key: &T) -> Result<(), Error> + where + T: ?Sized + Serialize, + { + key.serialize(&mut **self) + } + fn serialize_value(&mut self, value: &T) -> Result<(), Error> + where + T: ?Sized + Serialize, + { + value.serialize(&mut **self) + } + fn end(self) -> Result<(), Error> { + Ok(()) + } +} + +impl<'a, 'b> SerializeStruct for &'a mut Serializer<'b> { + type Ok = (); + type Error = Error; + fn serialize_field(&mut self, _key: &'static str, value: &T) -> Result<(), Error> + where + T: ?Sized + Serialize, + { + value.serialize(&mut **self) + } + fn end(self) -> Result<(), Error> { + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use serde::Serialize; + + fn encode(v: &T) -> Vec { + let mut buf = Vec::new(); + serialize_into(&mut buf, v).unwrap(); + buf + } + + #[test] + fn primitives_are_little_endian() { + assert_eq!(encode(&0x01u8), vec![0x01]); + assert_eq!(encode(&0x0102u16), vec![0x02, 0x01]); + assert_eq!(encode(&0x01020304u32), vec![0x04, 0x03, 0x02, 
0x01]); + assert_eq!( + encode(&0x0102030405060708u64), + vec![0x08, 0x07, 0x06, 0x05, 0x04, 0x03, 0x02, 0x01], + ); + assert_eq!(encode(&true), vec![1]); + assert_eq!(encode(&false), vec![0]); + } + + #[test] + fn strings_get_uvarint_length_prefix() { + assert_eq!(encode(&""), vec![0]); + assert_eq!(encode(&"abc"), vec![3, b'a', b'b', b'c']); + // 130-char string crosses the single-byte LEB128 boundary (0x80). + let s = "x".repeat(130); + let out = encode(&s); + assert_eq!(&out[..2], &[0x82, 0x01]); // 130 = 0b1_0000010 → 0x82 0x01 + assert_eq!(out.len(), 132); + } + + #[test] + fn bytes_are_raw_no_prefix() { + // serialize_bytes is the FixedString path — no length prefix. + // (We use this for UUIDs.) + struct Raw(Vec); + impl Serialize for Raw { + fn serialize(&self, s: S) -> Result { + s.serialize_bytes(&self.0) + } + } + let out = encode(&Raw(vec![0xaa, 0xbb, 0xcc])); + assert_eq!(out, vec![0xaa, 0xbb, 0xcc]); + } + + #[test] + fn sequences_prefix_then_elements() { + let v: Vec = vec![1, 2, 3]; + // 3 elements, then each u16 LE. + assert_eq!(encode(&v), vec![3, 1, 0, 2, 0, 3, 0]); + } + + #[test] + fn map_like_vec_of_pairs_matches_clickhouse_map_shape() { + // Map(String, UInt8) is wire-equivalent to Array(Tuple(String, UInt8)): + // uvarint length, then concatenated (string-prefix, key, u8) tuples. 
+ let v: Vec<(String, u8)> = vec![("a".into(), 7), ("bb".into(), 9)]; + assert_eq!( + encode(&v), + vec![ + 2, // len + 1, b'a', 7, // ("a", 7) + 2, b'b', b'b', 9, // ("bb", 9) + ], + ); + } + + #[test] + fn uuid_adapter_emits_16_bytes_with_halves_reversed() { + use uuid::Uuid; + // 11223344-5566-7788-99aa-bbccddeeff00 + let id = Uuid::from_bytes([ + 0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77, 0x88, 0x99, 0xaa, 0xbb, 0xcc, 0xdd, 0xee, + 0xff, 0x00, + ]); + #[derive(Serialize)] + struct W { + #[serde(with = "super::super::uuid")] + id: Uuid, + } + let out = encode(&W { id }); + assert_eq!( + out, + vec![ + 0x88, 0x77, 0x66, 0x55, 0x44, 0x33, 0x22, 0x11, // first half reversed + 0x00, 0xff, 0xee, 0xdd, 0xcc, 0xbb, 0xaa, 0x99, // second half reversed + ], + ); + } + + #[test] + fn struct_fields_serialize_in_declaration_order() { + #[derive(Serialize)] + struct S { + a: u8, + b: u16, + c: bool, + } + let out = encode(&S { + a: 1, + b: 2, + c: true, + }); + assert_eq!(out, vec![1, 2, 0, 1]); + } +} diff --git a/rust_snuba/src/strategies/clickhouse/writer_v2.rs b/rust_snuba/src/strategies/clickhouse/writer_v2.rs index 3f48d4e97c4..00d87034caf 100644 --- a/rust_snuba/src/strategies/clickhouse/writer_v2.rs +++ b/rust_snuba/src/strategies/clickhouse/writer_v2.rs @@ -74,6 +74,24 @@ pub struct ClickhouseWriterStep { inner: RunTaskInThreads, BytesInsertBatch<()>, anyhow::Error, N>, } +/// Wire format the writer step posts to ClickHouse with. Picking the right +/// value here is the only thing the consumer needs to know — the pipeline +/// shape, batching, and retry behavior are identical across formats. 
+#[derive(Clone, Copy, Debug)] +pub enum InsertFormat { + JsonEachRow, + RowBinary, +} + +impl InsertFormat { + fn as_str(self) -> &'static str { + match self { + InsertFormat::JsonEachRow => "JSONEachRow", + InsertFormat::RowBinary => "RowBinary", + } + } +} + impl ClickhouseWriterStep where N: ProcessingStrategy> + 'static, @@ -85,6 +103,7 @@ where skip_write: bool, concurrency: &ConcurrencyConfig, storage_name: String, + format: InsertFormat, ) -> Self { let inner = RunTaskInThreads::new( next_step, @@ -93,6 +112,7 @@ where &cluster_config.clone(), &table, storage_name, + format, )), skip_write, ), @@ -154,7 +174,12 @@ pub struct ClickhouseClient { } impl ClickhouseClient { - pub fn new(config: &ClickhouseConfig, table: &str, storage_name: String) -> ClickhouseClient { + pub fn new( + config: &ClickhouseConfig, + table: &str, + storage_name: String, + format: InsertFormat, + ) -> ClickhouseClient { let mut headers = HeaderMap::with_capacity(6); headers.insert(CONNECTION, HeaderValue::from_static("keep-alive")); headers.insert(ACCEPT_ENCODING, HeaderValue::from_static("gzip,deflate")); @@ -175,8 +200,13 @@ impl ClickhouseClient { let host = &config.host; let port = &config.http_port; - let base_url = format!("{scheme}://{host}:{port}?insert_distributed_sync=1"); - let query = format!("INSERT INTO {table} FORMAT JSONEachRow"); + let mut base_url = format!("{scheme}://{host}:{port}?insert_distributed_sync=1"); + if matches!(format, InsertFormat::RowBinary) { + // RowBinary cannot represent JSON values natively; tell ClickHouse + // to treat any binary string targeting a JSON column as JSON text. 
+ base_url.push_str("&input_format_binary_read_json_as_string=1"); + } + let query = format!("INSERT INTO {table} FORMAT {}", format.as_str()); ClickhouseClient { client: Client::new(), @@ -322,7 +352,12 @@ mod tests { crate::testutils::initialize_python(); let config = make_test_config(); println!("config: {config:?}"); - let client = ClickhouseClient::new(&config, "querylog_local", "test_storage".to_string()); + let client = ClickhouseClient::new( + &config, + "querylog_local", + "test_storage".to_string(), + InsertFormat::JsonEachRow, + ); let url = client.build_url(); assert!(url.contains("load_balancing=in_order")); @@ -337,7 +372,12 @@ mod tests { fn test_url_with_runtime_config_override() { crate::testutils::initialize_python(); let config = make_test_config(); - let client = ClickhouseClient::new(&config, "test_table", "writer_v2_lb_test".to_string()); + let client = ClickhouseClient::new( + &config, + "test_table", + "writer_v2_lb_test".to_string(), + InsertFormat::JsonEachRow, + ); // Default: in_order let url = client.build_url(); @@ -367,6 +407,7 @@ mod tests { &config, "test_table", "writer_v2_block_size_test".to_string(), + InsertFormat::JsonEachRow, ); // Default (key absent): no suffix. @@ -382,8 +423,12 @@ mod tests { assert!(url.contains("&max_insert_block_size=2000000")); // A different storage isn't affected. 
- let other_client = - ClickhouseClient::new(&config, "test_table", "writer_v2_other_storage".to_string()); + let other_client = ClickhouseClient::new( + &config, + "test_table", + "writer_v2_other_storage".to_string(), + InsertFormat::JsonEachRow, + ); let url = other_client.build_url(); assert!(!url.contains("max_insert_block_size")); @@ -419,7 +464,12 @@ mod tests { database: "default".to_string(), }; - let client = ClickhouseClient::new(&config, "test_table", "test_storage".to_string()); + let client = ClickhouseClient::new( + &config, + "test_table", + "test_storage".to_string(), + InsertFormat::JsonEachRow, + ); let start_time = Instant::now(); let result = client diff --git a/rust_snuba/src/strategies/processor.rs b/rust_snuba/src/strategies/processor.rs index d3f055912cc..29059e08c33 100644 --- a/rust_snuba/src/strategies/processor.rs +++ b/rust_snuba/src/strategies/processor.rs @@ -16,8 +16,8 @@ use sentry_kafka_schemas::{Schema, SchemaError, SchemaType}; use crate::config::ProcessorConfig; use crate::processors::{ProcessingFunction, ProcessingFunctionWithReplacements}; use crate::types::{ - BytesInsertBatch, CommitLogEntry, CommitLogOffsets, EstimatedSize, InsertBatch, - InsertOrReplacement, KafkaMessageMetadata, RowData, TypedInsertBatch, + BytesInsertBatch, CommitLogEntry, CommitLogOffsets, InsertBatch, InsertOrReplacement, + KafkaMessageMetadata, RowData, }; use tokio::time::Instant; @@ -179,86 +179,6 @@ pub fn make_rust_processor_with_replacements( )) } -pub fn make_rust_processor_row_binary( - next_step: impl ProcessingStrategy>> + 'static, - func: fn( - KafkaPayload, - KafkaMessageMetadata, - &ProcessorConfig, - ) -> anyhow::Result>, - schema_name: &str, - enforce_schema: bool, - concurrency: &ConcurrencyConfig, - processor_config: ProcessorConfig, - stop_at_timestamp: Option, -) -> Box> { - let schema = get_schema(schema_name, enforce_schema); - - fn result_to_next_msg( - transformed: TypedInsertBatch, - partition: Partition, - offset: u64, - 
timestamp: DateTime, - stop_at_timestamp: Option, - ) -> anyhow::Result>>> { - // If a stop timestamp is set (used for backfills / replays), skip - // processing messages whose timestamp exceeds the cutoff by returning - // an empty batch that still commits the offset. - if let Some(stop) = stop_at_timestamp { - if stop < timestamp.timestamp() { - let payload = BytesInsertBatch::from_rows(Vec::new()); - return Ok(Message::new_broker_message( - payload, partition, offset, timestamp, - )); - } - } - - let num_bytes: usize = transformed.rows.iter().map(|r| r.estimated_size()).sum(); - let mut payload = BytesInsertBatch::from_rows(transformed.rows) - .with_num_bytes(num_bytes) - .with_message_timestamp(timestamp) - .with_commit_log_offsets(CommitLogOffsets(BTreeMap::from([( - partition.index, - CommitLogEntry { - offset: offset + 1, - orig_message_ts: timestamp, - received_p99: transformed.origin_timestamp.into_iter().collect(), - }, - )]))) - .with_cogs_data(transformed.cogs_data.unwrap_or_default()); - - if let Some(ts) = transformed.origin_timestamp { - payload = payload.with_origin_timestamp(ts); - } - if let Some(ts) = transformed.sentry_received_timestamp { - payload = payload.with_sentry_received_timestamp(ts); - } - if let Some(metrics) = transformed.item_type_metrics { - payload = payload.with_item_type_metrics(metrics); - } - - Ok(Message::new_broker_message( - payload, partition, offset, timestamp, - )) - } - - let task_runner = MessageProcessor { - schema, - enforce_schema, - func, - result_to_next_msg: result_to_next_msg::, - processor_config, - stop_at_timestamp, - }; - - Box::new(RunTaskInThreads::new( - next_step, - task_runner, - concurrency, - Some("process_message"), - )) -} - pub fn get_schema(schema_name: &str, enforce_schema: bool) -> Option> { match sentry_kafka_schemas::get_schema(schema_name, None) { Ok(s) => Some(Arc::new(s)), diff --git a/rust_snuba/src/types.rs b/rust_snuba/src/types.rs index 38c4893f441..6a5239962e2 100644 --- 
a/rust_snuba/src/types.rs +++ b/rust_snuba/src/types.rs @@ -9,12 +9,6 @@ use sentry_arroyo::timer; use sentry_protos::snuba::v1::TraceItemType; use serde::{Deserialize, Serialize}; -/// Trait for row types that can estimate their in-memory byte size. -/// Used by byte-based batch size calculation in the Reduce step. -pub trait EstimatedSize { - fn estimated_size(&self) -> usize; -} - #[derive(Clone, Debug, PartialEq)] pub struct CommitLogEntry { pub offset: u64, @@ -456,48 +450,6 @@ impl BytesInsertBatch { } } -impl BytesInsertBatch> { - pub fn len(&self) -> usize { - self.rows.len() - } - - pub fn merge(mut self, other: Self) -> Self { - self.rows.extend(other.rows); - self.num_bytes += other.num_bytes; - self.commit_log_offsets.merge(other.commit_log_offsets); - self.message_timestamp.merge(other.message_timestamp); - self.origin_timestamp.merge(other.origin_timestamp); - self.sentry_received_timestamp - .merge(other.sentry_received_timestamp); - self.cogs_data.merge(other.cogs_data); - self.item_type_metrics.merge(other.item_type_metrics); - self - } -} - -/// The return value of message processors that produce typed rows for RowBinary insertion. -/// A single Kafka message may produce multiple rows, hence Vec. 
-#[derive(Clone, Debug)] -pub struct TypedInsertBatch { - pub rows: Vec, - pub origin_timestamp: Option>, - pub sentry_received_timestamp: Option>, - pub cogs_data: Option, - pub item_type_metrics: Option, -} - -impl TypedInsertBatch { - pub fn from_rows(rows: Vec, origin_timestamp: Option>) -> Self { - Self { - rows, - origin_timestamp, - sentry_received_timestamp: None, - cogs_data: None, - item_type_metrics: None, - } - } -} - #[derive(Clone, Debug, Deserialize, Serialize, Default, PartialEq)] pub struct RowData { pub encoded_rows: Vec, From a37250795137a6e2d205570cadd702014a249789 Mon Sep 17 00:00:00 2001 From: Pierre Massat Date: Fri, 29 May 2026 12:28:21 -0700 Subject: [PATCH 2/7] fix(eap-items): Force Rust processor when use_row_binary is on When use_row_binary=true and use_rust_processor=false, the factory's match fell through to the Python processor (which emits JSONEachRow bytes) while the writer was already configured with FORMAT RowBinary - ClickHouse would reject the batch. The pre-collapse code avoided this by early-returning from create_row_binary_pipeline, which always used the Rust processor regardless of use_rust_processor. Coalesce use_rust_processor || use_row_binary at the dispatch site so the override path is always reachable when RowBinary is requested. Co-Authored-By: Claude Opus 4.7 (1M context) --- rust_snuba/src/factory_v2.rs | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/rust_snuba/src/factory_v2.rs b/rust_snuba/src/factory_v2.rs index f1b39834af8..e56c94eee71 100644 --- a/rust_snuba/src/factory_v2.rs +++ b/rust_snuba/src/factory_v2.rs @@ -188,9 +188,16 @@ impl ProcessingStrategyFactory for ConsumerStrategyFactoryV2 { ) .flush_empty_batches(true); + // RowBinary can only be emitted by the Rust processor (the Python path + // always returns JSONEachRow bytes). 
If the storage opted into + // RowBinary, force the Rust processor branch — otherwise we'd POST + // JSON bytes under `FORMAT RowBinary` and ClickHouse would reject the + // batch. The previous early-return for RowBinary did this implicitly. + let use_rust_processor = self.use_rust_processor || self.use_row_binary; + // Transform messages let next_step = match ( - self.use_rust_processor, + use_rust_processor, processors::get_processing_function( &self.storage_config.message_processor.python_class_name, ), From 2db198c10474b97ea9aa848dbf82e32f97b62823 Mon Sep 17 00:00:00 2001 From: Pierre Massat Date: Fri, 29 May 2026 13:39:41 -0700 Subject: [PATCH 3/7] fix(eap-items): Emit explicit column list in RowBinary INSERTs Without a column list, ClickHouse maps the RowBinary stream to columns using the table's on-disk positional order, which does not match the EAPItemRow struct's wire order in two places: * `client_sample_rate` and `server_sample_rate` were added by migration 0048 with identical `AFTER sampling_factor`, so on disk the pair is reversed (server precedes client) while the struct keeps client first. * The initial 0024 migration places all `attributes_string_*` columns before all `attributes_float_*`, but the struct interleaves them per the `seq_attrs!` expansion. The integration test exposed this as `CANNOT_READ_ALL_DATA` deep inside the bucket maps. clickhouse-rs masked the same hazard by always emitting the column list via the `Row` derive; our vendored serializer has to opt back in. Add `EAPItemRow::COLUMN_NAMES` in struct order, thread an `Option<&'static [&'static str]>` through `ClickhouseWriterStep` and `ClickhouseClient`, and expand it as `INSERT INTO t (col1, ...) FORMAT RowBinary`. JSONEachRow stays column-list-less. 
Co-Authored-By: Claude Opus 4.7 (1M context) Agent transcript: https://claudescope.sentry.dev/share/xYvkk5ydje_vV3XmhCpsmSV44ohsxFeKHfKRg-ZUaIk --- rust_snuba/src/factory_v2.rs | 21 ++++-- rust_snuba/src/processors/eap_items.rs | 75 ++++++++++++++++++- .../src/strategies/clickhouse/writer_v2.rs | 22 +++++- 3 files changed, 110 insertions(+), 8 deletions(-) diff --git a/rust_snuba/src/factory_v2.rs b/rust_snuba/src/factory_v2.rs index e56c94eee71..bc0ed692528 100644 --- a/rust_snuba/src/factory_v2.rs +++ b/rust_snuba/src/factory_v2.rs @@ -92,20 +92,30 @@ impl ProcessingStrategyFactory for ConsumerStrategyFactoryV2 { // processor function is the only thing that varies between them: it // encodes each row to the right wire format up front and the // pipeline carries the resulting bytes the rest of the way. - let (insert_format, process_fn_override) = if self.use_row_binary { + let (insert_format, process_fn_override, insert_columns): ( + InsertFormat, + Option, + Option<&'static [&'static str]>, + ) = if self.use_row_binary { tracing::info!("Using RowBinary wire format"); let processor_name = self .storage_config .message_processor .python_class_name .as_str(); - let func: crate::processors::ProcessingFunction = match processor_name { - "EAPItemsProcessor" => crate::processors::eap_items::process_message_row_binary, + let (func, columns): ( + crate::processors::ProcessingFunction, + &'static [&'static str], + ) = match processor_name { + "EAPItemsProcessor" => ( + crate::processors::eap_items::process_message_row_binary, + crate::processors::eap_items::EAPItemRow::COLUMN_NAMES, + ), name => panic!("RowBinary not supported for processor: {name}"), }; - (InsertFormat::RowBinary, Some(func)) + (InsertFormat::RowBinary, Some(func), Some(columns)) } else { - (InsertFormat::JsonEachRow, None) + (InsertFormat::JsonEachRow, None, None) }; // Commit offsets @@ -153,6 +163,7 @@ impl ProcessingStrategyFactory for ConsumerStrategyFactoryV2 { &self.clickhouse_concurrency, 
self.storage_config.name.clone(), insert_format, + insert_columns, ); let accumulator = Arc::new( diff --git a/rust_snuba/src/processors/eap_items.rs b/rust_snuba/src/processors/eap_items.rs index 5a3651b6723..81a700df9ba 100644 --- a/rust_snuba/src/processors/eap_items.rs +++ b/rust_snuba/src/processors/eap_items.rs @@ -494,6 +494,47 @@ pub struct EAPItemRow { } } +seq_attrs! { +impl EAPItemRow { + /// Column names in struct (= wire) order. We MUST pass this list to + /// ClickHouse on insert (`INSERT INTO t (col1, col2, ...) FORMAT RowBinary`) + /// because the on-disk column order in `eap_items_1_local` differs from + /// the struct order: + /// + /// * `client_sample_rate` and `server_sample_rate` were added with + /// identical `AFTER sampling_factor` in migration 0048, so the table + /// ends up with the pair reversed (server before client). + /// * The struct interleaves `attributes_string_N, attributes_float_N` for + /// each `N` (per the `seq_attrs!` expansion), while the initial table + /// put all `attributes_string_*` first, then all `attributes_float_*`. + /// + /// Without an explicit column list, ClickHouse falls back to the table's + /// positional order and misreads bytes (the integration test hits a + /// `CANNOT_READ_ALL_DATA` deep inside the maps section). + pub(crate) const COLUMN_NAMES: &'static [&'static str] = &[ + "organization_id", + "project_id", + "item_type", + "timestamp", + "trace_id", + "item_id", + "sampling_weight", + "sampling_factor", + "client_sample_rate", + "server_sample_rate", + "retention_days", + "downsampled_retention_days", + "attributes_bool", + "attributes_int", + #( + concat!("attributes_string_", stringify!(N)), + concat!("attributes_float_", stringify!(N)), + )* + "attributes_array", + ]; +} +} + impl TryFrom for EAPItemRow { type Error = anyhow::Error; @@ -934,6 +975,30 @@ mod tests { assert!(!should_dlq_for_prior_partition(event_ts, now, 45)); } + /// The column list we ship with `INSERT INTO ... 
FORMAT RowBinary` must + /// match the struct's field order exactly — otherwise ClickHouse misreads + /// bytes (e.g., a `Map(String, String)` worth of data lands in a column + /// declared `Map(String, Float64)`). Lock the order down here so anyone + /// adding/reordering fields on `EAPItemRow` also updates this list. + #[test] + fn test_column_names_match_struct_layout() { + let names = EAPItemRow::COLUMN_NAMES; + // 12 scalars + attributes_bool + attributes_int + 80 buckets + attributes_array + assert_eq!(names.len(), 95); + assert_eq!(names[0], "organization_id"); + assert_eq!(names[7], "sampling_factor"); + assert_eq!(names[8], "client_sample_rate"); + assert_eq!(names[9], "server_sample_rate"); + // Bucket pairs are interleaved (string_N then float_N) per the + // `seq_attrs!` expansion on EAPItemRow. + assert_eq!(names[14], "attributes_string_0"); + assert_eq!(names[15], "attributes_float_0"); + assert_eq!(names[16], "attributes_string_1"); + assert_eq!(names[92], "attributes_string_39"); + assert_eq!(names[93], "attributes_float_39"); + assert_eq!(names[94], "attributes_array"); + } + #[test] fn test_should_not_dlq_event_at_boundary() { // event_ts == boundary belongs to the new week — must not DLQ. @@ -1522,12 +1587,18 @@ mod tests { .expect("The message should be processed"); assert_eq!(batch.rows.num_rows, 1); - // Insert: POST the pre-encoded bytes with FORMAT RowBinary. + // Insert: POST the pre-encoded bytes with FORMAT RowBinary. We must + // pass the column list — the struct's wire order does NOT match the + // table's on-disk column order (see EAPItemRow::COLUMN_NAMES). 
+ let insert_query = format!( + "INSERT INTO eap_items_1_local ({}) FORMAT RowBinary", + EAPItemRow::COLUMN_NAMES.join(", "), + ); let insert_resp = http .post(&base_url) .header("X-ClickHouse-Database", &database) .query(&[ - ("query", "INSERT INTO eap_items_1_local FORMAT RowBinary"), + ("query", insert_query.as_str()), ("input_format_binary_read_json_as_string", "1"), ("insert_deduplicate", "0"), ]) diff --git a/rust_snuba/src/strategies/clickhouse/writer_v2.rs b/rust_snuba/src/strategies/clickhouse/writer_v2.rs index 00d87034caf..2a6049e919b 100644 --- a/rust_snuba/src/strategies/clickhouse/writer_v2.rs +++ b/rust_snuba/src/strategies/clickhouse/writer_v2.rs @@ -96,6 +96,11 @@ impl ClickhouseWriterStep where N: ProcessingStrategy> + 'static, { + /// `columns`: if `Some`, expand into the SQL as + /// `INSERT INTO {table} (col1, col2, ...) FORMAT ...`. Required for + /// `RowBinary`, which would otherwise fall back to the table's positional + /// column order — a footgun whenever wire order and table order diverge. + /// For `JSONEachRow`, pass `None` to preserve historical behavior. pub fn new( next_step: N, cluster_config: ClickhouseConfig, @@ -104,6 +109,7 @@ where concurrency: &ConcurrencyConfig, storage_name: String, format: InsertFormat, + columns: Option<&'static [&'static str]>, ) -> Self { let inner = RunTaskInThreads::new( next_step, @@ -113,6 +119,7 @@ where &table, storage_name, format, + columns, )), skip_write, ), @@ -179,6 +186,7 @@ impl ClickhouseClient { table: &str, storage_name: String, format: InsertFormat, + columns: Option<&[&str]>, ) -> ClickhouseClient { let mut headers = HeaderMap::with_capacity(6); headers.insert(CONNECTION, HeaderValue::from_static("keep-alive")); @@ -206,7 +214,14 @@ impl ClickhouseClient { // to treat any binary string targeting a JSON column as JSON text. 
base_url.push_str("&input_format_binary_read_json_as_string=1"); } - let query = format!("INSERT INTO {table} FORMAT {}", format.as_str()); + let columns_clause = match columns { + Some(cols) => format!(" ({})", cols.join(", ")), + None => String::new(), + }; + let query = format!( + "INSERT INTO {table}{columns_clause} FORMAT {fmt}", + fmt = format.as_str(), + ); ClickhouseClient { client: Client::new(), @@ -357,6 +372,7 @@ mod tests { "querylog_local", "test_storage".to_string(), InsertFormat::JsonEachRow, + None, ); let url = client.build_url(); @@ -377,6 +393,7 @@ mod tests { "test_table", "writer_v2_lb_test".to_string(), InsertFormat::JsonEachRow, + None, ); // Default: in_order @@ -408,6 +425,7 @@ mod tests { "test_table", "writer_v2_block_size_test".to_string(), InsertFormat::JsonEachRow, + None, ); // Default (key absent): no suffix. @@ -428,6 +446,7 @@ mod tests { "test_table", "writer_v2_other_storage".to_string(), InsertFormat::JsonEachRow, + None, ); let url = other_client.build_url(); assert!(!url.contains("max_insert_block_size")); @@ -469,6 +488,7 @@ mod tests { "test_table", "test_storage".to_string(), InsertFormat::JsonEachRow, + None, ); let start_time = Instant::now(); From e05792f2fc1e58a34a126a4bc03027ff9987b6e5 Mon Sep 17 00:00:00 2001 From: Pierre Massat Date: Fri, 29 May 2026 13:47:08 -0700 Subject: [PATCH 4/7] fix(eap-items): Silence clippy too_many_arguments on writer constructor ClickhouseWriterStep::new took 7 args before the column-list parameter landed; the 8th tipped it over the clippy::too_many_arguments threshold (set repo-wide via -D warnings) and broke "Linting - Rust" in CI. The arguments map 1:1 to ClickhouseClient/ClickhouseConfig knobs the factory already owns separately, so bundling them just to satisfy the lint would add an indirection without a real abstraction. Allow the lint on the constructor definition instead.
Co-Authored-By: Claude Opus 4.7 (1M context) --- rust_snuba/src/strategies/clickhouse/writer_v2.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/rust_snuba/src/strategies/clickhouse/writer_v2.rs b/rust_snuba/src/strategies/clickhouse/writer_v2.rs index 2a6049e919b..14cd0e00553 100644 --- a/rust_snuba/src/strategies/clickhouse/writer_v2.rs +++ b/rust_snuba/src/strategies/clickhouse/writer_v2.rs @@ -101,6 +101,7 @@ where /// `RowBinary`, which would otherwise fall back to the table's positional /// column order — a footgun whenever wire order and table order diverge. /// For `JSONEachRow`, pass `None` to preserve historical behavior. + #[allow(clippy::too_many_arguments)] pub fn new( next_step: N, cluster_config: ClickhouseConfig, From b38978ba16faf3aeeaac9531d878c672e1ba9374 Mon Sep 17 00:00:00 2001 From: Pierre Massat Date: Fri, 29 May 2026 14:53:57 -0700 Subject: [PATCH 5/7] test(eap-items): Surface ClickHouse response body when SELECT fails MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The integration test's `assert!(status.is_success())` for the read-back SELECT was hiding ClickHouse's error message — only the assertion's static "Select failed" string surfaced in CI logs. Capture the status and response body up front and include both in the assertion message so the next CI failure tells us what ClickHouse actually rejected. 
Co-Authored-By: Claude Opus 4.7 (1M context) Agent transcript: https://claudescope.sentry.dev/share/fQtr5P5jc3905fJGpnXjV3rTvV3OFF6pHP9bx72Meo4 --- rust_snuba/src/processors/eap_items.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/rust_snuba/src/processors/eap_items.rs b/rust_snuba/src/processors/eap_items.rs index 81a700df9ba..788d64cb134 100644 --- a/rust_snuba/src/processors/eap_items.rs +++ b/rust_snuba/src/processors/eap_items.rs @@ -1630,8 +1630,12 @@ mod tests { .send() .await .expect("Select request failed to send"); - assert!(select_resp.status().is_success(), "Select failed"); + let select_status = select_resp.status(); let body_text = select_resp.text().await.expect("response body"); + assert!( + select_status.is_success(), + "Select failed: status={select_status}, body={body_text}" + ); let body: serde_json::Value = serde_json::from_str(&body_text).expect("JSON response"); let data = body["data"].as_array().expect("data array"); assert_eq!(data.len(), 1, "no rows found for org_id={unique_org_id}"); From 87c92e6dcf404ac2d873c8e5275c535ffecb851f Mon Sep 17 00:00:00 2001 From: Pierre Massat Date: Fri, 29 May 2026 15:35:09 -0700 Subject: [PATCH 6/7] fix(eap-items): Use GET for SELECT in row-binary integration test ClickHouse 25.x rejects bodyless POST requests with HTTP 411 HTTP_LENGTH_REQUIRED ("Transfer-Encoding is not chunked and there is no Content-Length header for POST request"). reqwest only emits Content-Length when a body is set, so POST + URL-encoded query + no body trips the check. GET fits the SELECT semantically anyway. For the ALTER TABLE cleanup keep POST but attach an empty body so reqwest emits Content-Length: 0. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- rust_snuba/src/processors/eap_items.rs | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/rust_snuba/src/processors/eap_items.rs b/rust_snuba/src/processors/eap_items.rs index 788d64cb134..860617d4f41 100644 --- a/rust_snuba/src/processors/eap_items.rs +++ b/rust_snuba/src/processors/eap_items.rs @@ -1615,8 +1615,12 @@ mod tests { // Read it back via FORMAT JSON. We use organization_id (primary key prefix) // for a deterministic lookup. ClickHouse's FORMAT JSON renders 64-bit // ints as strings to avoid JS-precision loss, so we parse them back. + // + // GET (not POST) for the read-back: ClickHouse 25.x rejects bodyless + // POSTs with HTTP 411 because reqwest doesn't emit a Content-Length + // header by default when there's nothing to send. let select_resp = http - .post(&base_url) + .get(&base_url) .header("X-ClickHouse-Database", &database) .query(&[( "query", @@ -1665,7 +1669,8 @@ mod tests { 1 ); - // Clean up + // Clean up. POST with an empty body (rather than no body) so reqwest + // emits Content-Length: 0 and ClickHouse 25.x doesn't reject with 411. let _ = http .post(&base_url) .header("X-ClickHouse-Database", &database) @@ -1675,6 +1680,7 @@ mod tests { "ALTER TABLE eap_items_1_local DELETE WHERE organization_id = {unique_org_id}" ), )]) + .body("") .send() .await; } From 12bc94bde7dfcb6a3669a9ae50cdd69916e4b732 Mon Sep 17 00:00:00 2001 From: Pierre Massat Date: Sun, 31 May 2026 10:56:13 -0700 Subject: [PATCH 7/7] ref(eap-items): Mirror from_rows with a from_encoded_rows constructor MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The JSON processor builds its batch via `InsertBatch::from_rows(...)?` and then sets `cogs_data` / `item_type_metrics` on the returned value. The RowBinary processor was hand-constructing the `InsertBatch` literal because there was no parallel "I already have the wire bytes" constructor. 
The two paths now look the same: build the batch, set the two metric fields, return. Pure refactor — no behavior change. Co-Authored-By: Claude Opus 4.7 (1M context) --- rust_snuba/src/processors/eap_items.rs | 16 +++++----------- rust_snuba/src/types.rs | 21 +++++++++++++++++++++ 2 files changed, 26 insertions(+), 11 deletions(-) diff --git a/rust_snuba/src/processors/eap_items.rs b/rust_snuba/src/processors/eap_items.rs index 860617d4f41..727f98ed3b5 100644 --- a/rust_snuba/src/processors/eap_items.rs +++ b/rust_snuba/src/processors/eap_items.rs @@ -21,7 +21,7 @@ use crate::processors::utils::{ use crate::runtime_config::get_str_config; use crate::strategies::clickhouse::rowbinary; use crate::types::CogsData; -use crate::types::{item_type_name, InsertBatch, ItemTypeMetrics, KafkaMessageMetadata, RowData}; +use crate::types::{item_type_name, InsertBatch, ItemTypeMetrics, KafkaMessageMetadata}; /// Runtime config key prefix. Per-storage key /// `eap_items_dlq_grace_period_min:`: a non-negative integer @@ -214,16 +214,10 @@ pub fn process_message_row_binary( let mut encoded_rows = Vec::new(); rowbinary::serialize_into(&mut encoded_rows, &row)?; - Ok(InsertBatch { - rows: RowData { - encoded_rows, - num_rows: 1, - }, - origin_timestamp: processed.origin_timestamp, - sentry_received_timestamp: None, - cogs_data: Some(processed.cogs_data), - item_type_metrics: Some(processed.item_type_metrics), - }) + let mut batch = InsertBatch::from_encoded_rows(encoded_rows, 1, processed.origin_timestamp); + batch.cogs_data = Some(processed.cogs_data); + batch.item_type_metrics = Some(processed.item_type_metrics); + Ok(batch) } /// Test-only: returns the typed `EAPItemRow` (plus the metadata fields the diff --git a/rust_snuba/src/types.rs b/rust_snuba/src/types.rs index 6a5239962e2..b84b0e5fe7f 100644 --- a/rust_snuba/src/types.rs +++ b/rust_snuba/src/types.rs @@ -220,6 +220,27 @@ impl InsertBatch { }) } + /// Constructor for processors that have already serialized their rows into + 
/// the wire format (e.g. the RowBinary path encodes each `EAPItemRow` + /// inside the processor instead of carrying the typed struct downstream). + /// `num_rows` is the count those bytes represent. + pub fn from_encoded_rows( + encoded_rows: Vec<u8>, + num_rows: usize, + origin_timestamp: Option<DateTime<Utc>>, + ) -> Self { + Self { + rows: RowData { + encoded_rows, + num_rows, + }, + origin_timestamp, + sentry_received_timestamp: None, + cogs_data: None, + item_type_metrics: None, + } + } + /// In case the processing function wants to skip the message, we return an empty batch. /// But instead of having the caller send an empty batch, lets make an explicit api for /// skipping. This way we can change the implementation later if we want to. Skipping ensures