Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion bindings/cpp/include/fluss.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -1223,8 +1223,12 @@ struct Configuration {
std::string writer_acks{"all"};
// Max number of writer retries
int32_t writer_retries{std::numeric_limits<int32_t>::max()};
// Writer batch size in bytes (2 MB)
// Writer batch size in bytes (2 MB), also the upper bound when dynamic sizing is on
int32_t writer_batch_size{2 * 1024 * 1024};
// Tune the per-table writer batch size from observed fill ratios
bool writer_dynamic_batch_size_enabled{true};
// Lower bound (256 KB) for the dynamic batch size estimator
int32_t writer_dynamic_batch_size_min{256 * 1024};
// Bucket assigner for tables without bucket keys: "sticky" or "round_robin"
std::string writer_bucket_no_key_assigner{"sticky"};
// Number of remote log batches to prefetch during scanning
Expand Down
2 changes: 2 additions & 0 deletions bindings/cpp/src/ffi_converter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ inline ffi::FfiConfig to_ffi_config(const Configuration& config) {
ffi_config.writer_acks = rust::String(config.writer_acks);
ffi_config.writer_retries = config.writer_retries;
ffi_config.writer_batch_size = config.writer_batch_size;
ffi_config.writer_dynamic_batch_size_enabled = config.writer_dynamic_batch_size_enabled;
ffi_config.writer_dynamic_batch_size_min = config.writer_dynamic_batch_size_min;
ffi_config.writer_bucket_no_key_assigner = rust::String(config.writer_bucket_no_key_assigner);
ffi_config.scanner_remote_log_prefetch_num = config.scanner_remote_log_prefetch_num;
ffi_config.remote_file_download_thread_num = config.remote_file_download_thread_num;
Expand Down
4 changes: 4 additions & 0 deletions bindings/cpp/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ mod ffi {
writer_acks: String,
writer_retries: i32,
writer_batch_size: i32,
writer_dynamic_batch_size_enabled: bool,
writer_dynamic_batch_size_min: i32,
writer_bucket_no_key_assigner: String,
scanner_remote_log_prefetch_num: usize,
remote_file_download_thread_num: usize,
Expand Down Expand Up @@ -905,6 +907,8 @@ fn new_connection(config: &ffi::FfiConfig) -> ffi::FfiPtrResult {
writer_acks: config.writer_acks.to_string(),
writer_retries: config.writer_retries,
writer_batch_size: config.writer_batch_size,
writer_dynamic_batch_size_enabled: config.writer_dynamic_batch_size_enabled,
writer_dynamic_batch_size_min: config.writer_dynamic_batch_size_min,
writer_batch_timeout_ms: config.writer_batch_timeout_ms,
writer_bucket_no_key_assigner: assigner_type,
scanner_remote_log_prefetch_num: config.scanner_remote_log_prefetch_num,
Expand Down
17 changes: 15 additions & 2 deletions bindings/elixir/lib/fluss/config.ex
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,16 @@ defmodule Fluss.Config do
@enforce_keys [:bootstrap_servers]
defstruct bootstrap_servers: nil,
writer_batch_size: nil,
writer_batch_timeout_ms: nil
writer_batch_timeout_ms: nil,
writer_dynamic_batch_size_enabled: nil,
writer_dynamic_batch_size_min: nil

@type t :: %__MODULE__{
bootstrap_servers: String.t(),
writer_batch_size: non_neg_integer() | nil,
writer_batch_timeout_ms: non_neg_integer() | nil
writer_batch_timeout_ms: non_neg_integer() | nil,
writer_dynamic_batch_size_enabled: boolean() | nil,
writer_dynamic_batch_size_min: non_neg_integer() | nil
}

@spec new(String.t()) :: t()
Expand All @@ -62,6 +66,15 @@ defmodule Fluss.Config do
def set_writer_batch_timeout_ms(%__MODULE__{} = config, ms) when is_integer(ms),
do: %{config | writer_batch_timeout_ms: ms}

@spec set_writer_dynamic_batch_size_enabled(t(), boolean()) :: t()
def set_writer_dynamic_batch_size_enabled(%__MODULE__{} = config, enabled)
when is_boolean(enabled),
do: %{config | writer_dynamic_batch_size_enabled: enabled}

@spec set_writer_dynamic_batch_size_min(t(), non_neg_integer()) :: t()
def set_writer_dynamic_batch_size_min(%__MODULE__{} = config, size) when is_integer(size),
do: %{config | writer_dynamic_batch_size_min: size}

@spec get_bootstrap_servers(t()) :: String.t()
def get_bootstrap_servers(%__MODULE__{bootstrap_servers: servers}), do: servers
end
8 changes: 8 additions & 0 deletions bindings/elixir/native/fluss_nif/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ pub struct NifConfig {
pub bootstrap_servers: String,
pub writer_batch_size: Option<i32>,
pub writer_batch_timeout_ms: Option<i64>,
pub writer_dynamic_batch_size_enabled: Option<bool>,
pub writer_dynamic_batch_size_min: Option<i32>,
}

impl NifConfig {
Expand All @@ -39,6 +41,12 @@ impl NifConfig {
if let Some(ms) = self.writer_batch_timeout_ms {
config.writer_batch_timeout_ms = ms;
}
if let Some(enabled) = self.writer_dynamic_batch_size_enabled {
config.writer_dynamic_batch_size_enabled = enabled;
}
if let Some(size) = self.writer_dynamic_batch_size_min {
config.writer_dynamic_batch_size_min = size;
}
config
}
}
8 changes: 8 additions & 0 deletions bindings/python/fluss/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,14 @@ class Config:
@writer_batch_size.setter
def writer_batch_size(self, size: int) -> None: ...
@property
def writer_dynamic_batch_size_enabled(self) -> bool: ...
@writer_dynamic_batch_size_enabled.setter
def writer_dynamic_batch_size_enabled(self, enabled: bool) -> None: ...
@property
def writer_dynamic_batch_size_min(self) -> int: ...
@writer_dynamic_batch_size_min.setter
def writer_dynamic_batch_size_min(self, size: int) -> None: ...
@property
def writer_bucket_no_key_assigner(self) -> str: ...
@writer_bucket_no_key_assigner.setter
def writer_bucket_no_key_assigner(self, value: str) -> None: ...
Expand Down
43 changes: 43 additions & 0 deletions bindings/python/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,25 @@ impl Config {
FlussError::new_err(format!("Invalid value '{value}' for '{key}': {e}"))
})?;
}
"writer.dynamic-batch-size.enabled" => {
config.writer_dynamic_batch_size_enabled = match value.as_str() {
"true" => true,
"false" => false,
other => {
return Err(FlussError::new_err(format!(
"Invalid value '{other}' for '{key}', expected 'true' or 'false'"
)));
}
};
}
"writer.dynamic-batch-size-min" => {
config.writer_dynamic_batch_size_min =
value.parse::<i32>().map_err(|e| {
FlussError::new_err(format!(
"Invalid value '{value}' for '{key}': {e}"
))
})?;
}
"writer.batch-timeout-ms" => {
config.writer_batch_timeout_ms = value.parse::<i64>().map_err(|e| {
FlussError::new_err(format!("Invalid value '{value}' for '{key}': {e}"))
Expand Down Expand Up @@ -250,6 +269,30 @@ impl Config {
self.inner.writer_batch_size = size;
}

/// Get whether the per-table dynamic batch size estimator is enabled
#[getter]
fn writer_dynamic_batch_size_enabled(&self) -> bool {
self.inner.writer_dynamic_batch_size_enabled
}

/// Set whether the per-table dynamic batch size estimator is enabled
#[setter]
fn set_writer_dynamic_batch_size_enabled(&mut self, enabled: bool) {
self.inner.writer_dynamic_batch_size_enabled = enabled;
}

/// Get the lower bound used by the dynamic batch size estimator
#[getter]
fn writer_dynamic_batch_size_min(&self) -> i32 {
self.inner.writer_dynamic_batch_size_min
}

/// Set the lower bound used by the dynamic batch size estimator
#[setter]
fn set_writer_dynamic_batch_size_min(&mut self, size: i32) {
self.inner.writer_dynamic_batch_size_min = size;
}

/// Get the scanner remote log prefetch num
#[getter]
fn scanner_remote_log_prefetch_num(&self) -> usize {
Expand Down
Loading
Loading