Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion rust/NEXT_CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
# NEXT CHANGELOG

## Release v2.1.2
## Release v2.2.0

### Major Changes

### New Features and Improvements

- Added a 10 MiB payload size limit per `ingest_record_offset` / `ingest_records_offset` call. Attempts to ingest more than 10 MiB of encoded data in a single call now return `ZerobusError::InvalidArgument` immediately, before any network I/O.

### Bug Fixes

### Documentation
Expand Down
15 changes: 14 additions & 1 deletion rust/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -727,11 +727,24 @@ Auto-recovered if `recovery` is enabled:
Require manual intervention:
- `InvalidUCTokenError` - Invalid OAuth credentials
- `InvalidTableName` - Table doesn't exist or invalid format
- `InvalidArgument` - Invalid parameters or schema mismatch
- `InvalidArgument` - Invalid parameters, schema mismatch, or payload too large (see [Payload Size Limit](#payload-size-limit))
- `Code::Unauthenticated` - Authentication failure
- `Code::PermissionDenied` - Insufficient table permissions
- `ChannelCreationError` - Failed to establish TLS connection

### Payload Size Limit

The Zerobus server enforces a **10 MiB limit** per ingest call. The SDK enforces this client-side so you get an immediate `InvalidArgument` error rather than a server rejection:

```rust
// This will immediately return Err(ZerobusError::InvalidArgument(...))
let oversized = vec![0u8; 11 * 1024 * 1024];
let result = stream.ingest_record_offset(oversized).await;
// Err: Ingest payload too large: 11534336 bytes exceeds the 10 MiB limit (10485760 bytes)
```

The limit applies to the total encoded size of the call — the sum of all record bytes passed to `ingest_record_offset` or `ingest_records_offset`. A payload of exactly 10 MiB (10,485,760 bytes) is accepted; anything larger is rejected. Split large payloads across multiple calls to stay within the limit.

**Check if an error is retryable:**

```rust
Expand Down
11 changes: 11 additions & 0 deletions rust/sdk/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,10 @@ pub mod zeroparser;

const SHUTDOWN_TIMEOUT_SECS: u64 = 1;

/// Maximum total encoded size, in bytes, accepted by a single
/// `ingest_record_offset` / `ingest_records_offset` call.
///
/// Matches the server limit so oversize payloads fail fast client-side.
/// The bound is inclusive: `ingest_internal_v2` rejects a batch only when its
/// total byte size is strictly greater than this value, so a payload of
/// exactly this many bytes is still accepted.
const MAX_INGEST_PAYLOAD_BYTES: usize = 10 * 1024 * 1024; // 10 MiB

/// Maximum time to wait for the receiver/sender tasks to finish during stream
/// teardown.
const STREAM_TEARDOWN_DRAIN_TIMEOUT_MS: u64 = 500;
Expand Down Expand Up @@ -1111,6 +1115,13 @@ impl ZerobusStream {

/// Internal unified method for ingesting records and batches
async fn ingest_internal_v2(&self, encoded_batch: EncodedBatch) -> ZerobusResult<OffsetId> {
let byte_size = encoded_batch.total_byte_size();
if byte_size > MAX_INGEST_PAYLOAD_BYTES {
return Err(ZerobusError::InvalidArgument(format!(
"Ingest payload too large: {byte_size} bytes exceeds the 10 MiB limit ({MAX_INGEST_PAYLOAD_BYTES} bytes)"
)));
}

if self.is_closed.load(Ordering::Relaxed) {
error!(table_name = %self.table_properties.table_name, "Stream closed");
return Err(ZerobusError::StreamClosedError(tonic::Status::internal(
Expand Down
26 changes: 26 additions & 0 deletions rust/sdk/src/record_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,14 @@ impl EncodedBatch {
pub fn is_empty(&self) -> bool {
    // A batch is empty exactly when it reports zero records.
    let record_count = self.get_record_count();
    record_count == 0
}

/// Computes how many encoded bytes this batch carries in total.
///
/// Every record contributes its own `len()`: the raw byte length for
/// `Proto` records and the UTF-8 byte length for `Json` records. A batch
/// without records yields `0`.
pub fn total_byte_size(&self) -> usize {
    match self {
        EncodedBatch::Proto(records) => records
            .iter()
            .fold(0usize, |running_total, record| running_total + record.len()),
        EncodedBatch::Json(records) => records
            .iter()
            .fold(0usize, |running_total, record| running_total + record.len()),
    }
}
}

impl IntoIterator for EncodedBatch {
Expand Down Expand Up @@ -660,6 +668,24 @@ mod tests {
assert_eq!(empty_batch.get_record_count(), 0);
}

#[test]
fn test_total_byte_size_proto() {
    // Two records of 3 and 2 bytes respectively, so 5 bytes overall.
    let first_record = vec![1, 2, 3];
    let second_record = vec![4, 5];
    let proto_batch = EncodedBatch::Proto(smallvec![first_record, second_record]);

    let measured = proto_batch.total_byte_size();
    assert_eq!(measured, 5);
}

#[test]
fn test_total_byte_size_json() {
    // ASCII records: "hello" (5 bytes) + "world!" (6 bytes).
    let batch = EncodedBatch::Json(smallvec!["hello".to_string(), "world!".to_string()]);
    assert_eq!(batch.total_byte_size(), 11);

    // The size is measured in encoded bytes, not characters: U+00E9 takes
    // two bytes in UTF-8, so this 5-character record counts as 6 bytes.
    let multibyte = EncodedBatch::Json(smallvec!["h\u{e9}llo".to_string()]);
    assert_eq!(multibyte.total_byte_size(), 6);
}

#[test]
fn test_total_byte_size_empty() {
    // An empty batch must report zero bytes for both encodings.
    let empty_proto = EncodedBatch::Proto(smallvec![]);
    assert_eq!(empty_proto.total_byte_size(), 0);

    let empty_json = EncodedBatch::Json(smallvec![]);
    assert_eq!(empty_json.total_byte_size(), 0);
}

#[test]
fn test_is_empty() {
let non_empty = EncodedBatch::Proto(smallvec![vec![1]]);
Expand Down
58 changes: 58 additions & 0 deletions rust/tests/src/rust_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -984,6 +984,64 @@ mod schema_tests {
Ok(())
}

/// A single record one byte over the 10 MiB limit must be rejected with
/// `InvalidArgument` carrying the payload-size message.
#[tokio::test]
async fn test_ingest_record_too_large_fails() -> Result<(), Box<dyn std::error::Error>> {
    setup_tracing();
    info!("Starting test_ingest_record_too_large_fails");

    let (_mock_server, server_url) = start_mock_server().await?;
    let sdk = ZerobusSdk::builder()
        .endpoint(server_url.clone())
        .unity_catalog_url("https://mock-uc.com")
        .tls_config(Arc::new(NoTlsConfig))
        .build()?;

    let stream = sdk
        .stream_builder()
        .table(TABLE_NAME)
        .headers_provider(Arc::new(TestHeadersProvider::default()))
        .compiled_proto(Default::default())
        .build()
        .await?;

    // 10 MiB + 1 byte
    let oversized = vec![0u8; 10 * 1024 * 1024 + 1];
    let result = stream.ingest_record_offset(oversized).await;

    // `InvalidArgument` is also used for other validation failures, so check
    // the message as well to be sure the size limit is what rejected the call.
    match result {
        Err(ZerobusError::InvalidArgument(message)) => assert!(
            message.contains("payload too large"),
            "unexpected InvalidArgument message: {message}"
        ),
        Err(other) => panic!("expected InvalidArgument, got {other:?}"),
        Ok(_) => panic!("oversized record was accepted"),
    }

    Ok(())
}

/// A batch whose records are each under the limit but whose combined size
/// exceeds 10 MiB must be rejected with `InvalidArgument` carrying the
/// payload-size message.
#[tokio::test]
async fn test_ingest_batch_too_large_fails() -> Result<(), Box<dyn std::error::Error>> {
    setup_tracing();
    info!("Starting test_ingest_batch_too_large_fails");

    let (_mock_server, server_url) = start_mock_server().await?;
    let sdk = ZerobusSdk::builder()
        .endpoint(server_url.clone())
        .unity_catalog_url("https://mock-uc.com")
        .tls_config(Arc::new(NoTlsConfig))
        .build()?;

    let stream = sdk
        .stream_builder()
        .table(TABLE_NAME)
        .headers_provider(Arc::new(TestHeadersProvider::default()))
        .compiled_proto(Default::default())
        .build()
        .await?;

    // Two records whose combined size exceeds 10 MiB
    let batch = vec![vec![0u8; 6 * 1024 * 1024], vec![0u8; 5 * 1024 * 1024]];
    let result = stream.ingest_records_offset(batch).await;

    // `InvalidArgument` is also used for other validation failures, so check
    // the message as well to be sure the size limit is what rejected the call.
    match result {
        Err(ZerobusError::InvalidArgument(message)) => assert!(
            message.contains("payload too large"),
            "unexpected InvalidArgument message: {message}"
        ),
        Err(other) => panic!("expected InvalidArgument, got {other:?}"),
        Ok(_) => panic!("oversized batch was accepted"),
    }

    Ok(())
}

#[tokio::test]
async fn test_json_stream_creation_with_descriptor_warns(
) -> Result<(), Box<dyn std::error::Error>> {
Expand Down
Loading