From 7d9ae4820b593dc1a333fc098449e3315bea85f8 Mon Sep 17 00:00:00 2001 From: Anton Borisov Date: Thu, 7 May 2026 08:55:03 +0100 Subject: [PATCH 1/3] [rust][writer] Add DynamicWriteBatchSizeEstimator --- bindings/cpp/include/fluss.hpp | 6 +- bindings/cpp/src/ffi_converter.hpp | 2 + bindings/cpp/src/lib.rs | 4 + bindings/elixir/lib/fluss/config.ex | 17 +- .../elixir/native/fluss_nif/src/config.rs | 8 + bindings/python/fluss/__init__.pyi | 8 + bindings/python/src/config.rs | 40 +++++ crates/fluss/src/client/write/accumulator.rs | 154 +++++++++++++++--- .../src/client/write/dynamic_batch_size.rs | 139 ++++++++++++++++ crates/fluss/src/client/write/mod.rs | 1 + crates/fluss/src/config.rs | 26 +++ website/docs/user-guide/cpp/api-reference.md | 4 +- .../docs/user-guide/python/api-reference.md | 4 +- website/docs/user-guide/rust/api-reference.md | 4 +- 14 files changed, 388 insertions(+), 29 deletions(-) create mode 100644 crates/fluss/src/client/write/dynamic_batch_size.rs diff --git a/bindings/cpp/include/fluss.hpp b/bindings/cpp/include/fluss.hpp index f222166d..26cfcdf5 100644 --- a/bindings/cpp/include/fluss.hpp +++ b/bindings/cpp/include/fluss.hpp @@ -1223,8 +1223,12 @@ struct Configuration { std::string writer_acks{"all"}; // Max number of writer retries int32_t writer_retries{std::numeric_limits::max()}; - // Writer batch size in bytes (2 MB) + // Writer batch size in bytes (2 MB), also the upper bound when dynamic sizing is on int32_t writer_batch_size{2 * 1024 * 1024}; + // Tune the per-table writer batch size from observed fill ratios + bool writer_dynamic_batch_size_enabled{false}; + // Lower bound (256 KB) for the dynamic batch size estimator + int32_t writer_batch_size_min{256 * 1024}; // Bucket assigner for tables without bucket keys: "sticky" or "round_robin" std::string writer_bucket_no_key_assigner{"sticky"}; // Number of remote log batches to prefetch during scanning diff --git a/bindings/cpp/src/ffi_converter.hpp b/bindings/cpp/src/ffi_converter.hpp index 0ef1487a..21e856b4 100644 --- a/bindings/cpp/src/ffi_converter.hpp +++ b/bindings/cpp/src/ffi_converter.hpp @@ -118,6 +118,8 @@ inline ffi::FfiConfig to_ffi_config(const Configuration& config) { ffi_config.writer_acks = rust::String(config.writer_acks); ffi_config.writer_retries = config.writer_retries; ffi_config.writer_batch_size = config.writer_batch_size; + ffi_config.writer_dynamic_batch_size_enabled = config.writer_dynamic_batch_size_enabled; + ffi_config.writer_batch_size_min = config.writer_batch_size_min; ffi_config.writer_bucket_no_key_assigner = rust::String(config.writer_bucket_no_key_assigner); ffi_config.scanner_remote_log_prefetch_num = config.scanner_remote_log_prefetch_num; ffi_config.remote_file_download_thread_num = config.remote_file_download_thread_num; diff --git a/bindings/cpp/src/lib.rs b/bindings/cpp/src/lib.rs index 875373bc..19529788 100644 --- a/bindings/cpp/src/lib.rs +++ b/bindings/cpp/src/lib.rs @@ -46,6 +46,8 @@ mod ffi { writer_acks: String, writer_retries: i32, writer_batch_size: i32, + writer_dynamic_batch_size_enabled: bool, + writer_batch_size_min: i32, writer_bucket_no_key_assigner: String, scanner_remote_log_prefetch_num: usize, remote_file_download_thread_num: usize, @@ -905,6 +907,8 @@ fn new_connection(config: &ffi::FfiConfig) -> ffi::FfiPtrResult { writer_acks: config.writer_acks.to_string(), writer_retries: config.writer_retries, writer_batch_size: config.writer_batch_size, + writer_dynamic_batch_size_enabled: config.writer_dynamic_batch_size_enabled, + writer_batch_size_min: config.writer_batch_size_min, writer_batch_timeout_ms: config.writer_batch_timeout_ms, writer_bucket_no_key_assigner: assigner_type, scanner_remote_log_prefetch_num: config.scanner_remote_log_prefetch_num, diff --git a/bindings/elixir/lib/fluss/config.ex b/bindings/elixir/lib/fluss/config.ex index 1120a5f4..35781258 100644 --- a/bindings/elixir/lib/fluss/config.ex +++ b/bindings/elixir/lib/fluss/config.ex @@ -34,12 +34,16 @@ defmodule Fluss.Config do @enforce_keys [:bootstrap_servers] defstruct bootstrap_servers: nil, writer_batch_size: nil, - writer_batch_timeout_ms: nil + writer_batch_timeout_ms: nil, + writer_dynamic_batch_size_enabled: nil, + writer_batch_size_min: nil @type t :: %__MODULE__{ bootstrap_servers: String.t(), writer_batch_size: non_neg_integer() | nil, - writer_batch_timeout_ms: non_neg_integer() | nil + writer_batch_timeout_ms: non_neg_integer() | nil, + writer_dynamic_batch_size_enabled: boolean() | nil, + writer_batch_size_min: non_neg_integer() | nil } @spec new(String.t()) :: t() @@ -62,6 +66,15 @@ defmodule Fluss.Config do def set_writer_batch_timeout_ms(%__MODULE__{} = config, ms) when is_integer(ms), do: %{config | writer_batch_timeout_ms: ms} + @spec set_writer_dynamic_batch_size_enabled(t(), boolean()) :: t() + def set_writer_dynamic_batch_size_enabled(%__MODULE__{} = config, enabled) + when is_boolean(enabled), + do: %{config | writer_dynamic_batch_size_enabled: enabled} + + @spec set_writer_batch_size_min(t(), non_neg_integer()) :: t() + def set_writer_batch_size_min(%__MODULE__{} = config, size) when is_integer(size), + do: %{config | writer_batch_size_min: size} + @spec get_bootstrap_servers(t()) :: String.t() def get_bootstrap_servers(%__MODULE__{bootstrap_servers: servers}), do: servers end diff --git a/bindings/elixir/native/fluss_nif/src/config.rs b/bindings/elixir/native/fluss_nif/src/config.rs index 536ed68f..c16a57eb 100644 --- a/bindings/elixir/native/fluss_nif/src/config.rs +++ b/bindings/elixir/native/fluss_nif/src/config.rs @@ -25,6 +25,8 @@ pub struct NifConfig { pub bootstrap_servers: String, pub writer_batch_size: Option, pub writer_batch_timeout_ms: Option, + pub writer_dynamic_batch_size_enabled: Option, + pub writer_batch_size_min: Option, } impl NifConfig { @@ -39,6 +41,12 @@ impl NifConfig { if let Some(ms) = self.writer_batch_timeout_ms { config.writer_batch_timeout_ms = ms; } + if let Some(enabled) = self.writer_dynamic_batch_size_enabled { + config.writer_dynamic_batch_size_enabled = enabled; + } + if let Some(size) = self.writer_batch_size_min { + config.writer_batch_size_min = size; + } config } } diff --git a/bindings/python/fluss/__init__.pyi b/bindings/python/fluss/__init__.pyi index fc713973..594494b2 100644 --- a/bindings/python/fluss/__init__.pyi +++ b/bindings/python/fluss/__init__.pyi @@ -164,6 +164,14 @@ class Config: @writer_batch_size.setter def writer_batch_size(self, size: int) -> None: ... @property + def writer_dynamic_batch_size_enabled(self) -> bool: ... + @writer_dynamic_batch_size_enabled.setter + def writer_dynamic_batch_size_enabled(self, enabled: bool) -> None: ... + @property + def writer_batch_size_min(self) -> int: ... + @writer_batch_size_min.setter + def writer_batch_size_min(self, size: int) -> None: ... + @property def writer_bucket_no_key_assigner(self) -> str: ... @writer_bucket_no_key_assigner.setter def writer_bucket_no_key_assigner(self, value: str) -> None: ... diff --git a/bindings/python/src/config.rs b/bindings/python/src/config.rs index 65bcc9ad..1dc261e2 100644 --- a/bindings/python/src/config.rs +++ b/bindings/python/src/config.rs @@ -60,6 +60,22 @@ impl Config { FlussError::new_err(format!("Invalid value '{value}' for '{key}': {e}")) })?; } + "writer.dynamic-batch-size-enabled" => { + config.writer_dynamic_batch_size_enabled = match value.as_str() { + "true" => true, + "false" => false, + other => { + return Err(FlussError::new_err(format!( + "Invalid value '{other}' for '{key}', expected 'true' or 'false'" + ))); + } + }; + } + "writer.batch-size-min" => { + config.writer_batch_size_min = value.parse::().map_err(|e| { + FlussError::new_err(format!("Invalid value '{value}' for '{key}': {e}")) + })?; + } "writer.batch-timeout-ms" => { config.writer_batch_timeout_ms = value.parse::().map_err(|e| { FlussError::new_err(format!("Invalid value '{value}' for '{key}': {e}")) @@ -250,6 +266,30 @@ impl Config { self.inner.writer_batch_size = size; } + /// Get whether the per-table dynamic batch size estimator is enabled + #[getter] + fn writer_dynamic_batch_size_enabled(&self) -> bool { + self.inner.writer_dynamic_batch_size_enabled + } + + /// Set whether the per-table dynamic batch size estimator is enabled + #[setter] + fn set_writer_dynamic_batch_size_enabled(&mut self, enabled: bool) { + self.inner.writer_dynamic_batch_size_enabled = enabled; + } + + /// Get the lower bound used by the dynamic batch size estimator + #[getter] + fn writer_batch_size_min(&self) -> i32 { + self.inner.writer_batch_size_min + } + + /// Set the lower bound used by the dynamic batch size estimator + #[setter] + fn set_writer_batch_size_min(&mut self, size: i32) { + self.inner.writer_batch_size_min = size; + } + /// Get the scanner remote log prefetch num #[getter] fn scanner_remote_log_prefetch_num(&self) -> usize { diff --git a/crates/fluss/src/client/write/accumulator.rs b/crates/fluss/src/client/write/accumulator.rs index 019d3b05..1567aeaf 100644 --- a/crates/fluss/src/client/write/accumulator.rs +++ b/crates/fluss/src/client/write/accumulator.rs @@ -19,6 +19,7 @@ use crate::client::broadcast; use crate::client::write::IdempotenceManager; use crate::client::write::batch::WriteBatch::{ArrowLog, Kv}; use crate::client::write::batch::{ArrowLogWriteBatch, KvWriteBatch, WriteBatch}; +use crate::client::write::dynamic_batch_size::DynamicWriteBatchSizeEstimator; use crate::client::{LogWriteRecord, Record, ResultHandle, WriteRecord}; use crate::cluster::{BucketLocation, Cluster, ServerNode}; use crate::compression::ArrowCompressionRatioEstimator; @@ -307,28 +308,32 @@ impl RecordAccumulator { None }; - let (dq, compression_ratio_estimator) = { - let mut binding = - self.write_batches - .entry(Arc::clone(physical_table_path)) - .or_insert_with(|| BucketAndWriteBatches { - table_id: table_info.table_id, + let (dq, compression_ratio_estimator, dynamic_target) = { + let mut binding = self + .write_batches + .entry(Arc::clone(physical_table_path)) + .or_insert_with(|| { + BucketAndWriteBatches::new( + table_info.table_id, is_partitioned_table, partition_id, - batches: Default::default(), - compression_ratio_estimator: Arc::new( - ArrowCompressionRatioEstimator::default(), - ), - }); + &self.config, + ) + }); let bucket_and_batches = binding.value_mut(); let dq = bucket_and_batches .batches .entry(bucket_id) .or_insert_with(|| Arc::new(Mutex::new(VecDeque::new()))) .clone(); + let dynamic_target = self + .config + .writer_dynamic_batch_size_enabled + .then(|| bucket_and_batches.dynamic_batch_size.lock().current()); ( dq, Arc::clone(&bucket_and_batches.compression_ratio_estimator), + dynamic_target, ) }; @@ -347,12 +352,7 @@ impl RecordAccumulator { // producer holds dq + blocks on memory, while sender needs dq to drain. drop(dq_guard); - // TODO: Implement DynamicWriteBatchSizeEstimator matching Java's - // client.writer.dynamic-batch-size-enabled. Adjusts the batch size target - // per table based on observed actual batch sizes (grow 10% when >80% full, - // shrink 5% when <50% full, clamped to [2*pageSize, maxBatchSize]). - // This would improve memory limiter utilization for tables with small rows. - let batch_size = self.config.writer_batch_size as usize; + let batch_size = dynamic_target.unwrap_or(self.config.writer_batch_size as usize); let record_size = record.estimated_record_size(); let alloc_size = batch_size.max(record_size); let permit = self.memory_limiter.acquire(alloc_size)?; @@ -664,6 +664,8 @@ impl RecordAccumulator { let current_batch_size = batch.estimated_size_in_bytes(); size += current_batch_size; + self.record_actual_batch_size(table_path, current_batch_size); + // mark the batch as drained. batch.drained(current_time_ms()); ready.push(ReadyWriteBatch { @@ -690,6 +692,15 @@ impl RecordAccumulator { self.incomplete_batches.write().remove(&batch_id); } + fn record_actual_batch_size(&self, table_path: &Arc, actual: usize) { + if !self.config.writer_dynamic_batch_size_enabled { + return; + } + if let Some(entry) = self.write_batches.get(table_path) { + entry.dynamic_batch_size.lock().update(actual); + } + } + pub fn re_enqueue(&self, mut ready_write_batch: ReadyWriteBatch) { ready_write_batch.write_batch.re_enqueued(); @@ -785,12 +796,13 @@ impl RecordAccumulator { let mut binding = self .write_batches .entry(Arc::clone(physical_table_path)) - .or_insert_with(|| BucketAndWriteBatches { - table_id, - is_partitioned_table, - partition_id, - batches: Default::default(), - compression_ratio_estimator: Arc::new(ArrowCompressionRatioEstimator::default()), + .or_insert_with(|| { + BucketAndWriteBatches::new( + table_id, + is_partitioned_table, + partition_id, + &self.config, + ) }); let bucket_and_batches = binding.value_mut(); bucket_and_batches @@ -938,6 +950,29 @@ struct BucketAndWriteBatches { batches: HashMap>>>, /// Compression ratio estimator shared across Arrow log batches for this table. compression_ratio_estimator: Arc, + dynamic_batch_size: Mutex, +} + +impl BucketAndWriteBatches { + fn new( + table_id: TableId, + is_partitioned_table: bool, + partition_id: Option, + config: &Config, + ) -> Self { + let estimator = DynamicWriteBatchSizeEstimator::new( + config.writer_batch_size_min as usize, + config.writer_batch_size as usize, + ); + Self { + table_id, + is_partitioned_table, + partition_id, + batches: Default::default(), + compression_ratio_estimator: Arc::new(ArrowCompressionRatioEstimator::default()), + dynamic_batch_size: Mutex::new(estimator), + } + } } pub struct RecordAppendResult { @@ -1563,4 +1598,77 @@ mod tests { .await .expect("notified should complete after wakeup_sender"); } + + #[test] + fn dynamic_batch_size_shrinks_after_small_drained_batch() { + let target = 256 * 1024; + let config = Config { + writer_dynamic_batch_size_enabled: true, + writer_batch_size: target, + writer_batch_size_min: 4 * 1024, + writer_buffer_memory_size: 1024 * 1024, + ..Config::default() + }; + let accumulator = RecordAccumulator::new(config, disabled_idempotence()); + let table_path = TablePath::new("db".to_string(), "tbl".to_string()); + let table_info = Arc::new(build_table_info(table_path.clone(), 1, 1)); + let physical_table_path = Arc::new(PhysicalTablePath::of(Arc::new(table_path.clone()))); + let cluster = Arc::new(build_cluster(&table_path, 1, 1)); + let row = GenericRow { + values: vec![Datum::Int32(1)], + }; + let record = WriteRecord::for_append(table_info, physical_table_path, 1, &row); + + accumulator.append(&record, 0, &cluster, false).unwrap(); + assert_eq!(*accumulator.memory_limiter.state.lock(), target as usize); + + let server = cluster.get_tablet_server(1).expect("server"); + let nodes = HashSet::from([server.clone()]); + let mut drained = accumulator + .drain(cluster.clone(), &nodes, 1024 * 1024) + .unwrap(); + let mut batches = drained.remove(&1).expect("drained batches"); + let batch = batches.pop().expect("batch"); + accumulator.remove_incomplete_batches(batch.write_batch.batch_id()); + assert_eq!(*accumulator.memory_limiter.state.lock(), 0); + + accumulator.append(&record, 0, &cluster, false).unwrap(); + let second = *accumulator.memory_limiter.state.lock(); + assert!(second < target as usize, "{second} >= {target}"); + } + + #[test] + fn dynamic_batch_size_disabled_keeps_static_target() { + let target = 256 * 1024; + let config = Config { + writer_dynamic_batch_size_enabled: false, + writer_batch_size: target, + writer_batch_size_min: 4 * 1024, + writer_buffer_memory_size: 1024 * 1024, + ..Config::default() + }; + let accumulator = RecordAccumulator::new(config, disabled_idempotence()); + let table_path = TablePath::new("db".to_string(), "tbl".to_string()); + let table_info = Arc::new(build_table_info(table_path.clone(), 1, 1)); + let physical_table_path = Arc::new(PhysicalTablePath::of(Arc::new(table_path.clone()))); + let cluster = Arc::new(build_cluster(&table_path, 1, 1)); + let row = GenericRow { + values: vec![Datum::Int32(1)], + }; + let record = WriteRecord::for_append(table_info, physical_table_path, 1, &row); + + let server = cluster.get_tablet_server(1).expect("server"); + let nodes = HashSet::from([server.clone()]); + for _ in 0..3 { + accumulator.append(&record, 0, &cluster, false).unwrap(); + assert_eq!(*accumulator.memory_limiter.state.lock(), target as usize); + + let mut drained = accumulator + .drain(cluster.clone(), &nodes, 1024 * 1024) + .unwrap(); + let mut batches = drained.remove(&1).expect("drained batches"); + let batch = batches.pop().expect("batch"); + accumulator.remove_incomplete_batches(batch.write_batch.batch_id()); + } + } } diff --git a/crates/fluss/src/client/write/dynamic_batch_size.rs b/crates/fluss/src/client/write/dynamic_batch_size.rs new file mode 100644 index 00000000..0cf49658 --- /dev/null +++ b/crates/fluss/src/client/write/dynamic_batch_size.rs @@ -0,0 +1,139 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Per-table batch size estimator. Mirrors Java's `DynamicWriteBatchSizeEstimator`: +//! grow 10% above 80% fill, shrink 5% below 50%, clamped to `[min, max]`. + +const GROW_THRESHOLD: f64 = 0.8; +const SHRINK_THRESHOLD: f64 = 0.5; +const GROW_FACTOR: f64 = 1.1; +const SHRINK_FACTOR: f64 = 0.95; + +#[derive(Debug, Clone)] +pub(crate) struct DynamicWriteBatchSizeEstimator { + current: usize, + min: usize, + max: usize, +} + +impl DynamicWriteBatchSizeEstimator { + pub fn new(min_size: usize, max_size: usize) -> Self { + Self { + current: max_size, + min: min_size.min(max_size), + max: max_size, + } + } + + pub fn current(&self) -> usize { + self.current + } + + pub fn update(&mut self, actual: usize) -> usize { + let cur = self.current as f64; + let actual = actual as f64; + let next = if actual >= cur * GROW_THRESHOLD { + cur * GROW_FACTOR + } else if actual <= cur * SHRINK_THRESHOLD { + cur * SHRINK_FACTOR + } else { + cur + }; + self.current = (next as usize).clamp(self.min, self.max); + self.current + } +} + +#[cfg(test)] +mod tests { + use super::*; + + const MIN: usize = 256 * 1024; + const MAX: usize = 2 * 1024 * 1024; + /// ~41 shrink steps, ~22 grow steps; 50 covers both with margin. + const CONVERGENCE_STEPS: usize = 50; + + #[test] + fn starts_at_max() { + let est = DynamicWriteBatchSizeEstimator::new(MIN, MAX); + assert_eq!(est.current(), MAX); + } + + #[test] + fn min_clamped_to_max_when_misconfigured() { + let mut est = DynamicWriteBatchSizeEstimator::new(MAX * 2, MAX); + assert_eq!(est.current(), MAX); + assert_eq!(est.update(0), MAX); + } + + #[test] + fn grows_when_above_grow_threshold() { + let mut est = DynamicWriteBatchSizeEstimator::new(MIN, MAX); + for _ in 0..CONVERGENCE_STEPS { + est.update(0); + } + assert_eq!(est.current(), MIN); + + // 0.9 sits safely past the 0.8 threshold and avoids f64 boundary noise. + let next = est.update((MIN as f64 * 0.9) as usize); + assert_eq!(next, ((MIN as f64) * GROW_FACTOR) as usize); + } + + #[test] + fn shrinks_when_below_shrink_threshold() { + let mut est = DynamicWriteBatchSizeEstimator::new(MIN, MAX); + let next = est.update((MAX as f64 * SHRINK_THRESHOLD) as usize); + assert_eq!(next, ((MAX as f64) * SHRINK_FACTOR) as usize); + } + + #[test] + fn shrink_clamps_to_min() { + let mut est = DynamicWriteBatchSizeEstimator::new(MIN, MAX); + for _ in 0..CONVERGENCE_STEPS { + est.update(0); + } + assert_eq!(est.current(), MIN); + } + + #[test] + fn grow_clamps_to_max() { + let mut est = DynamicWriteBatchSizeEstimator::new(MIN, MAX); + for _ in 0..CONVERGENCE_STEPS { + est.update(0); + } + for _ in 0..CONVERGENCE_STEPS { + est.update(est.current()); + } + assert_eq!(est.current(), MAX); + } + + #[test] + fn oversized_actual_clamps_at_max() { + let mut est = DynamicWriteBatchSizeEstimator::new(MIN, MAX); + assert_eq!(est.update(MAX * 4), MAX); + } + + #[test] + fn dead_zone_is_a_fixed_point() { + let mut est = DynamicWriteBatchSizeEstimator::new(MIN, MAX); + let initial = est.current(); + for _ in 0..20 { + est.update((est.current() as f64 * 0.65) as usize); + } + assert_eq!(est.current(), initial); + } +} diff --git a/crates/fluss/src/client/write/mod.rs b/crates/fluss/src/client/write/mod.rs index bc324e18..a65b5d5a 100644 --- a/crates/fluss/src/client/write/mod.rs +++ b/crates/fluss/src/client/write/mod.rs @@ -17,6 +17,7 @@ mod accumulator; mod batch; +mod dynamic_batch_size; mod idempotence; use crate::client::broadcast::{self as client_broadcast, BatchWriteResult, BroadcastOnceReceiver}; diff --git a/crates/fluss/src/config.rs b/crates/fluss/src/config.rs index 09a17f83..f49afaa1 100644 --- a/crates/fluss/src/config.rs +++ b/crates/fluss/src/config.rs @@ -22,6 +22,9 @@ use strum_macros::{Display, EnumString}; const DEFAULT_BOOTSTRAP_SERVER: &str = "127.0.0.1:9123"; const DEFAULT_REQUEST_MAX_SIZE: i32 = 10 * 1024 * 1024; const DEFAULT_WRITER_BATCH_SIZE: i32 = 2 * 1024 * 1024; +// Mirrors Java's `2 * pageSize` floor with default pageSize = 128 KB. +const DEFAULT_WRITER_BATCH_SIZE_MIN: i32 = 256 * 1024; +const DEFAULT_WRITER_DYNAMIC_BATCH_SIZE_ENABLED: bool = false; const DEFAULT_RETRIES: i32 = i32::MAX; const DEFAULT_PREFETCH_NUM: usize = 4; const DEFAULT_DOWNLOAD_THREADS: usize = 3; @@ -76,6 +79,16 @@ pub struct Config { #[arg(long, default_value_t = DEFAULT_WRITER_BATCH_SIZE)] pub writer_batch_size: i32, + /// Tune the per-table writer batch size from observed fill ratios. + /// Default: false (matching Java `client.writer.dynamic-batch-size-enabled`). + #[arg(long, default_value_t = DEFAULT_WRITER_DYNAMIC_BATCH_SIZE_ENABLED)] + pub writer_dynamic_batch_size_enabled: bool, + + /// Lower bound for the dynamic batch size estimator. + /// Default: 262144 (256 KB), matching Java's `2 * pageSize` floor. + #[arg(long, default_value_t = DEFAULT_WRITER_BATCH_SIZE_MIN)] + pub writer_batch_size_min: i32, + #[arg(long, value_enum, default_value_t = NoKeyAssigner::Sticky)] pub writer_bucket_no_key_assigner: NoKeyAssigner, @@ -199,6 +212,11 @@ impl std::fmt::Debug for Config { .field("writer_acks", &self.writer_acks) .field("writer_retries", &self.writer_retries) .field("writer_batch_size", &self.writer_batch_size) + .field( + "writer_dynamic_batch_size_enabled", + &self.writer_dynamic_batch_size_enabled, + ) + .field("writer_batch_size_min", &self.writer_batch_size_min) .field( "writer_bucket_no_key_assigner", &self.writer_bucket_no_key_assigner, @@ -267,6 +285,8 @@ impl Default for Config { writer_acks: String::from(DEFAULT_ACKS), writer_retries: i32::MAX, writer_batch_size: DEFAULT_WRITER_BATCH_SIZE, + writer_dynamic_batch_size_enabled: DEFAULT_WRITER_DYNAMIC_BATCH_SIZE_ENABLED, + writer_batch_size_min: DEFAULT_WRITER_BATCH_SIZE_MIN, writer_bucket_no_key_assigner: NoKeyAssigner::Sticky, scanner_remote_log_prefetch_num: DEFAULT_PREFETCH_NUM, remote_file_download_thread_num: DEFAULT_DOWNLOAD_THREADS, @@ -388,6 +408,12 @@ impl Config { if self.writer_batch_size as usize > self.writer_buffer_memory_size { return Err("writer_batch_size must be <= writer_buffer_memory_size".to_string()); } + if self.writer_batch_size_min <= 0 { + return Err("writer_batch_size_min must be > 0".to_string()); + } + if self.writer_batch_size_min > self.writer_batch_size { + return Err("writer_batch_size_min must be <= writer_batch_size".to_string()); + } // idempotence checks if !self.writer_enable_idempotence { return Ok(()); diff --git a/website/docs/user-guide/cpp/api-reference.md b/website/docs/user-guide/cpp/api-reference.md index c50d40cd..157a18bb 100644 --- a/website/docs/user-guide/cpp/api-reference.md +++ b/website/docs/user-guide/cpp/api-reference.md @@ -21,7 +21,9 @@ Complete API reference for the Fluss C++ client. | `writer_request_max_size` | `int32_t` | `10485760` (10 MB) | Maximum request size in bytes | | `writer_acks` | `std::string` | `"all"` | Acknowledgment setting (`"all"`, `"0"`, `"1"`, or `"-1"`) | | `writer_retries` | `int32_t` | `INT32_MAX` | Number of retries on failure | -| `writer_batch_size` | `int32_t` | `2097152` (2 MB) | Batch size for writes in bytes | +| `writer_batch_size` | `int32_t` | `2097152` (2 MB) | Batch size for writes in bytes (also the upper bound when dynamic sizing is enabled) | +| `writer_dynamic_batch_size_enabled` | `bool` | `false` | Enable per-table dynamic batch sizing: target grows 10% above 80% fill, shrinks 5% below 50% | +| `writer_batch_size_min` | `int32_t` | `262144` (256 KB) | Lower bound for the dynamic batch size estimator (ignored when disabled) | | `writer_batch_timeout_ms` | `int64_t` | `100` | Maximum time in ms to wait for a writer batch to fill up before sending | | `writer_bucket_no_key_assigner` | `std::string` | `"sticky"` | Bucket assignment strategy for tables without bucket keys: `"sticky"` or `"round_robin"` | | `scanner_remote_log_prefetch_num` | `size_t` | `4` | Number of remote log segments to prefetch | diff --git a/website/docs/user-guide/python/api-reference.md b/website/docs/user-guide/python/api-reference.md index 32f23a59..2e4d4db5 100644 --- a/website/docs/user-guide/python/api-reference.md +++ b/website/docs/user-guide/python/api-reference.md @@ -14,7 +14,9 @@ Complete API reference for the Fluss Python client. | `writer_request_max_size` | `writer.request-max-size` | Get/set max request size in bytes | | `writer_acks` | `writer.acks` | Get/set acknowledgment setting (`"all"` for all replicas) | | `writer_retries` | `writer.retries` | Get/set number of retries on failure | -| `writer_batch_size` | `writer.batch-size` | Get/set write batch size in bytes | +| `writer_batch_size` | `writer.batch-size` | Get/set write batch size in bytes (also the upper bound when dynamic sizing is enabled) | +| `writer_dynamic_batch_size_enabled` | `writer.dynamic-batch-size-enabled` | Get/set whether the per-table dynamic batch size estimator is enabled (default `false`) | +| `writer_batch_size_min` | `writer.batch-size-min` | Get/set the lower bound for the dynamic batch size estimator (default 256 KB) | | `writer_batch_timeout_ms` | `writer.batch-timeout-ms` | Get/set max time in ms to wait for a writer batch to fill up before sending | | `writer_bucket_no_key_assigner` | `writer.bucket.no-key-assigner` | Get/set bucket assignment strategy (`"sticky"` or `"round_robin"`) | | `scanner_remote_log_prefetch_num` | `scanner.remote-log.prefetch-num` | Get/set number of remote log segments to prefetch | diff --git a/website/docs/user-guide/rust/api-reference.md b/website/docs/user-guide/rust/api-reference.md index 9f2994ad..e19bfb62 100644 --- a/website/docs/user-guide/rust/api-reference.md +++ b/website/docs/user-guide/rust/api-reference.md @@ -13,7 +13,9 @@ Complete API reference for the Fluss Rust client. | `writer_request_max_size` | `i32` | `10485760` (10 MB) | Maximum request size in bytes | | `writer_acks` | `String` | `"all"` | Acknowledgment setting (`"all"` waits for all replicas) | | `writer_retries` | `i32` | `i32::MAX` | Number of retries on failure | -| `writer_batch_size` | `i32` | `2097152` (2 MB) | Batch size for writes in bytes | +| `writer_batch_size` | `i32` | `2097152` (2 MB) | Batch size for writes in bytes. Acts as the upper bound when dynamic batch sizing is enabled. | +| `writer_dynamic_batch_size_enabled` | `bool` | `false` | Enable per-table dynamic batch sizing: target grows 10% above 80% fill, shrinks 5% below 50%, clamped to `[writer_batch_size_min, writer_batch_size]` | +| `writer_batch_size_min` | `i32` | `262144` (256 KB) | Lower bound for the dynamic batch size estimator (ignored when `writer_dynamic_batch_size_enabled` is `false`) | | `writer_batch_timeout_ms` | `i64` | `100` | Maximum time in ms to wait for a writer batch to fill up before sending | | `writer_bucket_no_key_assigner` | `NoKeyAssigner` | `sticky` | Bucket assignment strategy for tables without bucket keys: `sticky` or `round_robin` | | `scanner_remote_log_prefetch_num` | `usize` | `4` | Number of remote log segments to prefetch | From 61d54e09f94eebe5850d94ab1bd9a5697d9c94e5 Mon Sep 17 00:00:00 2001 From: Anton Borisov Date: Thu, 7 May 2026 09:06:06 +0100 Subject: [PATCH 2/3] [rust][writer] Match Java defaults and key shape --- bindings/cpp/include/fluss.hpp | 2 +- bindings/python/src/config.rs | 2 +- crates/fluss/src/client/write/dynamic_batch_size.rs | 7 ++++--- crates/fluss/src/config.rs | 4 ++-- website/docs/user-guide/cpp/api-reference.md | 2 +- website/docs/user-guide/python/api-reference.md | 2 +- website/docs/user-guide/rust/api-reference.md | 2 +- 7 files changed, 11 insertions(+), 10 deletions(-) diff --git a/bindings/cpp/include/fluss.hpp b/bindings/cpp/include/fluss.hpp index 26cfcdf5..12af324b 100644 --- a/bindings/cpp/include/fluss.hpp +++ b/bindings/cpp/include/fluss.hpp @@ -1226,7 +1226,7 @@ struct Configuration { // Writer batch size in bytes (2 MB), also the upper bound when dynamic sizing is on int32_t writer_batch_size{2 * 1024 * 1024}; // Tune the per-table writer batch size from observed fill ratios - bool writer_dynamic_batch_size_enabled{false}; + bool writer_dynamic_batch_size_enabled{true}; // Lower bound (256 KB) for the dynamic batch size estimator int32_t writer_batch_size_min{256 * 1024}; // Bucket assigner for tables without bucket keys: "sticky" or "round_robin" diff --git a/bindings/python/src/config.rs b/bindings/python/src/config.rs index 1dc261e2..58166883 100644 --- a/bindings/python/src/config.rs +++ b/bindings/python/src/config.rs @@ -60,7 +60,7 @@ impl Config { FlussError::new_err(format!("Invalid value '{value}' for '{key}': {e}")) })?; } - "writer.dynamic-batch-size-enabled" => { + "writer.dynamic-batch-size.enabled" => { config.writer_dynamic_batch_size_enabled = match value.as_str() { "true" => true, "false" => false, diff --git a/crates/fluss/src/client/write/dynamic_batch_size.rs b/crates/fluss/src/client/write/dynamic_batch_size.rs index 0cf49658..457e76e0 100644 --- a/crates/fluss/src/client/write/dynamic_batch_size.rs +++ b/crates/fluss/src/client/write/dynamic_batch_size.rs @@ -46,9 +46,9 @@ impl DynamicWriteBatchSizeEstimator { pub fn update(&mut self, actual: usize) -> usize { let cur = self.current as f64; let actual = actual as f64; - let next = if actual >= cur * GROW_THRESHOLD { + let next = if actual > cur * GROW_THRESHOLD { cur * GROW_FACTOR - } else if actual <= cur * SHRINK_THRESHOLD { + } else if actual < cur * SHRINK_THRESHOLD { cur * SHRINK_FACTOR } else { cur @@ -96,7 +96,8 @@ mod tests { #[test] fn shrinks_when_below_shrink_threshold() { let mut est = DynamicWriteBatchSizeEstimator::new(MIN, MAX); - let next = est.update((MAX as f64 * SHRINK_THRESHOLD) as usize); + // 0.4 sits safely below the strict 0.5 threshold. + let next = est.update((MAX as f64 * 0.4) as usize); assert_eq!(next, ((MAX as f64) * SHRINK_FACTOR) as usize); } diff --git a/crates/fluss/src/config.rs b/crates/fluss/src/config.rs index f49afaa1..8075e942 100644 --- a/crates/fluss/src/config.rs +++ b/crates/fluss/src/config.rs @@ -24,7 +24,7 @@ const DEFAULT_REQUEST_MAX_SIZE: i32 = 10 * 1024 * 1024; const DEFAULT_WRITER_BATCH_SIZE: i32 = 2 * 1024 * 1024; // Mirrors Java's `2 * pageSize` floor with default pageSize = 128 KB. const DEFAULT_WRITER_BATCH_SIZE_MIN: i32 = 256 * 1024; -const DEFAULT_WRITER_DYNAMIC_BATCH_SIZE_ENABLED: bool = false; +const DEFAULT_WRITER_DYNAMIC_BATCH_SIZE_ENABLED: bool = true; const DEFAULT_RETRIES: i32 = i32::MAX; const DEFAULT_PREFETCH_NUM: usize = 4; const DEFAULT_DOWNLOAD_THREADS: usize = 3; @@ -80,7 +80,7 @@ pub struct Config { pub writer_batch_size: i32, /// Tune the per-table writer batch size from observed fill ratios. - /// Default: false (matching Java `client.writer.dynamic-batch-size-enabled`). + /// Default: true (matching Java `client.writer.dynamic-batch-size.enabled`). #[arg(long, default_value_t = DEFAULT_WRITER_DYNAMIC_BATCH_SIZE_ENABLED)] pub writer_dynamic_batch_size_enabled: bool, diff --git a/website/docs/user-guide/cpp/api-reference.md b/website/docs/user-guide/cpp/api-reference.md index 157a18bb..9298c934 100644 --- a/website/docs/user-guide/cpp/api-reference.md +++ b/website/docs/user-guide/cpp/api-reference.md @@ -22,7 +22,7 @@ Complete API reference for the Fluss C++ client. | `writer_acks` | `std::string` | `"all"` | Acknowledgment setting (`"all"`, `"0"`, `"1"`, or `"-1"`) | | `writer_retries` | `int32_t` | `INT32_MAX` | Number of retries on failure | | `writer_batch_size` | `int32_t` | `2097152` (2 MB) | Batch size for writes in bytes (also the upper bound when dynamic sizing is enabled) | -| `writer_dynamic_batch_size_enabled` | `bool` | `false` | Enable per-table dynamic batch sizing: target grows 10% above 80% fill, shrinks 5% below 50% | +| `writer_dynamic_batch_size_enabled` | `bool` | `true` | Enable per-table dynamic batch sizing: target grows 10% above 80% fill, shrinks 5% below 50% | | `writer_batch_size_min` | `int32_t` | `262144` (256 KB) | Lower bound for the dynamic batch size estimator (ignored when disabled) | | `writer_batch_timeout_ms` | `int64_t` | `100` | Maximum time in ms to wait for a writer batch to fill up before sending | | `writer_bucket_no_key_assigner` | `std::string` | `"sticky"` | Bucket assignment strategy for tables without bucket keys: `"sticky"` or `"round_robin"` | diff --git a/website/docs/user-guide/python/api-reference.md b/website/docs/user-guide/python/api-reference.md index 2e4d4db5..b3da64be 100644 --- a/website/docs/user-guide/python/api-reference.md +++ b/website/docs/user-guide/python/api-reference.md @@ -15,7 +15,7 @@ Complete API reference for the Fluss Python client. | `writer_acks` | `writer.acks` | Get/set acknowledgment setting (`"all"` for all replicas) | | `writer_retries` | `writer.retries` | Get/set number of retries on failure | | `writer_batch_size` | `writer.batch-size` | Get/set write batch size in bytes (also the upper bound when dynamic sizing is enabled) | -| `writer_dynamic_batch_size_enabled` | `writer.dynamic-batch-size-enabled` | Get/set whether the per-table dynamic batch size estimator is enabled (default `false`) | +| `writer_dynamic_batch_size_enabled` | `writer.dynamic-batch-size.enabled` | Get/set whether the per-table dynamic batch size estimator is enabled (default `true`) | | `writer_batch_size_min` | `writer.batch-size-min` | Get/set the lower bound for the dynamic batch size estimator (default 256 KB) | | `writer_batch_timeout_ms` | `writer.batch-timeout-ms` | Get/set max time in ms to wait for a writer batch to fill up before sending | | `writer_bucket_no_key_assigner` | `writer.bucket.no-key-assigner` | Get/set bucket assignment strategy (`"sticky"` or `"round_robin"`) | diff --git a/website/docs/user-guide/rust/api-reference.md b/website/docs/user-guide/rust/api-reference.md index e19bfb62..94248861 100644 --- a/website/docs/user-guide/rust/api-reference.md +++ b/website/docs/user-guide/rust/api-reference.md @@ -14,7 +14,7 @@ Complete API reference for the Fluss Rust client. | `writer_acks` | `String` | `"all"` | Acknowledgment setting (`"all"` waits for all replicas) | | `writer_retries` | `i32` | `i32::MAX` | Number of retries on failure | | `writer_batch_size` | `i32` | `2097152` (2 MB) | Batch size for writes in bytes. Acts as the upper bound when dynamic batch sizing is enabled. | -| `writer_dynamic_batch_size_enabled` | `bool` | `false` | Enable per-table dynamic batch sizing: target grows 10% above 80% fill, shrinks 5% below 50%, clamped to `[writer_batch_size_min, writer_batch_size]` | +| `writer_dynamic_batch_size_enabled` | `bool` | `true` | Enable per-table dynamic batch sizing: target grows 10% above 80% fill, shrinks 5% below 50%, clamped to `[writer_batch_size_min, writer_batch_size]` | | `writer_batch_size_min` | `i32` | `262144` (256 KB) | Lower bound for the dynamic batch size estimator (ignored when `writer_dynamic_batch_size_enabled` is `false`) | | `writer_batch_timeout_ms` | `i64` | `100` | Maximum time in ms to wait for a writer batch to fill up before sending | | `writer_bucket_no_key_assigner` | `NoKeyAssigner` | `sticky` | Bucket assignment strategy for tables without bucket keys: `sticky` or `round_robin` | From 04f5d58b6694adce19def4487247d0dbf8f7bb91 Mon Sep 17 00:00:00 2001 From: Anton Borisov Date: Fri, 8 May 2026 17:41:13 +0100 Subject: [PATCH 3/3] address feedback --- bindings/cpp/include/fluss.hpp | 2 +- bindings/cpp/src/ffi_converter.hpp | 2 +- bindings/cpp/src/lib.rs | 4 +- bindings/elixir/lib/fluss/config.ex | 10 +- .../elixir/native/fluss_nif/src/config.rs | 6 +- bindings/python/fluss/__init__.pyi | 6 +- bindings/python/src/config.rs | 19 +-- crates/fluss/src/client/write/accumulator.rs | 117 +++++++++++++++--- .../src/client/write/dynamic_batch_size.rs | 37 +++--- crates/fluss/src/config.rs | 22 ++-- website/docs/user-guide/cpp/api-reference.md | 4 +- .../docs/user-guide/python/api-reference.md | 4 +- website/docs/user-guide/rust/api-reference.md | 6 +- 13 files changed, 169 insertions(+), 70 deletions(-) diff --git a/bindings/cpp/include/fluss.hpp b/bindings/cpp/include/fluss.hpp index 12af324b..616bb8b9 100644 --- a/bindings/cpp/include/fluss.hpp +++ b/bindings/cpp/include/fluss.hpp @@ -1228,7 +1228,7 @@ struct Configuration { // Tune the per-table writer batch size from observed fill ratios bool writer_dynamic_batch_size_enabled{true}; // Lower bound (256 KB) for the dynamic batch size estimator - int32_t writer_batch_size_min{256 * 1024}; + int32_t writer_dynamic_batch_size_min{256 * 1024}; // Bucket assigner for tables without bucket keys: "sticky" or "round_robin" std::string writer_bucket_no_key_assigner{"sticky"}; // Number of remote log batches to prefetch during scanning diff --git a/bindings/cpp/src/ffi_converter.hpp b/bindings/cpp/src/ffi_converter.hpp index 21e856b4..3e8e1fe6 100644 --- a/bindings/cpp/src/ffi_converter.hpp +++ b/bindings/cpp/src/ffi_converter.hpp @@ -119,7 +119,7 @@ inline ffi::FfiConfig to_ffi_config(const Configuration& config) { ffi_config.writer_retries = config.writer_retries; ffi_config.writer_batch_size = config.writer_batch_size; ffi_config.writer_dynamic_batch_size_enabled = config.writer_dynamic_batch_size_enabled; - ffi_config.writer_batch_size_min = config.writer_batch_size_min; + ffi_config.writer_dynamic_batch_size_min = config.writer_dynamic_batch_size_min; ffi_config.writer_bucket_no_key_assigner = rust::String(config.writer_bucket_no_key_assigner); ffi_config.scanner_remote_log_prefetch_num = config.scanner_remote_log_prefetch_num; ffi_config.remote_file_download_thread_num = config.remote_file_download_thread_num; diff --git a/bindings/cpp/src/lib.rs b/bindings/cpp/src/lib.rs index 19529788..0a71e901 100644 --- a/bindings/cpp/src/lib.rs +++ b/bindings/cpp/src/lib.rs @@ -47,7 +47,7 @@ mod ffi { writer_retries: i32, writer_batch_size: i32, writer_dynamic_batch_size_enabled: bool, - writer_batch_size_min: i32, + writer_dynamic_batch_size_min: i32, writer_bucket_no_key_assigner: String, scanner_remote_log_prefetch_num: usize, remote_file_download_thread_num: usize, @@ -908,7 +908,7 @@ fn new_connection(config: &ffi::FfiConfig) -> ffi::FfiPtrResult { writer_retries: config.writer_retries, writer_batch_size: config.writer_batch_size, writer_dynamic_batch_size_enabled: config.writer_dynamic_batch_size_enabled, - writer_batch_size_min: config.writer_batch_size_min, + writer_dynamic_batch_size_min: config.writer_dynamic_batch_size_min, writer_batch_timeout_ms: config.writer_batch_timeout_ms, writer_bucket_no_key_assigner: assigner_type, scanner_remote_log_prefetch_num: config.scanner_remote_log_prefetch_num, diff --git a/bindings/elixir/lib/fluss/config.ex b/bindings/elixir/lib/fluss/config.ex index 35781258..f12f61e4 100644 --- a/bindings/elixir/lib/fluss/config.ex +++ b/bindings/elixir/lib/fluss/config.ex @@ -36,14 +36,14 @@ defmodule Fluss.Config do writer_batch_size: nil, writer_batch_timeout_ms: nil, writer_dynamic_batch_size_enabled: nil, - writer_batch_size_min: nil + writer_dynamic_batch_size_min: nil @type t :: %__MODULE__{ bootstrap_servers: String.t(), writer_batch_size: non_neg_integer() | nil, writer_batch_timeout_ms: non_neg_integer() | nil, writer_dynamic_batch_size_enabled: boolean() | nil, - writer_batch_size_min: non_neg_integer() | nil + writer_dynamic_batch_size_min: non_neg_integer() | nil } @spec new(String.t()) :: t() @@ -71,9 +71,9 @@ defmodule Fluss.Config do when is_boolean(enabled), do: %{config | writer_dynamic_batch_size_enabled: enabled} - @spec set_writer_batch_size_min(t(), non_neg_integer()) :: t() - def set_writer_batch_size_min(%__MODULE__{} = config, size) when is_integer(size), - do: %{config | writer_batch_size_min: size} + @spec set_writer_dynamic_batch_size_min(t(), non_neg_integer()) :: t() + def set_writer_dynamic_batch_size_min(%__MODULE__{} = config, size) when is_integer(size), + do: %{config | writer_dynamic_batch_size_min: size} @spec get_bootstrap_servers(t()) :: String.t() def get_bootstrap_servers(%__MODULE__{bootstrap_servers: servers}), do: servers diff --git a/bindings/elixir/native/fluss_nif/src/config.rs b/bindings/elixir/native/fluss_nif/src/config.rs index c16a57eb..8bbdfad9 100644 --- a/bindings/elixir/native/fluss_nif/src/config.rs +++ b/bindings/elixir/native/fluss_nif/src/config.rs @@ -26,7 +26,7 @@ pub struct NifConfig { pub writer_batch_size: Option, pub writer_batch_timeout_ms: Option, pub writer_dynamic_batch_size_enabled: Option, - pub writer_batch_size_min: Option, + pub writer_dynamic_batch_size_min: Option, } impl NifConfig { @@ -44,8 +44,8 @@ impl NifConfig { if let Some(enabled) = self.writer_dynamic_batch_size_enabled { config.writer_dynamic_batch_size_enabled = enabled; } - if let Some(size) = self.writer_batch_size_min { - config.writer_batch_size_min = size; + if let Some(size) = self.writer_dynamic_batch_size_min { + config.writer_dynamic_batch_size_min = size; } config } diff --git a/bindings/python/fluss/__init__.pyi b/bindings/python/fluss/__init__.pyi index 594494b2..5a59ea51 100644 --- a/bindings/python/fluss/__init__.pyi +++ b/bindings/python/fluss/__init__.pyi @@ -168,9 +168,9 @@ class Config: @writer_dynamic_batch_size_enabled.setter def writer_dynamic_batch_size_enabled(self, enabled: bool) -> None: ... @property - def writer_batch_size_min(self) -> int: ... - @writer_batch_size_min.setter - def writer_batch_size_min(self, size: int) -> None: ... + def writer_dynamic_batch_size_min(self) -> int: ... + @writer_dynamic_batch_size_min.setter + def writer_dynamic_batch_size_min(self, size: int) -> None: ... @property def writer_bucket_no_key_assigner(self) -> str: ... @writer_bucket_no_key_assigner.setter diff --git a/bindings/python/src/config.rs b/bindings/python/src/config.rs index 58166883..11188bf3 100644 --- a/bindings/python/src/config.rs +++ b/bindings/python/src/config.rs @@ -71,10 +71,13 @@ impl Config { } }; } - "writer.batch-size-min" => { - config.writer_batch_size_min = value.parse::().map_err(|e| { - FlussError::new_err(format!("Invalid value '{value}' for '{key}': {e}")) - })?; + "writer.dynamic-batch-size-min" => { + config.writer_dynamic_batch_size_min = + value.parse::().map_err(|e| { + FlussError::new_err(format!( + "Invalid value '{value}' for '{key}': {e}" + )) + })?; } "writer.batch-timeout-ms" => { config.writer_batch_timeout_ms = value.parse::().map_err(|e| { @@ -280,14 +283,14 @@ impl Config { /// Get the lower bound used by the dynamic batch size estimator #[getter] - fn writer_batch_size_min(&self) -> i32 { - self.inner.writer_batch_size_min + fn writer_dynamic_batch_size_min(&self) -> i32 { + self.inner.writer_dynamic_batch_size_min } /// Set the lower bound used by the dynamic batch size estimator #[setter] - fn set_writer_batch_size_min(&mut self, size: i32) { - self.inner.writer_batch_size_min = size; + fn set_writer_dynamic_batch_size_min(&mut self, size: i32) { + self.inner.writer_dynamic_batch_size_min = size; } /// Get the scanner remote log prefetch num diff --git a/crates/fluss/src/client/write/accumulator.rs b/crates/fluss/src/client/write/accumulator.rs index 1567aeaf..244edf73 100644 --- a/crates/fluss/src/client/write/accumulator.rs +++ b/crates/fluss/src/client/write/accumulator.rs @@ -326,10 +326,10 @@ impl RecordAccumulator { .entry(bucket_id) .or_insert_with(|| Arc::new(Mutex::new(VecDeque::new()))) .clone(); - let dynamic_target = self - .config - .writer_dynamic_batch_size_enabled - .then(|| bucket_and_batches.dynamic_batch_size.lock().current()); + let dynamic_target = bucket_and_batches + .dynamic_batch_size + .as_ref() + .map(|est| est.current()); ( dq, Arc::clone(&bucket_and_batches.compression_ratio_estimator), @@ -693,14 +693,33 @@ impl RecordAccumulator { } fn record_actual_batch_size(&self, table_path: &Arc, actual: usize) { - if !self.config.writer_dynamic_batch_size_enabled { + let Some(entry) = self.write_batches.get(table_path) else { return; - } - if let Some(entry) = self.write_batches.get(table_path) { - entry.dynamic_batch_size.lock().update(actual); + }; + let Some(estimator) = entry.dynamic_batch_size.as_ref() else { + return; + }; + let prev = estimator.current(); + let next = estimator.update(actual); + if next != prev { + log::debug!( + "Set estimated batch size for {} from {} to {}", + table_path.as_ref(), + prev, + next + ); } } + #[cfg(test)] + fn estimated_batch_size(&self, table_path: &Arc) -> Option { + self.write_batches + .get(table_path)? + .dynamic_batch_size + .as_ref() + .map(|est| est.current()) + } + pub fn re_enqueue(&self, mut ready_write_batch: ReadyWriteBatch) { ready_write_batch.write_batch.re_enqueued(); @@ -950,7 +969,8 @@ struct BucketAndWriteBatches { batches: HashMap>>>, /// Compression ratio estimator shared across Arrow log batches for this table. compression_ratio_estimator: Arc, - dynamic_batch_size: Mutex, + /// `None` when `writer_dynamic_batch_size_enabled` is false. + dynamic_batch_size: Option, } impl BucketAndWriteBatches { @@ -960,17 +980,19 @@ impl BucketAndWriteBatches { partition_id: Option, config: &Config, ) -> Self { - let estimator = DynamicWriteBatchSizeEstimator::new( - config.writer_batch_size_min as usize, - config.writer_batch_size as usize, - ); + let dynamic_batch_size = config.writer_dynamic_batch_size_enabled.then(|| { + DynamicWriteBatchSizeEstimator::new( + config.writer_dynamic_batch_size_min as usize, + config.writer_batch_size as usize, + ) + }); Self { table_id, is_partitioned_table, partition_id, batches: Default::default(), compression_ratio_estimator: Arc::new(ArrowCompressionRatioEstimator::default()), - dynamic_batch_size: Mutex::new(estimator), + dynamic_batch_size, } } } @@ -1034,9 +1056,12 @@ impl ReadyCheckResult { #[cfg(test)] mod tests { use super::*; + use crate::client::write::write_format::WriteFormat; + use crate::client::write::{RowBytes, WriteRecord}; use crate::metadata::TablePath; use crate::row::{Datum, GenericRow}; use crate::test_utils::{build_cluster, build_table_info}; + use bytes::Bytes; use std::sync::Arc; fn disabled_idempotence() -> Arc { @@ -1605,7 +1630,7 @@ mod tests { let config = Config { writer_dynamic_batch_size_enabled: true, writer_batch_size: target, - writer_batch_size_min: 4 * 1024, + writer_dynamic_batch_size_min: 4 * 1024, writer_buffer_memory_size: 1024 * 1024, ..Config::default() }; @@ -1637,13 +1662,73 @@ mod tests { assert!(second < target as usize, "{second} >= {target}"); } + #[test] + fn dynamic_batch_size_grows_after_full_drained_batch() { + let max = 256 * 1024; + let config = Config { + writer_dynamic_batch_size_enabled: true, + writer_batch_size: max, + writer_dynamic_batch_size_min: 4 * 1024, + writer_buffer_memory_size: 4 * 1024 * 1024, + ..Config::default() + }; + let accumulator = RecordAccumulator::new(config, disabled_idempotence()); + let table_path = TablePath::new("db".to_string(), "tbl".to_string()); + let table_info = Arc::new(build_table_info(table_path.clone(), 1, 1)); + let physical_table_path = Arc::new(PhysicalTablePath::of(Arc::new(table_path.clone()))); + let cluster = Arc::new(build_cluster(&table_path, 1, 1)); + let nodes = HashSet::from([cluster.get_tablet_server(1).unwrap().clone()]); + + let kv = |size: usize| { + WriteRecord::for_upsert( + Arc::clone(&table_info), + Arc::clone(&physical_table_path), + 1, + Bytes::from(vec![0u8; 32]), + None, + WriteFormat::CompactedKv, + None, + Some(RowBytes::Owned(Bytes::from(vec![0u8; size]))), + ) + }; + let drain_one = || { + let mut d = accumulator.drain(cluster.clone(), &nodes, max).unwrap(); + let b = d.remove(&1).unwrap().pop().unwrap(); + accumulator.remove_incomplete_batches(b.write_batch.batch_id()); + }; + let target = || { + accumulator + .estimated_batch_size(&physical_table_path) + .unwrap() + }; + + accumulator.append(&kv(1), 0, &cluster, false).unwrap(); + drain_one(); + let after_shrink = target(); + assert!( + after_shrink < max as usize, + "shrink failed: after_shrink={after_shrink} max={max}" + ); + + // 0.9 sits safely above GROW_THRESHOLD (0.8) to avoid f64 boundary noise. + accumulator + .append(&kv(after_shrink * 9 / 10), 0, &cluster, false) + .unwrap(); + drain_one(); + let after_grow = target(); + assert!( + after_grow > after_shrink, + "grow failed: after_grow={after_grow} after_shrink={after_shrink}" + ); + } + #[test] fn dynamic_batch_size_disabled_keeps_static_target() { let target = 256 * 1024; let config = Config { writer_dynamic_batch_size_enabled: false, writer_batch_size: target, - writer_batch_size_min: 4 * 1024, + writer_dynamic_batch_size_min: 4 * 1024, writer_buffer_memory_size: 1024 * 1024, ..Config::default() }; diff --git a/crates/fluss/src/client/write/dynamic_batch_size.rs b/crates/fluss/src/client/write/dynamic_batch_size.rs index 457e76e0..408263ee 100644 --- a/crates/fluss/src/client/write/dynamic_batch_size.rs +++ b/crates/fluss/src/client/write/dynamic_batch_size.rs @@ -18,14 +18,16 @@ //! Per-table batch size estimator. Mirrors Java's `DynamicWriteBatchSizeEstimator`: //! grow 10% above 80% fill, shrink 5% below 50%, clamped to `[min, max]`. +use std::sync::atomic::{AtomicUsize, Ordering}; + const GROW_THRESHOLD: f64 = 0.8; const SHRINK_THRESHOLD: f64 = 0.5; const GROW_FACTOR: f64 = 1.1; const SHRINK_FACTOR: f64 = 0.95; -#[derive(Debug, Clone)] +#[derive(Debug)] pub(crate) struct DynamicWriteBatchSizeEstimator { - current: usize, + current: AtomicUsize, min: usize, max: usize, } @@ -33,18 +35,20 @@ pub(crate) struct DynamicWriteBatchSizeEstimator { impl DynamicWriteBatchSizeEstimator { pub fn new(min_size: usize, max_size: usize) -> Self { Self { - current: max_size, + current: AtomicUsize::new(max_size), min: min_size.min(max_size), max: max_size, } } pub fn current(&self) -> usize { - self.current + self.current.load(Ordering::Relaxed) } - pub fn update(&mut self, actual: usize) -> usize { - let cur = self.current as f64; + /// Last-write-wins on races, matching Java's `ConcurrentHashMap.put`. + pub fn update(&self, actual: usize) -> usize { + let prev = self.current.load(Ordering::Relaxed); + let cur = prev as f64; let actual = actual as f64; let next = if actual > cur * GROW_THRESHOLD { cur * GROW_FACTOR @@ -53,8 +57,11 @@ impl DynamicWriteBatchSizeEstimator { } else { cur }; - self.current = (next as usize).clamp(self.min, self.max); - self.current + let clamped = (next as usize).clamp(self.min, self.max); + if clamped != prev { + self.current.store(clamped, Ordering::Relaxed); + } + clamped } } @@ -75,14 +82,14 @@ mod tests { #[test] fn min_clamped_to_max_when_misconfigured() { - let mut est = DynamicWriteBatchSizeEstimator::new(MAX * 2, MAX); + let est = DynamicWriteBatchSizeEstimator::new(MAX * 2, MAX); assert_eq!(est.current(), MAX); assert_eq!(est.update(0), MAX); } #[test] fn grows_when_above_grow_threshold() { - let mut est = DynamicWriteBatchSizeEstimator::new(MIN, MAX); + let est = DynamicWriteBatchSizeEstimator::new(MIN, MAX); for _ in 0..CONVERGENCE_STEPS { est.update(0); } @@ -95,7 +102,7 @@ mod tests { #[test] fn shrinks_when_below_shrink_threshold() { - let mut est = DynamicWriteBatchSizeEstimator::new(MIN, MAX); + let est = DynamicWriteBatchSizeEstimator::new(MIN, MAX); // 0.4 sits safely below the strict 0.5 threshold. let next = est.update((MAX as f64 * 0.4) as usize); assert_eq!(next, ((MAX as f64) * SHRINK_FACTOR) as usize); @@ -103,7 +110,7 @@ mod tests { #[test] fn shrink_clamps_to_min() { - let mut est = DynamicWriteBatchSizeEstimator::new(MIN, MAX); + let est = DynamicWriteBatchSizeEstimator::new(MIN, MAX); for _ in 0..CONVERGENCE_STEPS { est.update(0); } @@ -112,7 +119,7 @@ mod tests { #[test] fn grow_clamps_to_max() { - let mut est = DynamicWriteBatchSizeEstimator::new(MIN, MAX); + let est = DynamicWriteBatchSizeEstimator::new(MIN, MAX); for _ in 0..CONVERGENCE_STEPS { est.update(0); } @@ -124,13 +131,13 @@ mod tests { #[test] fn oversized_actual_clamps_at_max() { - let mut est = DynamicWriteBatchSizeEstimator::new(MIN, MAX); + let est = DynamicWriteBatchSizeEstimator::new(MIN, MAX); assert_eq!(est.update(MAX * 4), MAX); } #[test] fn dead_zone_is_a_fixed_point() { - let mut est = DynamicWriteBatchSizeEstimator::new(MIN, MAX); + let est = DynamicWriteBatchSizeEstimator::new(MIN, MAX); let initial = est.current(); for _ in 0..20 { est.update((est.current() as f64 * 0.65) as usize); diff --git a/crates/fluss/src/config.rs b/crates/fluss/src/config.rs index 8075e942..cad8d9cb 100644 --- a/crates/fluss/src/config.rs +++ b/crates/fluss/src/config.rs @@ -23,7 +23,7 @@ const DEFAULT_BOOTSTRAP_SERVER: &str = "127.0.0.1:9123"; const DEFAULT_REQUEST_MAX_SIZE: i32 = 10 * 1024 * 1024; const DEFAULT_WRITER_BATCH_SIZE: i32 = 2 * 1024 * 1024; // Mirrors Java's `2 * pageSize` floor with default pageSize = 128 KB. -const DEFAULT_WRITER_BATCH_SIZE_MIN: i32 = 256 * 1024; +const DEFAULT_WRITER_DYNAMIC_BATCH_SIZE_MIN: i32 = 256 * 1024; const DEFAULT_WRITER_DYNAMIC_BATCH_SIZE_ENABLED: bool = true; const DEFAULT_RETRIES: i32 = i32::MAX; const DEFAULT_PREFETCH_NUM: usize = 4; @@ -86,8 +86,9 @@ pub struct Config { /// Lower bound for the dynamic batch size estimator. /// Default: 262144 (256 KB), matching Java's `2 * pageSize` floor. - #[arg(long, default_value_t = DEFAULT_WRITER_BATCH_SIZE_MIN)] - pub writer_batch_size_min: i32, + /// Ignored when `writer_dynamic_batch_size_enabled` is false. + #[arg(long, default_value_t = DEFAULT_WRITER_DYNAMIC_BATCH_SIZE_MIN)] + pub writer_dynamic_batch_size_min: i32, #[arg(long, value_enum, default_value_t = NoKeyAssigner::Sticky)] pub writer_bucket_no_key_assigner: NoKeyAssigner, @@ -216,7 +217,10 @@ impl std::fmt::Debug for Config { "writer_dynamic_batch_size_enabled", &self.writer_dynamic_batch_size_enabled, ) - .field("writer_batch_size_min", &self.writer_batch_size_min) + .field( + "writer_dynamic_batch_size_min", + &self.writer_dynamic_batch_size_min, + ) .field( "writer_bucket_no_key_assigner", &self.writer_bucket_no_key_assigner, @@ -286,7 +290,7 @@ impl Default for Config { writer_retries: i32::MAX, writer_batch_size: DEFAULT_WRITER_BATCH_SIZE, writer_dynamic_batch_size_enabled: DEFAULT_WRITER_DYNAMIC_BATCH_SIZE_ENABLED, - writer_batch_size_min: DEFAULT_WRITER_BATCH_SIZE_MIN, + writer_dynamic_batch_size_min: DEFAULT_WRITER_DYNAMIC_BATCH_SIZE_MIN, writer_bucket_no_key_assigner: NoKeyAssigner::Sticky, scanner_remote_log_prefetch_num: DEFAULT_PREFETCH_NUM, remote_file_download_thread_num: DEFAULT_DOWNLOAD_THREADS, @@ -408,11 +412,11 @@ impl Config { if self.writer_batch_size as usize > self.writer_buffer_memory_size { return Err("writer_batch_size must be <= writer_buffer_memory_size".to_string()); } - if self.writer_batch_size_min <= 0 { - return Err("writer_batch_size_min must be > 0".to_string()); + if self.writer_dynamic_batch_size_min <= 0 { + return Err("writer_dynamic_batch_size_min must be > 0".to_string()); } - if self.writer_batch_size_min > self.writer_batch_size { - return Err("writer_batch_size_min must be <= writer_batch_size".to_string()); + if self.writer_dynamic_batch_size_min > self.writer_batch_size { + return Err("writer_dynamic_batch_size_min must be <= writer_batch_size".to_string()); } // idempotence checks if !self.writer_enable_idempotence { diff --git a/website/docs/user-guide/cpp/api-reference.md b/website/docs/user-guide/cpp/api-reference.md index 9298c934..d904a064 100644 --- a/website/docs/user-guide/cpp/api-reference.md +++ b/website/docs/user-guide/cpp/api-reference.md @@ -21,9 +21,9 @@ Complete API reference for the Fluss C++ client. | `writer_request_max_size` | `int32_t` | `10485760` (10 MB) | Maximum request size in bytes | | `writer_acks` | `std::string` | `"all"` | Acknowledgment setting (`"all"`, `"0"`, `"1"`, or `"-1"`) | | `writer_retries` | `int32_t` | `INT32_MAX` | Number of retries on failure | -| `writer_batch_size` | `int32_t` | `2097152` (2 MB) | Batch size for writes in bytes (also the upper bound when dynamic sizing is enabled) | +| `writer_batch_size` | `int32_t` | `2097152` (2 MB) | Batch size for writes in bytes. Upper bound when dynamic sizing is on; fixed batch size when off | | `writer_dynamic_batch_size_enabled` | `bool` | `true` | Enable per-table dynamic batch sizing: target grows 10% above 80% fill, shrinks 5% below 50% | -| `writer_batch_size_min` | `int32_t` | `262144` (256 KB) | Lower bound for the dynamic batch size estimator (ignored when disabled) | +| `writer_dynamic_batch_size_min` | `int32_t` | `262144` (256 KB) | Lower bound for the dynamic batch size estimator (ignored when disabled) | | `writer_batch_timeout_ms` | `int64_t` | `100` | Maximum time in ms to wait for a writer batch to fill up before sending | | `writer_bucket_no_key_assigner` | `std::string` | `"sticky"` | Bucket assignment strategy for tables without bucket keys: `"sticky"` or `"round_robin"` | | `scanner_remote_log_prefetch_num` | `size_t` | `4` | Number of remote log segments to prefetch | diff --git a/website/docs/user-guide/python/api-reference.md b/website/docs/user-guide/python/api-reference.md index b3da64be..aec4412d 100644 --- a/website/docs/user-guide/python/api-reference.md +++ b/website/docs/user-guide/python/api-reference.md @@ -14,9 +14,9 @@ Complete API reference for the Fluss Python client. | `writer_request_max_size` | `writer.request-max-size` | Get/set max request size in bytes | | `writer_acks` | `writer.acks` | Get/set acknowledgment setting (`"all"` for all replicas) | | `writer_retries` | `writer.retries` | Get/set number of retries on failure | -| `writer_batch_size` | `writer.batch-size` | Get/set write batch size in bytes (also the upper bound when dynamic sizing is enabled) | +| `writer_batch_size` | `writer.batch-size` | Get/set write batch size in bytes. Upper bound when dynamic sizing is on; fixed batch size when off | | `writer_dynamic_batch_size_enabled` | `writer.dynamic-batch-size.enabled` | Get/set whether the per-table dynamic batch size estimator is enabled (default `true`) | -| `writer_batch_size_min` | `writer.batch-size-min` | Get/set the lower bound for the dynamic batch size estimator (default 256 KB) | +| `writer_dynamic_batch_size_min` | `writer.dynamic-batch-size-min` | Get/set the lower bound for the dynamic batch size estimator (default 256 KB; ignored when disabled) | | `writer_batch_timeout_ms` | `writer.batch-timeout-ms` | Get/set max time in ms to wait for a writer batch to fill up before sending | | `writer_bucket_no_key_assigner` | `writer.bucket.no-key-assigner` | Get/set bucket assignment strategy (`"sticky"` or `"round_robin"`) | | `scanner_remote_log_prefetch_num` | `scanner.remote-log.prefetch-num` | Get/set number of remote log segments to prefetch | diff --git a/website/docs/user-guide/rust/api-reference.md b/website/docs/user-guide/rust/api-reference.md index 94248861..03054f0f 100644 --- a/website/docs/user-guide/rust/api-reference.md +++ b/website/docs/user-guide/rust/api-reference.md @@ -13,9 +13,9 @@ Complete API reference for the Fluss Rust client. | `writer_request_max_size` | `i32` | `10485760` (10 MB) | Maximum request size in bytes | | `writer_acks` | `String` | `"all"` | Acknowledgment setting (`"all"` waits for all replicas) | | `writer_retries` | `i32` | `i32::MAX` | Number of retries on failure | -| `writer_batch_size` | `i32` | `2097152` (2 MB) | Batch size for writes in bytes. Acts as the upper bound when dynamic batch sizing is enabled. | -| `writer_dynamic_batch_size_enabled` | `bool` | `true` | Enable per-table dynamic batch sizing: target grows 10% above 80% fill, shrinks 5% below 50%, clamped to `[writer_batch_size_min, writer_batch_size]` | -| `writer_batch_size_min` | `i32` | `262144` (256 KB) | Lower bound for the dynamic batch size estimator (ignored when `writer_dynamic_batch_size_enabled` is `false`) | +| `writer_batch_size` | `i32` | `2097152` (2 MB) | Batch size for writes in bytes. Upper bound when dynamic sizing is on; fixed batch size when off. | +| `writer_dynamic_batch_size_enabled` | `bool` | `true` | Enable per-table dynamic batch sizing: target grows 10% above 80% fill, shrinks 5% below 50%, clamped to `[writer_dynamic_batch_size_min, writer_batch_size]` | +| `writer_dynamic_batch_size_min` | `i32` | `262144` (256 KB) | Lower bound for the dynamic batch size estimator (ignored when `writer_dynamic_batch_size_enabled` is `false`) | | `writer_batch_timeout_ms` | `i64` | `100` | Maximum time in ms to wait for a writer batch to fill up before sending | | `writer_bucket_no_key_assigner` | `NoKeyAssigner` | `sticky` | Bucket assignment strategy for tables without bucket keys: `sticky` or `round_robin` | | `scanner_remote_log_prefetch_num` | `usize` | `4` | Number of remote log segments to prefetch |