diff --git a/bindings/cpp/include/fluss.hpp b/bindings/cpp/include/fluss.hpp index f222166d..616bb8b9 100644 --- a/bindings/cpp/include/fluss.hpp +++ b/bindings/cpp/include/fluss.hpp @@ -1223,8 +1223,12 @@ struct Configuration { std::string writer_acks{"all"}; // Max number of writer retries int32_t writer_retries{std::numeric_limits::max()}; - // Writer batch size in bytes (2 MB) + // Writer batch size in bytes (2 MB), also the upper bound when dynamic sizing is on int32_t writer_batch_size{2 * 1024 * 1024}; + // Tune the per-table writer batch size from observed fill ratios + bool writer_dynamic_batch_size_enabled{true}; + // Lower bound (256 KB) for the dynamic batch size estimator + int32_t writer_dynamic_batch_size_min{256 * 1024}; // Bucket assigner for tables without bucket keys: "sticky" or "round_robin" std::string writer_bucket_no_key_assigner{"sticky"}; // Number of remote log batches to prefetch during scanning diff --git a/bindings/cpp/src/ffi_converter.hpp b/bindings/cpp/src/ffi_converter.hpp index 0ef1487a..3e8e1fe6 100644 --- a/bindings/cpp/src/ffi_converter.hpp +++ b/bindings/cpp/src/ffi_converter.hpp @@ -118,6 +118,8 @@ inline ffi::FfiConfig to_ffi_config(const Configuration& config) { ffi_config.writer_acks = rust::String(config.writer_acks); ffi_config.writer_retries = config.writer_retries; ffi_config.writer_batch_size = config.writer_batch_size; + ffi_config.writer_dynamic_batch_size_enabled = config.writer_dynamic_batch_size_enabled; + ffi_config.writer_dynamic_batch_size_min = config.writer_dynamic_batch_size_min; ffi_config.writer_bucket_no_key_assigner = rust::String(config.writer_bucket_no_key_assigner); ffi_config.scanner_remote_log_prefetch_num = config.scanner_remote_log_prefetch_num; ffi_config.remote_file_download_thread_num = config.remote_file_download_thread_num; diff --git a/bindings/cpp/src/lib.rs b/bindings/cpp/src/lib.rs index 875373bc..0a71e901 100644 --- a/bindings/cpp/src/lib.rs +++ b/bindings/cpp/src/lib.rs @@ -46,6 +46,8 @@ mod ffi { writer_acks: String, writer_retries: i32, writer_batch_size: i32, + writer_dynamic_batch_size_enabled: bool, + writer_dynamic_batch_size_min: i32, writer_bucket_no_key_assigner: String, scanner_remote_log_prefetch_num: usize, remote_file_download_thread_num: usize, @@ -905,6 +907,8 @@ fn new_connection(config: &ffi::FfiConfig) -> ffi::FfiPtrResult { writer_acks: config.writer_acks.to_string(), writer_retries: config.writer_retries, writer_batch_size: config.writer_batch_size, + writer_dynamic_batch_size_enabled: config.writer_dynamic_batch_size_enabled, + writer_dynamic_batch_size_min: config.writer_dynamic_batch_size_min, writer_batch_timeout_ms: config.writer_batch_timeout_ms, writer_bucket_no_key_assigner: assigner_type, scanner_remote_log_prefetch_num: config.scanner_remote_log_prefetch_num, diff --git a/bindings/elixir/lib/fluss/config.ex b/bindings/elixir/lib/fluss/config.ex index 1120a5f4..f12f61e4 100644 --- a/bindings/elixir/lib/fluss/config.ex +++ b/bindings/elixir/lib/fluss/config.ex @@ -34,12 +34,16 @@ defmodule Fluss.Config do @enforce_keys [:bootstrap_servers] defstruct bootstrap_servers: nil, writer_batch_size: nil, - writer_batch_timeout_ms: nil + writer_batch_timeout_ms: nil, + writer_dynamic_batch_size_enabled: nil, + writer_dynamic_batch_size_min: nil @type t :: %__MODULE__{ bootstrap_servers: String.t(), writer_batch_size: non_neg_integer() | nil, - writer_batch_timeout_ms: non_neg_integer() | nil + writer_batch_timeout_ms: non_neg_integer() | nil, + writer_dynamic_batch_size_enabled: boolean() | nil, + writer_dynamic_batch_size_min: non_neg_integer() | nil } @spec new(String.t()) :: t() @@ -62,6 +66,15 @@ defmodule Fluss.Config do def set_writer_batch_timeout_ms(%__MODULE__{} = config, ms) when is_integer(ms), do: %{config | writer_batch_timeout_ms: ms} + @spec set_writer_dynamic_batch_size_enabled(t(), boolean()) :: t() + def set_writer_dynamic_batch_size_enabled(%__MODULE__{} = config, enabled) + when is_boolean(enabled), + do: %{config | writer_dynamic_batch_size_enabled: enabled} + + @spec set_writer_dynamic_batch_size_min(t(), non_neg_integer()) :: t() + def set_writer_dynamic_batch_size_min(%__MODULE__{} = config, size) when is_integer(size), + do: %{config | writer_dynamic_batch_size_min: size} + @spec get_bootstrap_servers(t()) :: String.t() def get_bootstrap_servers(%__MODULE__{bootstrap_servers: servers}), do: servers end diff --git a/bindings/elixir/native/fluss_nif/src/config.rs b/bindings/elixir/native/fluss_nif/src/config.rs index 536ed68f..8bbdfad9 100644 --- a/bindings/elixir/native/fluss_nif/src/config.rs +++ b/bindings/elixir/native/fluss_nif/src/config.rs @@ -25,6 +25,8 @@ pub struct NifConfig { pub bootstrap_servers: String, pub writer_batch_size: Option, pub writer_batch_timeout_ms: Option, + pub writer_dynamic_batch_size_enabled: Option, + pub writer_dynamic_batch_size_min: Option, } impl NifConfig { @@ -39,6 +41,12 @@ impl NifConfig { if let Some(ms) = self.writer_batch_timeout_ms { config.writer_batch_timeout_ms = ms; } + if let Some(enabled) = self.writer_dynamic_batch_size_enabled { + config.writer_dynamic_batch_size_enabled = enabled; + } + if let Some(size) = self.writer_dynamic_batch_size_min { + config.writer_dynamic_batch_size_min = size; + } config } } diff --git a/bindings/python/fluss/__init__.pyi b/bindings/python/fluss/__init__.pyi index fc713973..5a59ea51 100644 --- a/bindings/python/fluss/__init__.pyi +++ b/bindings/python/fluss/__init__.pyi @@ -164,6 +164,14 @@ class Config: @writer_batch_size.setter def writer_batch_size(self, size: int) -> None: ... @property + def writer_dynamic_batch_size_enabled(self) -> bool: ... + @writer_dynamic_batch_size_enabled.setter + def writer_dynamic_batch_size_enabled(self, enabled: bool) -> None: ... + @property + def writer_dynamic_batch_size_min(self) -> int: ... + @writer_dynamic_batch_size_min.setter + def writer_dynamic_batch_size_min(self, size: int) -> None: ... + @property def writer_bucket_no_key_assigner(self) -> str: ... @writer_bucket_no_key_assigner.setter def writer_bucket_no_key_assigner(self, value: str) -> None: ... diff --git a/bindings/python/src/config.rs b/bindings/python/src/config.rs index 65bcc9ad..11188bf3 100644 --- a/bindings/python/src/config.rs +++ b/bindings/python/src/config.rs @@ -60,6 +60,25 @@ impl Config { FlussError::new_err(format!("Invalid value '{value}' for '{key}': {e}")) })?; } + "writer.dynamic-batch-size.enabled" => { + config.writer_dynamic_batch_size_enabled = match value.as_str() { + "true" => true, + "false" => false, + other => { + return Err(FlussError::new_err(format!( + "Invalid value '{other}' for '{key}', expected 'true' or 'false'" + ))); + } + }; + } + "writer.dynamic-batch-size-min" => { + config.writer_dynamic_batch_size_min = + value.parse::().map_err(|e| { + FlussError::new_err(format!( + "Invalid value '{value}' for '{key}': {e}" + )) + })?; + } "writer.batch-timeout-ms" => { config.writer_batch_timeout_ms = value.parse::().map_err(|e| { FlussError::new_err(format!("Invalid value '{value}' for '{key}': {e}")) @@ -250,6 +269,30 @@ impl Config { self.inner.writer_batch_size = size; } + /// Get whether the per-table dynamic batch size estimator is enabled + #[getter] + fn writer_dynamic_batch_size_enabled(&self) -> bool { + self.inner.writer_dynamic_batch_size_enabled + } + + /// Set whether the per-table dynamic batch size estimator is enabled + #[setter] + fn set_writer_dynamic_batch_size_enabled(&mut self, enabled: bool) { + self.inner.writer_dynamic_batch_size_enabled = enabled; + } + + /// Get the lower bound used by the dynamic batch size estimator + #[getter] + fn writer_dynamic_batch_size_min(&self) -> i32 { + self.inner.writer_dynamic_batch_size_min + } + + /// Set the lower bound used by the dynamic batch size estimator + #[setter] + fn set_writer_dynamic_batch_size_min(&mut self, size: i32) { + self.inner.writer_dynamic_batch_size_min = size; + } + /// Get the scanner remote log prefetch num #[getter] fn scanner_remote_log_prefetch_num(&self) -> usize { diff --git a/crates/fluss/src/client/write/accumulator.rs b/crates/fluss/src/client/write/accumulator.rs index 019d3b05..244edf73 100644 --- a/crates/fluss/src/client/write/accumulator.rs +++ b/crates/fluss/src/client/write/accumulator.rs @@ -19,6 +19,7 @@ use crate::client::broadcast; use crate::client::write::IdempotenceManager; use crate::client::write::batch::WriteBatch::{ArrowLog, Kv}; use crate::client::write::batch::{ArrowLogWriteBatch, KvWriteBatch, WriteBatch}; +use crate::client::write::dynamic_batch_size::DynamicWriteBatchSizeEstimator; use crate::client::{LogWriteRecord, Record, ResultHandle, WriteRecord}; use crate::cluster::{BucketLocation, Cluster, ServerNode}; use crate::compression::ArrowCompressionRatioEstimator; @@ -307,28 +308,32 @@ impl RecordAccumulator { None }; - let (dq, compression_ratio_estimator) = { - let mut binding = - self.write_batches - .entry(Arc::clone(physical_table_path)) - .or_insert_with(|| BucketAndWriteBatches { - table_id: table_info.table_id, + let (dq, compression_ratio_estimator, dynamic_target) = { + let mut binding = self + .write_batches + .entry(Arc::clone(physical_table_path)) + .or_insert_with(|| { + BucketAndWriteBatches::new( + table_info.table_id, is_partitioned_table, partition_id, - batches: Default::default(), - compression_ratio_estimator: Arc::new( - ArrowCompressionRatioEstimator::default(), - ), - }); + &self.config, + ) + }); let bucket_and_batches = binding.value_mut(); let dq = bucket_and_batches .batches .entry(bucket_id) .or_insert_with(|| Arc::new(Mutex::new(VecDeque::new()))) .clone(); + let dynamic_target = bucket_and_batches + .dynamic_batch_size + .as_ref() + .map(|est| est.current()); ( dq, Arc::clone(&bucket_and_batches.compression_ratio_estimator), + dynamic_target, ) }; @@ -347,12 +352,7 @@ impl RecordAccumulator { // producer holds dq + blocks on memory, while sender needs dq to drain. drop(dq_guard); - // TODO: Implement DynamicWriteBatchSizeEstimator matching Java's - // client.writer.dynamic-batch-size-enabled. Adjusts the batch size target - // per table based on observed actual batch sizes (grow 10% when >80% full, - // shrink 5% when <50% full, clamped to [2*pageSize, maxBatchSize]). - // This would improve memory limiter utilization for tables with small rows. - let batch_size = self.config.writer_batch_size as usize; + let batch_size = dynamic_target.unwrap_or(self.config.writer_batch_size as usize); let record_size = record.estimated_record_size(); let alloc_size = batch_size.max(record_size); let permit = self.memory_limiter.acquire(alloc_size)?; @@ -664,6 +664,8 @@ impl RecordAccumulator { let current_batch_size = batch.estimated_size_in_bytes(); size += current_batch_size; + self.record_actual_batch_size(table_path, current_batch_size); + // mark the batch as drained. batch.drained(current_time_ms()); ready.push(ReadyWriteBatch { @@ -690,6 +692,34 @@ impl RecordAccumulator { self.incomplete_batches.write().remove(&batch_id); } + fn record_actual_batch_size(&self, table_path: &Arc, actual: usize) { + let Some(entry) = self.write_batches.get(table_path) else { + return; + }; + let Some(estimator) = entry.dynamic_batch_size.as_ref() else { + return; + }; + let prev = estimator.current(); + let next = estimator.update(actual); + if next != prev { + log::debug!( + "Set estimated batch size for {} from {} to {}", + table_path.as_ref(), + prev, + next + ); + } + } + + #[cfg(test)] + fn estimated_batch_size(&self, table_path: &Arc) -> Option { + self.write_batches + .get(table_path)? + .dynamic_batch_size + .as_ref() + .map(|est| est.current()) + } + pub fn re_enqueue(&self, mut ready_write_batch: ReadyWriteBatch) { ready_write_batch.write_batch.re_enqueued(); @@ -785,12 +815,13 @@ impl RecordAccumulator { let mut binding = self .write_batches .entry(Arc::clone(physical_table_path)) - .or_insert_with(|| BucketAndWriteBatches { - table_id, - is_partitioned_table, - partition_id, - batches: Default::default(), - compression_ratio_estimator: Arc::new(ArrowCompressionRatioEstimator::default()), + .or_insert_with(|| { + BucketAndWriteBatches::new( + table_id, + is_partitioned_table, + partition_id, + &self.config, + ) }); let bucket_and_batches = binding.value_mut(); bucket_and_batches @@ -938,6 +969,32 @@ struct BucketAndWriteBatches { batches: HashMap>>>, /// Compression ratio estimator shared across Arrow log batches for this table. compression_ratio_estimator: Arc, + /// `None` when `writer_dynamic_batch_size_enabled` is false. + dynamic_batch_size: Option, +} + +impl BucketAndWriteBatches { + fn new( + table_id: TableId, + is_partitioned_table: bool, + partition_id: Option, + config: &Config, + ) -> Self { + let dynamic_batch_size = config.writer_dynamic_batch_size_enabled.then(|| { + DynamicWriteBatchSizeEstimator::new( + config.writer_dynamic_batch_size_min as usize, + config.writer_batch_size as usize, + ) + }); + Self { + table_id, + is_partitioned_table, + partition_id, + batches: Default::default(), + compression_ratio_estimator: Arc::new(ArrowCompressionRatioEstimator::default()), + dynamic_batch_size, + } + } } pub struct RecordAppendResult { @@ -999,9 +1056,12 @@ impl ReadyCheckResult { #[cfg(test)] mod tests { use super::*; + use crate::client::write::write_format::WriteFormat; + use crate::client::write::{RowBytes, WriteRecord}; use crate::metadata::TablePath; use crate::row::{Datum, GenericRow}; use crate::test_utils::{build_cluster, build_table_info}; + use bytes::Bytes; use std::sync::Arc; fn disabled_idempotence() -> Arc { @@ -1563,4 +1623,137 @@ mod tests { .await .expect("notified should complete after wakeup_sender"); } + + #[test] + fn dynamic_batch_size_shrinks_after_small_drained_batch() { + let target = 256 * 1024; + let config = Config { + writer_dynamic_batch_size_enabled: true, + writer_batch_size: target, + writer_dynamic_batch_size_min: 4 * 1024, + writer_buffer_memory_size: 1024 * 1024, + ..Config::default() + }; + let accumulator = RecordAccumulator::new(config, disabled_idempotence()); + let table_path = TablePath::new("db".to_string(), "tbl".to_string()); + let table_info = Arc::new(build_table_info(table_path.clone(), 1, 1)); + let physical_table_path = Arc::new(PhysicalTablePath::of(Arc::new(table_path.clone()))); + let cluster = Arc::new(build_cluster(&table_path, 1, 1)); + let row = GenericRow { + values: vec![Datum::Int32(1)], + }; + let record = WriteRecord::for_append(table_info, physical_table_path, 1, &row); + + accumulator.append(&record, 0, &cluster, false).unwrap(); + assert_eq!(*accumulator.memory_limiter.state.lock(), target as usize); + + let server = cluster.get_tablet_server(1).expect("server"); + let nodes = HashSet::from([server.clone()]); + let mut drained = accumulator + .drain(cluster.clone(), &nodes, 1024 * 1024) + .unwrap(); + let mut batches = drained.remove(&1).expect("drained batches"); + let batch = batches.pop().expect("batch"); + accumulator.remove_incomplete_batches(batch.write_batch.batch_id()); + assert_eq!(*accumulator.memory_limiter.state.lock(), 0); + + accumulator.append(&record, 0, &cluster, false).unwrap(); + let second = *accumulator.memory_limiter.state.lock(); + assert!(second < target as usize, "{second} >= {target}"); + } + + #[test] + fn dynamic_batch_size_grows_after_full_drained_batch() { + let max = 256 * 1024; + let config = Config { + writer_dynamic_batch_size_enabled: true, + writer_batch_size: max, + writer_dynamic_batch_size_min: 4 * 1024, + writer_buffer_memory_size: 4 * 1024 * 1024, + ..Config::default() + }; + let accumulator = RecordAccumulator::new(config, disabled_idempotence()); + let table_path = TablePath::new("db".to_string(), "tbl".to_string()); + let table_info = Arc::new(build_table_info(table_path.clone(), 1, 1)); + let physical_table_path = Arc::new(PhysicalTablePath::of(Arc::new(table_path.clone()))); + let cluster = Arc::new(build_cluster(&table_path, 1, 1)); + let nodes = HashSet::from([cluster.get_tablet_server(1).unwrap().clone()]); + + let kv = |size: usize| { + WriteRecord::for_upsert( + Arc::clone(&table_info), + Arc::clone(&physical_table_path), + 1, + Bytes::from(vec![0u8; 32]), + None, + WriteFormat::CompactedKv, + None, + Some(RowBytes::Owned(Bytes::from(vec![0u8; size]))), + ) + }; + let drain_one = || { + let mut d = accumulator.drain(cluster.clone(), &nodes, max).unwrap(); + let b = d.remove(&1).unwrap().pop().unwrap(); + accumulator.remove_incomplete_batches(b.write_batch.batch_id()); + }; + let target = || { + accumulator + .estimated_batch_size(&physical_table_path) + .unwrap() + }; + + accumulator.append(&kv(1), 0, &cluster, false).unwrap(); + drain_one(); + let after_shrink = target(); + assert!( + after_shrink < max as usize, + "shrink failed: after_shrink={after_shrink} max={max}" + ); + + // 0.9 sits safely above GROW_THRESHOLD (0.8) to avoid f64 boundary noise. + accumulator + .append(&kv(after_shrink * 9 / 10), 0, &cluster, false) + .unwrap(); + drain_one(); + let after_grow = target(); + assert!( + after_grow > after_shrink, + "grow failed: after_grow={after_grow} after_shrink={after_shrink}" + ); + } + + #[test] + fn dynamic_batch_size_disabled_keeps_static_target() { + let target = 256 * 1024; + let config = Config { + writer_dynamic_batch_size_enabled: false, + writer_batch_size: target, + writer_dynamic_batch_size_min: 4 * 1024, + writer_buffer_memory_size: 1024 * 1024, + ..Config::default() + }; + let accumulator = RecordAccumulator::new(config, disabled_idempotence()); + let table_path = TablePath::new("db".to_string(), "tbl".to_string()); + let table_info = Arc::new(build_table_info(table_path.clone(), 1, 1)); + let physical_table_path = Arc::new(PhysicalTablePath::of(Arc::new(table_path.clone()))); + let cluster = Arc::new(build_cluster(&table_path, 1, 1)); + let row = GenericRow { + values: vec![Datum::Int32(1)], + }; + let record = WriteRecord::for_append(table_info, physical_table_path, 1, &row); + + let server = cluster.get_tablet_server(1).expect("server"); + let nodes = HashSet::from([server.clone()]); + for _ in 0..3 { + accumulator.append(&record, 0, &cluster, false).unwrap(); + assert_eq!(*accumulator.memory_limiter.state.lock(), target as usize); + + let mut drained = accumulator + .drain(cluster.clone(), &nodes, 1024 * 1024) + .unwrap(); + let mut batches = drained.remove(&1).expect("drained batches"); + let batch = batches.pop().expect("batch"); + accumulator.remove_incomplete_batches(batch.write_batch.batch_id()); + } + } } diff --git a/crates/fluss/src/client/write/dynamic_batch_size.rs b/crates/fluss/src/client/write/dynamic_batch_size.rs new file mode 100644 index 00000000..408263ee --- /dev/null +++ b/crates/fluss/src/client/write/dynamic_batch_size.rs @@ -0,0 +1,147 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Per-table batch size estimator. Mirrors Java's `DynamicWriteBatchSizeEstimator`: +//! grow 10% above 80% fill, shrink 5% below 50%, clamped to `[min, max]`. + +use std::sync::atomic::{AtomicUsize, Ordering}; + +const GROW_THRESHOLD: f64 = 0.8; +const SHRINK_THRESHOLD: f64 = 0.5; +const GROW_FACTOR: f64 = 1.1; +const SHRINK_FACTOR: f64 = 0.95; + +#[derive(Debug)] +pub(crate) struct DynamicWriteBatchSizeEstimator { + current: AtomicUsize, + min: usize, + max: usize, +} + +impl DynamicWriteBatchSizeEstimator { + pub fn new(min_size: usize, max_size: usize) -> Self { + Self { + current: AtomicUsize::new(max_size), + min: min_size.min(max_size), + max: max_size, + } + } + + pub fn current(&self) -> usize { + self.current.load(Ordering::Relaxed) + } + + /// Last-write-wins on races, matching Java's `ConcurrentHashMap.put`. + pub fn update(&self, actual: usize) -> usize { + let prev = self.current.load(Ordering::Relaxed); + let cur = prev as f64; + let actual = actual as f64; + let next = if actual > cur * GROW_THRESHOLD { + cur * GROW_FACTOR + } else if actual < cur * SHRINK_THRESHOLD { + cur * SHRINK_FACTOR + } else { + cur + }; + let clamped = (next as usize).clamp(self.min, self.max); + if clamped != prev { + self.current.store(clamped, Ordering::Relaxed); + } + clamped + } +} + +#[cfg(test)] +mod tests { + use super::*; + + const MIN: usize = 256 * 1024; + const MAX: usize = 2 * 1024 * 1024; + /// ~41 shrink steps, ~22 grow steps; 50 covers both with margin. + const CONVERGENCE_STEPS: usize = 50; + + #[test] + fn starts_at_max() { + let est = DynamicWriteBatchSizeEstimator::new(MIN, MAX); + assert_eq!(est.current(), MAX); + } + + #[test] + fn min_clamped_to_max_when_misconfigured() { + let est = DynamicWriteBatchSizeEstimator::new(MAX * 2, MAX); + assert_eq!(est.current(), MAX); + assert_eq!(est.update(0), MAX); + } + + #[test] + fn grows_when_above_grow_threshold() { + let est = DynamicWriteBatchSizeEstimator::new(MIN, MAX); + for _ in 0..CONVERGENCE_STEPS { + est.update(0); + } + assert_eq!(est.current(), MIN); + + // 0.9 sits safely past the 0.8 threshold and avoids f64 boundary noise. + let next = est.update((MIN as f64 * 0.9) as usize); + assert_eq!(next, ((MIN as f64) * GROW_FACTOR) as usize); + } + + #[test] + fn shrinks_when_below_shrink_threshold() { + let est = DynamicWriteBatchSizeEstimator::new(MIN, MAX); + // 0.4 sits safely below the strict 0.5 threshold. + let next = est.update((MAX as f64 * 0.4) as usize); + assert_eq!(next, ((MAX as f64) * SHRINK_FACTOR) as usize); + } + + #[test] + fn shrink_clamps_to_min() { + let est = DynamicWriteBatchSizeEstimator::new(MIN, MAX); + for _ in 0..CONVERGENCE_STEPS { + est.update(0); + } + assert_eq!(est.current(), MIN); + } + + #[test] + fn grow_clamps_to_max() { + let est = DynamicWriteBatchSizeEstimator::new(MIN, MAX); + for _ in 0..CONVERGENCE_STEPS { + est.update(0); + } + for _ in 0..CONVERGENCE_STEPS { + est.update(est.current()); + } + assert_eq!(est.current(), MAX); + } + + #[test] + fn oversized_actual_clamps_at_max() { + let est = DynamicWriteBatchSizeEstimator::new(MIN, MAX); + assert_eq!(est.update(MAX * 4), MAX); + } + + #[test] + fn dead_zone_is_a_fixed_point() { + let est = DynamicWriteBatchSizeEstimator::new(MIN, MAX); + let initial = est.current(); + for _ in 0..20 { + est.update((est.current() as f64 * 0.65) as usize); + } + assert_eq!(est.current(), initial); + } +} diff --git a/crates/fluss/src/client/write/mod.rs b/crates/fluss/src/client/write/mod.rs index bc324e18..a65b5d5a 100644 --- a/crates/fluss/src/client/write/mod.rs +++ b/crates/fluss/src/client/write/mod.rs @@ -17,6 +17,7 @@ mod accumulator; mod batch; +mod dynamic_batch_size; mod idempotence; use crate::client::broadcast::{self as client_broadcast, BatchWriteResult, BroadcastOnceReceiver}; diff --git a/crates/fluss/src/config.rs b/crates/fluss/src/config.rs index 09a17f83..cad8d9cb 100644 --- a/crates/fluss/src/config.rs +++ b/crates/fluss/src/config.rs @@ -22,6 +22,9 @@ use strum_macros::{Display, EnumString}; const DEFAULT_BOOTSTRAP_SERVER: &str = "127.0.0.1:9123"; const DEFAULT_REQUEST_MAX_SIZE: i32 = 10 * 1024 * 1024; const DEFAULT_WRITER_BATCH_SIZE: i32 = 2 * 1024 * 1024; +// Mirrors Java's `2 * pageSize` floor with default pageSize = 128 KB. +const DEFAULT_WRITER_DYNAMIC_BATCH_SIZE_MIN: i32 = 256 * 1024; +const DEFAULT_WRITER_DYNAMIC_BATCH_SIZE_ENABLED: bool = true; const DEFAULT_RETRIES: i32 = i32::MAX; const DEFAULT_PREFETCH_NUM: usize = 4; const DEFAULT_DOWNLOAD_THREADS: usize = 3; @@ -76,6 +79,17 @@ pub struct Config { #[arg(long, default_value_t = DEFAULT_WRITER_BATCH_SIZE)] pub writer_batch_size: i32, + /// Tune the per-table writer batch size from observed fill ratios. + /// Default: true (matching Java `client.writer.dynamic-batch-size.enabled`). + #[arg(long, default_value_t = DEFAULT_WRITER_DYNAMIC_BATCH_SIZE_ENABLED)] + pub writer_dynamic_batch_size_enabled: bool, + + /// Lower bound for the dynamic batch size estimator. + /// Default: 262144 (256 KB), matching Java's `2 * pageSize` floor. + /// Ignored when `writer_dynamic_batch_size_enabled` is false. + #[arg(long, default_value_t = DEFAULT_WRITER_DYNAMIC_BATCH_SIZE_MIN)] + pub writer_dynamic_batch_size_min: i32, + #[arg(long, value_enum, default_value_t = NoKeyAssigner::Sticky)] pub writer_bucket_no_key_assigner: NoKeyAssigner, @@ -199,6 +213,14 @@ impl std::fmt::Debug for Config { .field("writer_acks", &self.writer_acks) .field("writer_retries", &self.writer_retries) .field("writer_batch_size", &self.writer_batch_size) + .field( + "writer_dynamic_batch_size_enabled", + &self.writer_dynamic_batch_size_enabled, + ) + .field( + "writer_dynamic_batch_size_min", + &self.writer_dynamic_batch_size_min, + ) .field( "writer_bucket_no_key_assigner", &self.writer_bucket_no_key_assigner, @@ -267,6 +289,8 @@ impl Default for Config { writer_acks: String::from(DEFAULT_ACKS), writer_retries: i32::MAX, writer_batch_size: DEFAULT_WRITER_BATCH_SIZE, + writer_dynamic_batch_size_enabled: DEFAULT_WRITER_DYNAMIC_BATCH_SIZE_ENABLED, + writer_dynamic_batch_size_min: DEFAULT_WRITER_DYNAMIC_BATCH_SIZE_MIN, writer_bucket_no_key_assigner: NoKeyAssigner::Sticky, scanner_remote_log_prefetch_num: DEFAULT_PREFETCH_NUM, remote_file_download_thread_num: DEFAULT_DOWNLOAD_THREADS, @@ -388,6 +412,12 @@ impl Config { if self.writer_batch_size as usize > self.writer_buffer_memory_size { return Err("writer_batch_size must be <= writer_buffer_memory_size".to_string()); } + if self.writer_dynamic_batch_size_min <= 0 { + return Err("writer_dynamic_batch_size_min must be > 0".to_string()); + } + if self.writer_dynamic_batch_size_min > self.writer_batch_size { + return Err("writer_dynamic_batch_size_min must be <= writer_batch_size".to_string()); + } // idempotence checks if !self.writer_enable_idempotence { return Ok(()); diff --git a/website/docs/user-guide/cpp/api-reference.md b/website/docs/user-guide/cpp/api-reference.md index c50d40cd..d904a064 100644 --- a/website/docs/user-guide/cpp/api-reference.md +++ b/website/docs/user-guide/cpp/api-reference.md @@ -21,7 +21,9 @@ Complete API reference for the Fluss C++ client. | `writer_request_max_size` | `int32_t` | `10485760` (10 MB) | Maximum request size in bytes | | `writer_acks` | `std::string` | `"all"` | Acknowledgment setting (`"all"`, `"0"`, `"1"`, or `"-1"`) | | `writer_retries` | `int32_t` | `INT32_MAX` | Number of retries on failure | -| `writer_batch_size` | `int32_t` | `2097152` (2 MB) | Batch size for writes in bytes | +| `writer_batch_size` | `int32_t` | `2097152` (2 MB) | Batch size for writes in bytes. Upper bound when dynamic sizing is on; fixed batch size when off | +| `writer_dynamic_batch_size_enabled` | `bool` | `true` | Enable per-table dynamic batch sizing: target grows 10% above 80% fill, shrinks 5% below 50% | +| `writer_dynamic_batch_size_min` | `int32_t` | `262144` (256 KB) | Lower bound for the dynamic batch size estimator (ignored when disabled) | | `writer_batch_timeout_ms` | `int64_t` | `100` | Maximum time in ms to wait for a writer batch to fill up before sending | | `writer_bucket_no_key_assigner` | `std::string` | `"sticky"` | Bucket assignment strategy for tables without bucket keys: `"sticky"` or `"round_robin"` | | `scanner_remote_log_prefetch_num` | `size_t` | `4` | Number of remote log segments to prefetch | diff --git a/website/docs/user-guide/python/api-reference.md b/website/docs/user-guide/python/api-reference.md index 32f23a59..aec4412d 100644 --- a/website/docs/user-guide/python/api-reference.md +++ b/website/docs/user-guide/python/api-reference.md @@ -14,7 +14,9 @@ Complete API reference for the Fluss Python client. | `writer_request_max_size` | `writer.request-max-size` | Get/set max request size in bytes | | `writer_acks` | `writer.acks` | Get/set acknowledgment setting (`"all"` for all replicas) | | `writer_retries` | `writer.retries` | Get/set number of retries on failure | -| `writer_batch_size` | `writer.batch-size` | Get/set write batch size in bytes | +| `writer_batch_size` | `writer.batch-size` | Get/set write batch size in bytes. Upper bound when dynamic sizing is on; fixed batch size when off | +| `writer_dynamic_batch_size_enabled` | `writer.dynamic-batch-size.enabled` | Get/set whether the per-table dynamic batch size estimator is enabled (default `true`) | +| `writer_dynamic_batch_size_min` | `writer.dynamic-batch-size-min` | Get/set the lower bound for the dynamic batch size estimator (default 256 KB; ignored when disabled) | | `writer_batch_timeout_ms` | `writer.batch-timeout-ms` | Get/set max time in ms to wait for a writer batch to fill up before sending | | `writer_bucket_no_key_assigner` | `writer.bucket.no-key-assigner` | Get/set bucket assignment strategy (`"sticky"` or `"round_robin"`) | | `scanner_remote_log_prefetch_num` | `scanner.remote-log.prefetch-num` | Get/set number of remote log segments to prefetch | diff --git a/website/docs/user-guide/rust/api-reference.md b/website/docs/user-guide/rust/api-reference.md index 9f2994ad..03054f0f 100644 --- a/website/docs/user-guide/rust/api-reference.md +++ b/website/docs/user-guide/rust/api-reference.md @@ -13,7 +13,9 @@ Complete API reference for the Fluss Rust client. | `writer_request_max_size` | `i32` | `10485760` (10 MB) | Maximum request size in bytes | | `writer_acks` | `String` | `"all"` | Acknowledgment setting (`"all"` waits for all replicas) | | `writer_retries` | `i32` | `i32::MAX` | Number of retries on failure | -| `writer_batch_size` | `i32` | `2097152` (2 MB) | Batch size for writes in bytes | +| `writer_batch_size` | `i32` | `2097152` (2 MB) | Batch size for writes in bytes. Upper bound when dynamic sizing is on; fixed batch size when off. | +| `writer_dynamic_batch_size_enabled` | `bool` | `true` | Enable per-table dynamic batch sizing: target grows 10% above 80% fill, shrinks 5% below 50%, clamped to `[writer_dynamic_batch_size_min, writer_batch_size]` | +| `writer_dynamic_batch_size_min` | `i32` | `262144` (256 KB) | Lower bound for the dynamic batch size estimator (ignored when `writer_dynamic_batch_size_enabled` is `false`) | | `writer_batch_timeout_ms` | `i64` | `100` | Maximum time in ms to wait for a writer batch to fill up before sending | | `writer_bucket_no_key_assigner` | `NoKeyAssigner` | `sticky` | Bucket assignment strategy for tables without bucket keys: `sticky` or `round_robin` | | `scanner_remote_log_prefetch_num` | `usize` | `4` | Number of remote log segments to prefetch |