Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,36 @@ public class ConfigOptions {
"The interval of auto partition check. "
+ "The default value is 10 minutes.");

/**
 * Coordinator event queue size at or above which auto partition skips dropping expired
 * partitions for the current round.
 *
 * <p>Integer option with a default of {@code 1000}. A non-positive value turns the
 * backpressure check off, so drops are never skipped because of queue size.
 */
public static final ConfigOption<Integer> AUTO_PARTITION_DROP_QUEUE_BACKPRESSURE_THRESHOLD =
key("auto-partition.drop.queue-backpressure-threshold")
.intType()
.defaultValue(1000)
.withDescription(
"When the coordinator event queue size reaches this threshold, "
+ "auto partition will skip the drop step for the current round and retry "
+ "in the next AUTO_PARTITION_CHECK_INTERVAL. This prevents overwhelming "
+ "the coordinator at midnight day rotation when one partition drop fans "
+ "out into thousands of bucket/replica deletion events. Set to a "
+ "non-positive value to disable backpressure (always drop).");

/**
 * Upper bound on how much partition-drop work auto partition performs in one round, shared
 * by all auto-partition tables and counted in bucket-deletion events.
 *
 * <p>Integer option with a default of {@code 1000}. A non-positive value removes the cap.
 * The cap is soft: each table may still drop one partition per round even when that single
 * drop exceeds the remaining budget.
 */
public static final ConfigOption<Integer> AUTO_PARTITION_DROP_MAX_BUCKETS_PER_ROUND =
key("auto-partition.drop.max-buckets-per-round")
.intType()
.defaultValue(1000)
.withDescription(
"Per-round drop budget across all auto-partition tables, measured "
+ "in number of bucket-deletion events. Because dropping one "
+ "partition fans out into numBuckets * replicationFactor "
+ "deletion events on the coordinator queue, accounting in "
+ "buckets gives uniform protection regardless of each table's "
+ "bucket count. To avoid starving tables whose single-partition "
+ "bucket count exceeds the budget, every table is allowed at "
+ "least one partition drop per round even if it temporarily "
+ "overshoots the budget. Remaining over-quota drops will be "
+ "processed in the next AUTO_PARTITION_CHECK_INTERVAL. Set to "
+ "a non-positive value to disable the cap (drop without "
+ "limit).");

public static final ConfigOption<Boolean> LOG_TABLE_ALLOW_CREATION =
key("allow.create.log.tables")
.booleanType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.IntSupplier;

