From 836dbc95eb2931b76e7e35fe37d4494dc8171e7c Mon Sep 17 00:00:00 2001 From: elenagaljak-db Date: Thu, 28 May 2026 15:58:20 +0000 Subject: [PATCH 1/2] [Rust] Enforce 10 MiB client-side payload limit on ingest calls Signed-off-by: elenagaljak-db --- rust/NEXT_CHANGELOG.md | 4 ++- rust/README.md | 15 +++++++++- rust/sdk/src/lib.rs | 12 ++++++++ rust/sdk/src/record_types.rs | 26 ++++++++++++++++ rust/tests/src/rust_tests.rs | 58 ++++++++++++++++++++++++++++++++++++ 5 files changed, 113 insertions(+), 2 deletions(-) diff --git a/rust/NEXT_CHANGELOG.md b/rust/NEXT_CHANGELOG.md index 7d6cfa28..0f2e569b 100644 --- a/rust/NEXT_CHANGELOG.md +++ b/rust/NEXT_CHANGELOG.md @@ -1,11 +1,13 @@ # NEXT CHANGELOG -## Release v2.1.2 +## Release v2.2.0 ### Major Changes ### New Features and Improvements +- Added a 10 MiB payload size limit per `ingest_record_offset` / `ingest_records_offset` call. Attempts to ingest more than 10 MiB of encoded data in a single call now return `ZerobusError::InvalidArgument` immediately, before any network I/O. + ### Bug Fixes ### Documentation diff --git a/rust/README.md b/rust/README.md index 04b0c8a9..19921aed 100644 --- a/rust/README.md +++ b/rust/README.md @@ -727,11 +727,24 @@ Auto-recovered if `recovery` is enabled: Require manual intervention: - `InvalidUCTokenError` - Invalid OAuth credentials - `InvalidTableName` - Table doesn't exist or invalid format -- `InvalidArgument` - Invalid parameters or schema mismatch +- `InvalidArgument` - Invalid parameters, schema mismatch, or payload too large (see [Payload Size Limit](#payload-size-limit)) - `Code::Unauthenticated` - Authentication failure - `Code::PermissionDenied` - Insufficient table permissions - `ChannelCreationError` - Failed to establish TLS connection +### Payload Size Limit + +The Zerobus server enforces a **10 MiB limit** per ingest call. The SDK enforces this client-side so you get an immediate `InvalidArgument` error rather than a server rejection: + +```rust +// This will immediately return Err(ZerobusError::InvalidArgument(...)) +let oversized = vec![0u8; 11 * 1024 * 1024]; +let result = stream.ingest_record_offset(oversized).await; +// Err: Ingest payload too large: 11534336 bytes exceeds the 10 MiB limit +``` + +The limit applies to the total encoded size of the call — the sum of all record bytes passed to `ingest_record_offset` or `ingest_records_offset`. Split large payloads across multiple calls to stay within the limit. + **Check if an error is retryable:** ```rust diff --git a/rust/sdk/src/lib.rs b/rust/sdk/src/lib.rs index 090344a8..d1e6fef0 100644 --- a/rust/sdk/src/lib.rs +++ b/rust/sdk/src/lib.rs @@ -108,6 +108,11 @@ pub mod zeroparser; const SHUTDOWN_TIMEOUT_SECS: u64 = 1; +/// Maximum encoded byte size allowed per `ingest_record_offset` / `ingest_records_offset` call. +/// This mirrors the server-side limit: payloads exceeding this size will be rejected by the +/// server anyway, so we enforce it client-side for a faster, clearer error. +const MAX_INGEST_PAYLOAD_BYTES: usize = 10 * 1024 * 1024; // 10 MiB + /// Maximum time to wait for the receiver/sender tasks to finish during stream /// teardown. const STREAM_TEARDOWN_DRAIN_TIMEOUT_MS: u64 = 500; @@ -1111,6 +1116,13 @@ impl ZerobusStream { /// Internal unified method for ingesting records and batches async fn ingest_internal_v2(&self, encoded_batch: EncodedBatch) -> ZerobusResult { + let byte_size = encoded_batch.total_byte_size(); + if byte_size > MAX_INGEST_PAYLOAD_BYTES { + return Err(ZerobusError::InvalidArgument(format!( + "Ingest payload too large: {byte_size} bytes exceeds the 10 MiB limit ({MAX_INGEST_PAYLOAD_BYTES} bytes)" + ))); + } + if self.is_closed.load(Ordering::Relaxed) { error!(table_name = %self.table_properties.table_name, "Stream closed"); return Err(ZerobusError::StreamClosedError(tonic::Status::internal( diff --git a/rust/sdk/src/record_types.rs b/rust/sdk/src/record_types.rs index 6afd280e..c471c092 100644 --- a/rust/sdk/src/record_types.rs +++ b/rust/sdk/src/record_types.rs @@ -264,6 +264,14 @@ impl EncodedBatch { pub fn is_empty(&self) -> bool { self.get_record_count() == 0 } + + /// Returns the total encoded byte size of all records in this batch. + pub fn total_byte_size(&self) -> usize { + match self { + EncodedBatch::Proto(records) => records.iter().map(|r| r.len()).sum(), + EncodedBatch::Json(records) => records.iter().map(|s| s.len()).sum(), + } + } } impl IntoIterator for EncodedBatch { @@ -660,6 +668,24 @@ mod tests { assert_eq!(empty_batch.get_record_count(), 0); } + #[test] + fn test_total_byte_size_proto() { + let batch = EncodedBatch::Proto(smallvec![vec![1, 2, 3], vec![4, 5]]); + assert_eq!(batch.total_byte_size(), 5); + } + + #[test] + fn test_total_byte_size_json() { + let batch = EncodedBatch::Json(smallvec!["hello".to_string(), "world!".to_string()]); + assert_eq!(batch.total_byte_size(), 11); + } + + #[test] + fn test_total_byte_size_empty() { + let batch = EncodedBatch::Proto(smallvec![]); + assert_eq!(batch.total_byte_size(), 0); + } + #[test] fn test_is_empty() { let non_empty = EncodedBatch::Proto(smallvec![vec![1]]); diff --git a/rust/tests/src/rust_tests.rs b/rust/tests/src/rust_tests.rs index c01fa6f5..0b104ea8 100644 --- a/rust/tests/src/rust_tests.rs +++ b/rust/tests/src/rust_tests.rs @@ -984,6 +984,64 @@ mod schema_tests { Ok(()) } + #[tokio::test] + async fn test_ingest_record_too_large_fails() -> Result<(), Box> { + setup_tracing(); + info!("Starting test_ingest_record_too_large_fails"); + + let (_mock_server, server_url) = start_mock_server().await?; + let sdk = ZerobusSdk::builder() + .endpoint(server_url.clone()) + .unity_catalog_url("https://mock-uc.com") + .tls_config(Arc::new(NoTlsConfig)) + .build()?; + + let stream = sdk + .stream_builder() + .table(TABLE_NAME) + .headers_provider(Arc::new(TestHeadersProvider::default())) + .compiled_proto(Default::default()) + .build() + .await?; + + // 10 MiB + 1 byte + let oversized = vec![0u8; 10 * 1024 * 1024 + 1]; + let result = stream.ingest_record_offset(oversized).await; + + assert!(matches!(result, Err(ZerobusError::InvalidArgument(_)))); + + Ok(()) + } + + #[tokio::test] + async fn test_ingest_batch_too_large_fails() -> Result<(), Box> { + setup_tracing(); + info!("Starting test_ingest_batch_too_large_fails"); + + let (_mock_server, server_url) = start_mock_server().await?; + let sdk = ZerobusSdk::builder() + .endpoint(server_url.clone()) + .unity_catalog_url("https://mock-uc.com") + .tls_config(Arc::new(NoTlsConfig)) + .build()?; + + let stream = sdk + .stream_builder() + .table(TABLE_NAME) + .headers_provider(Arc::new(TestHeadersProvider::default())) + .compiled_proto(Default::default()) + .build() + .await?; + + // Two records whose combined size exceeds 10 MiB + let batch = vec![vec![0u8; 6 * 1024 * 1024], vec![0u8; 5 * 1024 * 1024]]; + let result = stream.ingest_records_offset(batch).await; + + assert!(matches!(result, Err(ZerobusError::InvalidArgument(_)))); + + Ok(()) + } + #[tokio::test] async fn test_json_stream_creation_with_descriptor_warns( ) -> Result<(), Box> { From 78da598af8a6c7a40613833932fa33fecf8224d8 Mon Sep 17 00:00:00 2001 From: elenagaljak-db Date: Fri, 29 May 2026 14:29:27 +0200 Subject: [PATCH 2/2] fix Signed-off-by: elenagaljak-db --- rust/sdk/src/lib.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/rust/sdk/src/lib.rs b/rust/sdk/src/lib.rs index d1e6fef0..d5e62c8a 100644 --- a/rust/sdk/src/lib.rs +++ b/rust/sdk/src/lib.rs @@ -109,8 +109,7 @@ pub mod zeroparser; const SHUTDOWN_TIMEOUT_SECS: u64 = 1; /// Maximum encoded byte size allowed per `ingest_record_offset` / `ingest_records_offset` call. -/// This mirrors the server-side limit: payloads exceeding this size will be rejected by the -/// server anyway, so we enforce it client-side for a faster, clearer error. +/// Matches the server limit so oversize payloads fail fast client-side. const MAX_INGEST_PAYLOAD_BYTES: usize = 10 * 1024 * 1024; // 10 MiB /// Maximum time to wait for the receiver/sender tasks to finish during stream