import static org.apache.fluss.server.utils.TableAssignmentUtils.generateAssignment;
import static org.apache.fluss.utils.PartitionUtils.generateAutoPartition;
Expand All @@ -90,6 +91,24 @@ public class AutoPartitionManager implements AutoCloseable {
private final long periodicInterval;
private final AtomicBoolean isClosed = new AtomicBoolean(false);

/**
 * Probe of the coordinator event queue size, used to apply queue-aware backpressure on the drop
 * path. A drop of one partition fans out into many bucket/replica deletion events, so when the
 * queue is heavily backed up we skip drops for the current round and retry next round. Using
 * {@link IntSupplier} keeps this manager decoupled from {@code CoordinatorEventManager}.
 *
 * <p>The supplier is only consulted while the backpressure threshold is positive.
 */
private final IntSupplier coordinatorEventQueueSizeProvider;

/**
 * Queue size at or above which partition drops are skipped for the round. A non-positive value
 * disables the queue-aware backpressure gate.
 */
private final int dropQueueBackpressureThreshold;

/**
 * Non-positive value disables the per-round drop cap (drop without limit). The unit is number
 * of bucket-deletion events (i.e. partition count * numBuckets), since the coordinator queue
 * pressure scales with bucket fan-out, not partition count.
 *
 * <p>The replication factor is not part of this unit: each dropped partition is charged the
 * table's bucket count only.
 */
private final int dropMaxBucketsPerRound;

// TODO these two local cache can be removed if we introduce server cache.
@GuardedBy("lock")
private final Map<Long, TableInfo> autoPartitionTables = new HashMap<>();
Expand All @@ -109,13 +128,22 @@ public AutoPartitionManager(
ServerMetadataCache metadataCache,
MetadataManager metadataManager,
Configuration conf) {
this(metadataCache, metadataManager, conf, () -> 0);
}

/**
 * Creates a manager backed by the system clock and a scheduled thread pool with one core
 * thread, and wires in a probe for the coordinator event queue size.
 *
 * @param metadataCache server metadata cache handed to the delegate constructor
 * @param metadataManager metadata manager handed to the delegate constructor
 * @param conf configuration from which the auto-partition options are read
 * @param coordinatorEventQueueSizeProvider supplies the current coordinator event queue size;
 *     used for queue-aware backpressure on partition drops
 */
public AutoPartitionManager(
        ServerMetadataCache metadataCache,
        MetadataManager metadataManager,
        Configuration conf,
        IntSupplier coordinatorEventQueueSizeProvider) {
    this(
            metadataCache,
            metadataManager,
            conf,
            SystemClock.getInstance(),
            // The executor argument must stay open here: the queue-size probe follows it as
            // the last argument of the delegate constructor.
            Executors.newScheduledThreadPool(
                    1, new ExecutorThreadFactory("periodic-auto-partition-manager")),
            coordinatorEventQueueSizeProvider);
}

@VisibleForTesting
Expand All @@ -125,11 +153,27 @@ public AutoPartitionManager(
Configuration conf,
Clock clock,
ScheduledExecutorService periodicExecutor) {
this(metadataCache, metadataManager, conf, clock, periodicExecutor, () -> 0);
}

@VisibleForTesting
AutoPartitionManager(
ServerMetadataCache metadataCache,
MetadataManager metadataManager,
Configuration conf,
Clock clock,
ScheduledExecutorService periodicExecutor,
IntSupplier coordinatorEventQueueSizeProvider) {
this.metadataCache = metadataCache;
this.metadataManager = metadataManager;
this.clock = clock;
this.periodicExecutor = periodicExecutor;
this.periodicInterval = conf.get(ConfigOptions.AUTO_PARTITION_CHECK_INTERVAL).toMillis();
this.coordinatorEventQueueSizeProvider = checkNotNull(coordinatorEventQueueSizeProvider);
this.dropQueueBackpressureThreshold =
conf.get(ConfigOptions.AUTO_PARTITION_DROP_QUEUE_BACKPRESSURE_THRESHOLD);
this.dropMaxBucketsPerRound =
conf.get(ConfigOptions.AUTO_PARTITION_DROP_MAX_BUCKETS_PER_ROUND);
}

public void initAutoPartitionTables(List<TableInfo> tableInfos) {
Expand Down Expand Up @@ -297,6 +341,12 @@ private void doAutoPartition(long tableId, boolean forceDoAutoPartition) {

private void doAutoPartition(Instant now, Set<Long> tableIds, boolean forceDoAutoPartition) {
LOG.info("Start auto partitioning for {} tables at {}.", tableIds.size(), now);

// Per-round drop budget across all tables, in unit of bucket-deletion events.
// Non-positive cap disables the budget (drop without limit).
boolean unlimitedBudget = dropMaxBucketsPerRound <= 0;
int dropBudgetThisRound = unlimitedBudget ? Integer.MAX_VALUE : dropMaxBucketsPerRound;

for (Long tableId : tableIds) {
Instant createPartitionInstant = now;
if (!forceDoAutoPartition) {
Expand Down Expand Up @@ -335,12 +385,46 @@ private void doAutoPartition(Instant now, Set<Long> tableIds, boolean forceDoAut
continue;
}

dropPartitions(
tablePath,
tableInfo.getPartitionKeys(),
createPartitionInstant,
tableInfo.getTableConfig().getAutoPartitionStrategy(),
currentPartitions);
// Queue-aware backpressure gate: skip this table's drop for the current round
// when the coordinator event queue is heavily backed up. The skipped partitions
// remain in partitionsByTable and will be retried in the next check interval.
boolean skipDrop = false;
if (dropQueueBackpressureThreshold > 0) {
int qSize = coordinatorEventQueueSizeProvider.getAsInt();
if (qSize >= dropQueueBackpressureThreshold) {
LOG.info(
"Skipping auto-partition drop for table [{}] (id={}) this round "
+ "because coordinator event queue size {} >= threshold {}.",
tablePath,
tableId,
qSize,
dropQueueBackpressureThreshold);
skipDrop = true;
}
}

if (!skipDrop && (unlimitedBudget || dropBudgetThisRound > 0)) {
// Use the actual current time for drop to ensure timely cleanup.
// The random delay is only meant for spreading partition creation load,
// not for delaying retention cleanup.
int budgetForCall = unlimitedBudget ? Integer.MAX_VALUE : dropBudgetThisRound;
int bucketsConsumed =
dropPartitions(
tablePath,
tableInfo.getPartitionKeys(),
now,
tableInfo.getTableConfig().getAutoPartitionStrategy(),
currentPartitions,
budgetForCall,
tableInfo.getNumBuckets());
if (!unlimitedBudget) {
// bucketsConsumed may exceed budgetForCall by at most
// (numBucketsPerPartition - 1) due to the per-table starvation guard,
// which legitimately drives the shared budget non-positive and stops
// subsequent tables from dropping in this round.
dropBudgetThisRound -= bucketsConsumed;
}
}
createPartitions(tableInfo, createPartitionInstant, currentPartitions);
}
}
Expand Down Expand Up @@ -432,16 +516,34 @@ private List<ResolvedPartitionSpec> partitionNamesToPreCreate(
return partitionsToCreate;
}

private void dropPartitions(
/**
* Drops expired partitions, bounded by {@code bucketBudget} (in unit of bucket-deletion
* events). Each successful partition drop consumes {@code numBucketsPerPartition} from the
* budget. Returns the total bucket budget consumed by this call so the caller can decrement the
* per-round shared budget accordingly.
*
* <p>Starvation guard: when a single partition's bucket count exceeds {@code bucketBudget}, the
* first partition drop of this call is still allowed to proceed (overshooting the budget).
* Without this guard, tables with very large {@code numBuckets} could be starved indefinitely
* whenever the global shared budget is small. The overshoot is naturally absorbed by {@code
* dropBudgetThisRound} going non-positive in the caller, which suppresses subsequent tables for
* the rest of the round.
*
* <p>When the queue-aware backpressure threshold is crossed mid-iteration, this method bails
* out early to keep the coordinator responsive.
*/
private int dropPartitions(
TablePath tablePath,
List<String> partitionKeys,
Instant currentInstant,
AutoPartitionStrategy autoPartitionStrategy,
NavigableMap<String, Set<String>> currentPartitions) {
NavigableMap<String, Set<String>> currentPartitions,
int bucketBudget,
int numBucketsPerPartition) {
int numToRetain = autoPartitionStrategy.numToRetain();
// negative value means not to drop partitions
if (numToRetain < 0) {
return;
if (numToRetain < 0 || bucketBudget <= 0) {
return 0;
}

ZonedDateTime currentZonedDateTime =
Expand All @@ -467,6 +569,9 @@ private void dropPartitions(
// (a=?,dt=20250506,b=?) (a=?,dt=20250507,b=?) will be retained.
Iterator<Map.Entry<String, Set<String>>> iterator =
currentPartitions.headMap(lastRetainPartitionTime).entrySet().iterator();
int dropped = 0;
int bucketsConsumed = 0;
outer:
while (iterator.hasNext()) {
Map.Entry<String, Set<String>> entry = iterator.next();

Expand All @@ -478,6 +583,14 @@ private void dropPartitions(
}

while (dropIterator.hasNext()) {
// Budget check with starvation guard: always allow the first partition drop
// for this table even if it would exceed the budget; otherwise tables whose
// single-partition bucket count is larger than the shared budget would never
// get a chance to drop.
if (dropped > 0 && (long) bucketsConsumed + numBucketsPerPartition > bucketBudget) {
// Budget exhausted; leave remaining partitions for next round.
break outer;
}
String partitionName = dropIterator.next();
// drop the partition
try {
Expand All @@ -494,13 +607,33 @@ private void dropPartitions(

// only remove when zk success, this reflects to the partitionsByTable
dropIterator.remove();
dropped++;
bucketsConsumed += numBucketsPerPartition;
LOG.info(
"Auto partitioning deleted partition {} for table [{}].",
partitionName,
tablePath);

// Dynamic backpressure: each partition drop fans out into many
// bucket/replica deletion events. Re-check the queue after every drop and
// bail out early if it has crossed the threshold.
if (dropQueueBackpressureThreshold > 0
&& coordinatorEventQueueSizeProvider.getAsInt()
>= dropQueueBackpressureThreshold) {
LOG.info(
"Stopping auto-partition drop for table [{}] mid-round because "
+ "coordinator event queue size has reached threshold {}.",
tablePath,
dropQueueBackpressureThreshold);
break outer;
}
}
// Only remove the time-key entry when its partition set has been fully drained.
if (entry.getValue() == null || entry.getValue().isEmpty()) {
iterator.remove();
}
iterator.remove();
}
return bucketsConsumed;
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,18 @@ protected void initCoordinatorLeader() throws Exception {
this.coordinatorChannelManager = new CoordinatorChannelManager(rpcClient);

this.autoPartitionManager =
new AutoPartitionManager(metadataCache, metadataManager, conf);
new AutoPartitionManager(
metadataCache,
metadataManager,
conf,
// Late-bound queue size probe: coordinatorEventProcessor is built
// after autoPartitionManager, so we read the field on each call.
// Returns 0 before the processor is wired up, which simply disables
// the queue-aware backpressure gate during that brief window.
() -> {
CoordinatorEventProcessor p = this.coordinatorEventProcessor;
return p == null ? 0 : p.getCoordinatorEventManager().queueSize();
});
autoPartitionManager.start();

// start coordinator event processor after we register coordinator leader to zk
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,14 @@ public void clearAndPut(CoordinatorEvent event) {
});
}

/**
 * Reports how many events are currently pending in the coordinator event queue. Intended for
 * external probes (e.g. {@code AutoPartitionManager}) that apply queue-aware backpressure.
 *
 * @return the size reported by the underlying queue at the time of the call
 */
public int queueSize() {
    return this.queue.size();
}

private class CoordinatorEventThread extends ShutdownableThread {

private long lastMetricsUpdateTime = 0;
Expand Down
Loading