diff --git a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java index 1818e16ef3..470fc3a91e 100644 --- a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java +++ b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java @@ -200,6 +200,36 @@ public class ConfigOptions { "The interval of auto partition check. " + "The default value is 10 minutes."); + public static final ConfigOption AUTO_PARTITION_DROP_QUEUE_BACKPRESSURE_THRESHOLD = + key("auto-partition.drop.queue-backpressure-threshold") + .intType() + .defaultValue(1000) + .withDescription( + "When the coordinator event queue size reaches this threshold, " + + "auto partition will skip the drop step for the current round and retry " + + "in the next AUTO_PARTITION_CHECK_INTERVAL. This prevents overwhelming " + + "the coordinator at midnight day rotation when one partition drop fans " + + "out into thousands of bucket/replica deletion events. Set to a " + + "non-positive value to disable backpressure (always drop)."); + + public static final ConfigOption AUTO_PARTITION_DROP_MAX_BUCKETS_PER_ROUND = + key("auto-partition.drop.max-buckets-per-round") + .intType() + .defaultValue(1000) + .withDescription( + "Per-round drop budget across all auto-partition tables, measured " + + "in number of bucket-deletion events. Because dropping one " + + "partition fans out into numBuckets * replicationFactor " + + "deletion events on the coordinator queue, accounting in " + + "buckets gives uniform protection regardless of each table's " + + "bucket count. To avoid starving tables whose single-partition " + + "bucket count exceeds the budget, every table is allowed at " + + "least one partition drop per round even if it temporarily " + + "overshoots the budget. Remaining over-quota drops will be " + + "processed in the next AUTO_PARTITION_CHECK_INTERVAL. Set to " + + "a non-positive value to disable the cap (drop without " + + "limit)."); + public static final ConfigOption LOG_TABLE_ALLOW_CREATION = key("allow.create.log.tables") .booleanType() diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/AutoPartitionManager.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/AutoPartitionManager.java index 695c770d45..44ad31ea26 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/AutoPartitionManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/AutoPartitionManager.java @@ -64,6 +64,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import java.util.function.IntSupplier; import static org.apache.fluss.server.utils.TableAssignmentUtils.generateAssignment; import static org.apache.fluss.utils.PartitionUtils.generateAutoPartition; @@ -90,6 +91,24 @@ public class AutoPartitionManager implements AutoCloseable { private final long periodicInterval; private final AtomicBoolean isClosed = new AtomicBoolean(false); + /** + * Probe of the coordinator event queue size, used to apply queue-aware backpressure on the drop + * path. A drop of one partition fans out into many bucket/replica deletion events, so when the + * queue is heavily backed up we skip drops for the current round and retry next round. Using + * {@link IntSupplier} keeps this manager decoupled from {@code CoordinatorEventManager}. + */ + private final IntSupplier coordinatorEventQueueSizeProvider; + + /** Non-positive value disables the queue-aware backpressure gate. */ + private final int dropQueueBackpressureThreshold; + + /** + * Non-positive value disables the per-round drop cap (drop without limit). The unit is number + * of bucket-deletion events (i.e. partition count * numBuckets), since the coordinator queue + * pressure scales with bucket fan-out, not partition count. + */ + private final int dropMaxBucketsPerRound; + // TODO these two local cache can be removed if we introduce server cache. @GuardedBy("lock") private final Map autoPartitionTables = new HashMap<>(); @@ -109,13 +128,22 @@ public AutoPartitionManager( ServerMetadataCache metadataCache, MetadataManager metadataManager, Configuration conf) { + this(metadataCache, metadataManager, conf, () -> 0); + } + + public AutoPartitionManager( + ServerMetadataCache metadataCache, + MetadataManager metadataManager, + Configuration conf, + IntSupplier coordinatorEventQueueSizeProvider) { this( metadataCache, metadataManager, conf, SystemClock.getInstance(), Executors.newScheduledThreadPool( - 1, new ExecutorThreadFactory("periodic-auto-partition-manager"))); + 1, new ExecutorThreadFactory("periodic-auto-partition-manager")), + coordinatorEventQueueSizeProvider); } @VisibleForTesting @@ -125,11 +153,27 @@ public AutoPartitionManager( Configuration conf, Clock clock, ScheduledExecutorService periodicExecutor) { + this(metadataCache, metadataManager, conf, clock, periodicExecutor, () -> 0); + } + + @VisibleForTesting + AutoPartitionManager( + ServerMetadataCache metadataCache, + MetadataManager metadataManager, + Configuration conf, + Clock clock, + ScheduledExecutorService periodicExecutor, + IntSupplier coordinatorEventQueueSizeProvider) { this.metadataCache = metadataCache; this.metadataManager = metadataManager; this.clock = clock; this.periodicExecutor = periodicExecutor; this.periodicInterval = conf.get(ConfigOptions.AUTO_PARTITION_CHECK_INTERVAL).toMillis(); + this.coordinatorEventQueueSizeProvider = checkNotNull(coordinatorEventQueueSizeProvider); + this.dropQueueBackpressureThreshold = + conf.get(ConfigOptions.AUTO_PARTITION_DROP_QUEUE_BACKPRESSURE_THRESHOLD); + this.dropMaxBucketsPerRound = + conf.get(ConfigOptions.AUTO_PARTITION_DROP_MAX_BUCKETS_PER_ROUND); } public void initAutoPartitionTables(List tableInfos) { @@ -297,6 +341,12 @@ private void doAutoPartition(long tableId, boolean forceDoAutoPartition) { private void doAutoPartition(Instant now, Set tableIds, boolean forceDoAutoPartition) { LOG.info("Start auto partitioning for {} tables at {}.", tableIds.size(), now); + + // Per-round drop budget across all tables, in unit of bucket-deletion events. + // Non-positive cap disables the budget (drop without limit). + boolean unlimitedBudget = dropMaxBucketsPerRound <= 0; + int dropBudgetThisRound = unlimitedBudget ? Integer.MAX_VALUE : dropMaxBucketsPerRound; + for (Long tableId : tableIds) { Instant createPartitionInstant = now; if (!forceDoAutoPartition) { @@ -335,12 +385,46 @@ private void doAutoPartition(Instant now, Set tableIds, boolean forceDoAut continue; } - dropPartitions( - tablePath, - tableInfo.getPartitionKeys(), - createPartitionInstant, - tableInfo.getTableConfig().getAutoPartitionStrategy(), - currentPartitions); + // Queue-aware backpressure gate: skip this table's drop for the current round + // when the coordinator event queue is heavily backed up. The skipped partitions + // remain in partitionsByTable and will be retried in the next check interval. + boolean skipDrop = false; + if (dropQueueBackpressureThreshold > 0) { + int qSize = coordinatorEventQueueSizeProvider.getAsInt(); + if (qSize >= dropQueueBackpressureThreshold) { + LOG.info( + "Skipping auto-partition drop for table [{}] (id={}) this round " + + "because coordinator event queue size {} >= threshold {}.", + tablePath, + tableId, + qSize, + dropQueueBackpressureThreshold); + skipDrop = true; + } + } + + if (!skipDrop && (unlimitedBudget || dropBudgetThisRound > 0)) { + // Use the actual current time for drop to ensure timely cleanup. + // The random delay is only meant for spreading partition creation load, + // not for delaying retention cleanup. + int budgetForCall = unlimitedBudget ? Integer.MAX_VALUE : dropBudgetThisRound; + int bucketsConsumed = + dropPartitions( + tablePath, + tableInfo.getPartitionKeys(), + now, + tableInfo.getTableConfig().getAutoPartitionStrategy(), + currentPartitions, + budgetForCall, + tableInfo.getNumBuckets()); + if (!unlimitedBudget) { + // bucketsConsumed may exceed budgetForCall by at most + // (numBucketsPerPartition - 1) due to the per-table starvation guard, + // which legitimately drives the shared budget non-positive and stops + // subsequent tables from dropping in this round. + dropBudgetThisRound -= bucketsConsumed; + } + } createPartitions(tableInfo, createPartitionInstant, currentPartitions); } } @@ -432,16 +516,34 @@ private List partitionNamesToPreCreate( return partitionsToCreate; } - private void dropPartitions( + /** + * Drops expired partitions, bounded by {@code bucketBudget} (in unit of bucket-deletion + * events). Each successful partition drop consumes {@code numBucketsPerPartition} from the + * budget. Returns the total bucket budget consumed by this call so the caller can decrement the + * per-round shared budget accordingly. + * + *

Starvation guard: when a single partition's bucket count exceeds {@code bucketBudget}, the + * first partition drop of this call is still allowed to proceed (overshooting the budget). + * Without this guard, tables with very large {@code numBuckets} could be starved indefinitely + * whenever the global shared budget is small. The overshoot is naturally absorbed by {@code + * dropBudgetThisRound} going non-positive in the caller, which suppresses subsequent tables for + * the rest of the round. + * + *

When the queue-aware backpressure threshold is crossed mid-iteration, this method bails + * out early to keep the coordinator responsive. + */ + private int dropPartitions( TablePath tablePath, List partitionKeys, Instant currentInstant, AutoPartitionStrategy autoPartitionStrategy, - NavigableMap> currentPartitions) { + NavigableMap> currentPartitions, + int bucketBudget, + int numBucketsPerPartition) { int numToRetain = autoPartitionStrategy.numToRetain(); // negative value means not to drop partitions - if (numToRetain < 0) { - return; + if (numToRetain < 0 || bucketBudget <= 0) { + return 0; } ZonedDateTime currentZonedDateTime = @@ -467,6 +569,9 @@ private void dropPartitions( // (a=?,dt=20250506,b=?) (a=?,dt=20250507,b=?) will be retained. Iterator>> iterator = currentPartitions.headMap(lastRetainPartitionTime).entrySet().iterator(); + int dropped = 0; + int bucketsConsumed = 0; + outer: while (iterator.hasNext()) { Map.Entry> entry = iterator.next(); @@ -478,6 +583,14 @@ private void dropPartitions( } while (dropIterator.hasNext()) { + // Budget check with starvation guard: always allow the first partition drop + // for this table even if it would exceed the budget; otherwise tables whose + // single-partition bucket count is larger than the shared budget would never + // get a chance to drop. + if (dropped > 0 && (long) bucketsConsumed + numBucketsPerPartition > bucketBudget) { + // Budget exhausted; leave remaining partitions for next round. + break outer; + } String partitionName = dropIterator.next(); // drop the partition try { @@ -494,13 +607,33 @@ private void dropPartitions( // only remove when zk success, this reflects to the partitionsByTable dropIterator.remove(); + dropped++; + bucketsConsumed += numBucketsPerPartition; LOG.info( "Auto partitioning deleted partition {} for table [{}].", partitionName, tablePath); + + // Dynamic backpressure: each partition drop fans out into many + // bucket/replica deletion events. Re-check the queue after every drop and + // bail out early if it has crossed the threshold. + if (dropQueueBackpressureThreshold > 0 + && coordinatorEventQueueSizeProvider.getAsInt() + >= dropQueueBackpressureThreshold) { + LOG.info( + "Stopping auto-partition drop for table [{}] mid-round because " + + "coordinator event queue size has reached threshold {}.", + tablePath, + dropQueueBackpressureThreshold); + break outer; + } + } + // Only remove the time-key entry when its partition set has been fully drained. + if (entry.getValue() == null || entry.getValue().isEmpty()) { + iterator.remove(); } - iterator.remove(); } + return bucketsConsumed; } @VisibleForTesting diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java index 10ba95c05d..3557d5c4f9 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java @@ -303,7 +303,18 @@ protected void initCoordinatorLeader() throws Exception { this.coordinatorChannelManager = new CoordinatorChannelManager(rpcClient); this.autoPartitionManager = - new AutoPartitionManager(metadataCache, metadataManager, conf); + new AutoPartitionManager( + metadataCache, + metadataManager, + conf, + // Late-bound queue size probe: coordinatorEventProcessor is built + // after autoPartitionManager, so we read the field on each call. + // Returns 0 before the processor is wired up, which simply disables + // the queue-aware backpressure gate during that brief window. + () -> { + CoordinatorEventProcessor p = this.coordinatorEventProcessor; + return p == null ? 0 : p.getCoordinatorEventManager().queueSize(); + }); autoPartitionManager.start(); // start coordinator event processor after we register coordinator leader to zk diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/CoordinatorEventManager.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/CoordinatorEventManager.java index 6f13df501a..58a12ca576 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/CoordinatorEventManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/CoordinatorEventManager.java @@ -221,6 +221,14 @@ public void clearAndPut(CoordinatorEvent event) { }); } + /** + * Returns the current number of pending events in the coordinator event queue. Used by external + * probes (e.g. {@code AutoPartitionManager}) to apply queue-aware backpressure. + */ + public int queueSize() { + return queue.size(); + } + private class CoordinatorEventThread extends ShutdownableThread { private long lastMetricsUpdateTime = 0; diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/AutoPartitionManagerTest.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/AutoPartitionManagerTest.java index 5114ee1156..880d91d566 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/AutoPartitionManagerTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/AutoPartitionManagerTest.java @@ -50,13 +50,17 @@ import java.time.LocalDateTime; import java.time.ZoneId; import java.time.ZonedDateTime; +import java.util.Arrays; import java.util.Collection; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; +import java.util.function.IntSupplier; import java.util.stream.Stream; import static org.apache.fluss.metadata.ResolvedPartitionSpec.fromPartitionName; @@ -510,6 +514,55 @@ void testAutoCreateDayPartitionShouldJitter() throws Exception { "20250419", "20250420", "20250421", "20250422", "20250423"); } + @Test + void testDayPartitionDropShouldNotBeDelayedByJitter() throws Exception { + ZonedDateTime startTime = + LocalDateTime.parse("2025-04-19T00:00:00").atZone(ZoneId.systemDefault()); + long startMs = startTime.toInstant().toEpochMilli(); + ManualClock clock = new ManualClock(startMs); + ManuallyTriggeredScheduledExecutorService periodicExecutor = + new ManuallyTriggeredScheduledExecutorService(); + AutoPartitionManager autoPartitionManager = + new AutoPartitionManager( + new TestingServerMetadataCache(3), + metadataManager, + new Configuration(), + clock, + periodicExecutor); + autoPartitionManager.start(); + + // Create a DAY-partitioned table: numRetention=2, numPreCreate=4 + TableInfo table = createPartitionedTable(2, 4, AutoPartitionTimeUnit.DAY); + TablePath tablePath = table.getTablePath(); + autoPartitionManager.addAutoPartitionTable(table, true); + periodicExecutor.triggerNonPeriodicScheduledTasks(); + + Map partitions = + zookeeperClient.getPartitionRegistrations(tablePath); + // Initial: 20250419, 20250420, 20250421, 20250422 + assertThat(partitions.keySet()) + .containsExactlyInAnyOrder("20250419", "20250420", "20250421", "20250422"); + + Integer delayMinutes = + autoPartitionManager.getAutoCreateDayDelayMinutes(table.getTableId()); + assertThat(delayMinutes).isNotNull(); + + // Advance exactly 3 days to 2025-04-22T00:00:00. + // From 'now' perspective: current day is 20250422, retain 2 => keep 20250420, 20250421. + // So 20250419 should be dropped. + // + // With the bug (drop used delayed time): if delay > 0, delayed time would still + // be on 20250421, retain 2 => keep 20250419, 20250420 => 20250419 NOT dropped. + clock.advanceTime(Duration.ofDays(3)); + periodicExecutor.triggerPeriodicScheduledTasks(); + + partitions = zookeeperClient.getPartitionRegistrations(tablePath); + // 20250419 must be dropped regardless of jitter delay + assertThat(partitions.keySet()).doesNotContain("20250419"); + // Retained partitions should still exist + assertThat(partitions.keySet()).contains("20250420", "20250421", "20250422"); + } + /** * Test if AutoPartionManager.createPartition adheres to maxBucketLimit while adding new * parition automatically, skip if it breaches limit. @@ -613,6 +666,333 @@ void testUpdateAutoPartitionNumRetention() throws Exception { "2024091003", "2024091004", "2024091005", "2024091006", "2024091007"); } + // --------------------------------------------------------------------------------------- + // Tests for queue-aware backpressure gate and per-round drop budget. + // The startTime 2099-01-01T10:00:00 is far in the future so manually-added partitions + // dated 2024-xx-xx will all be considered expired (older than now - numRetention hours). + // --------------------------------------------------------------------------------------- + + @Test + void testDropProceedsWhenQueueBelowThreshold() throws Exception { + ZonedDateTime startTime = + LocalDateTime.parse("2099-01-01T10:00:00").atZone(ZoneId.systemDefault()); + ManualClock clock = new ManualClock(startTime.toInstant().toEpochMilli()); + ManuallyTriggeredScheduledExecutorService periodicExecutor = + new ManuallyTriggeredScheduledExecutorService(); + + // queue stays at 0 -> never crosses threshold + AutoPartitionManager manager = + createDropConfigManager(clock, periodicExecutor, 10, 100, () -> 0); + manager.start(); + + TableInfo table = createHourPartitionedTable(101L, 2, 1); + TablePath tablePath = table.getTablePath(); + manager.addAutoPartitionTable(table, true); + periodicExecutor.triggerNonPeriodicScheduledTasks(); + + // pre-create the current hour partition. + assertThat(zookeeperClient.getPartitionRegistrations(tablePath).keySet()) + .containsExactlyInAnyOrder("2099010110"); + + // 6 expired partitions (all dated 2024-09-01, lexicographically before 2099010108) + List expired = + Arrays.asList( + "2024090101", + "2024090102", + "2024090103", + "2024090104", + "2024090105", + "2024090106"); + manuallyAddPartitions(manager, table, expired); + + periodicExecutor.triggerPeriodicScheduledTasks(); + + // all 6 expired partitions are dropped; current partition is kept + assertThat(zookeeperClient.getPartitionRegistrations(tablePath).keySet()) + .containsExactlyInAnyOrder("2099010110"); + } + + @Test + void testDropSkippedWhenQueueAboveThreshold() throws Exception { + ZonedDateTime startTime = + LocalDateTime.parse("2099-01-01T10:00:00").atZone(ZoneId.systemDefault()); + ManualClock clock = new ManualClock(startTime.toInstant().toEpochMilli()); + ManuallyTriggeredScheduledExecutorService periodicExecutor = + new ManuallyTriggeredScheduledExecutorService(); + + AtomicInteger qSize = new AtomicInteger(20); + // threshold=10 -> qSize=20 will trip the gate and skip drops + AutoPartitionManager manager = + createDropConfigManager(clock, periodicExecutor, 10, 100, qSize::get); + manager.start(); + + TableInfo table = createHourPartitionedTable(102L, 2, 2); + TablePath tablePath = table.getTablePath(); + manager.addAutoPartitionTable(table, true); + periodicExecutor.triggerNonPeriodicScheduledTasks(); + + // pre-create 2 partitions: current and next hour + assertThat(zookeeperClient.getPartitionRegistrations(tablePath).keySet()) + .containsExactlyInAnyOrder("2099010110", "2099010111"); + + List expired = + Arrays.asList( + "2024090101", + "2024090102", + "2024090103", + "2024090104", + "2024090105", + "2024090106"); + manuallyAddPartitions(manager, table, expired); + + // queue is heavily backed up: drop is skipped, but create still works. + // advance clock so a new partition is pre-created next round. + clock.advanceTime(Duration.ofHours(1)); + periodicExecutor.triggerPeriodicScheduledTasks(); + Set after = zookeeperClient.getPartitionRegistrations(tablePath).keySet(); + // expired partitions still present in zk (drop was skipped) + assertThat(after).contains("2024090101", "2024090106"); + // create still proceeded -> 2099010112 added + assertThat(after).contains("2099010112"); + + // queue drains; next round should now drop everything expired. + qSize.set(0); + periodicExecutor.triggerPeriodicScheduledTasks(); + assertThat(zookeeperClient.getPartitionRegistrations(tablePath).keySet()) + .doesNotContainAnyElementsOf(expired); + } + + @Test + void testDropRespectsAbsoluteCapWhenItDominates() throws Exception { + ZonedDateTime startTime = + LocalDateTime.parse("2099-01-01T10:00:00").atZone(ZoneId.systemDefault()); + ManualClock clock = new ManualClock(startTime.toInstant().toEpochMilli()); + ManuallyTriggeredScheduledExecutorService periodicExecutor = + new ManuallyTriggeredScheduledExecutorService(); + + // numBuckets per partition = 4 (createHourPartitionedTable distributedBy(4)). + // budget = 12 buckets (= 3 partitions per round). + AutoPartitionManager manager = + createDropConfigManager(clock, periodicExecutor, 0, 12, () -> 0); + manager.start(); + + TableInfo table = createHourPartitionedTable(103L, 2, 1); + TablePath tablePath = table.getTablePath(); + manager.addAutoPartitionTable(table, true); + periodicExecutor.triggerNonPeriodicScheduledTasks(); + + List expired = + Arrays.asList("2024090101", "2024090102", "2024090103", "2024090104", "2024090105"); + manuallyAddPartitions(manager, table, expired); + + // round 1: budget = 12 buckets = 3 partitions -> drops 3, leaves 2 + periodicExecutor.triggerPeriodicScheduledTasks(); + Set remainingExpired1 = expiredStillIn(tablePath, expired); + assertThat(remainingExpired1).hasSize(2); + + // round 2: budget = 12 buckets -> drops the remaining 2 + periodicExecutor.triggerPeriodicScheduledTasks(); + assertThat(expiredStillIn(tablePath, expired)).isEmpty(); + } + + @Test + void testDropBudgetIsSharedAcrossTables() throws Exception { + ZonedDateTime startTime = + LocalDateTime.parse("2099-01-01T10:00:00").atZone(ZoneId.systemDefault()); + ManualClock clock = new ManualClock(startTime.toInstant().toEpochMilli()); + ManuallyTriggeredScheduledExecutorService periodicExecutor = + new ManuallyTriggeredScheduledExecutorService(); + + // total expired across A+B = 10 * 4 buckets = 40; budget = 12 buckets (= 3 partitions). + AutoPartitionManager manager = + createDropConfigManager(clock, periodicExecutor, 0, 12, () -> 0); + manager.start(); + + TableInfo tableA = createHourPartitionedTable(105L, 2, 1); + TableInfo tableB = createHourPartitionedTable(106L, 2, 1); + manager.addAutoPartitionTable(tableA, true); + manager.addAutoPartitionTable(tableB, true); + periodicExecutor.triggerNonPeriodicScheduledTasks(); + + List expiredA = + Arrays.asList("2024090101", "2024090102", "2024090103", "2024090104", "2024090105"); + List expiredB = + Arrays.asList("2024090201", "2024090202", "2024090203", "2024090204", "2024090205"); + manuallyAddPartitions(manager, tableA, expiredA); + manuallyAddPartitions(manager, tableB, expiredB); + + // round 1: shared budget = 12 buckets = 3 partitions -> A+B combined drops = 3 + periodicExecutor.triggerPeriodicScheduledTasks(); + int aLeft = expiredStillIn(tableA.getTablePath(), expiredA).size(); + int bLeft = expiredStillIn(tableB.getTablePath(), expiredB).size(); + assertThat(aLeft + bLeft).isEqualTo(7); + } + + @Test + void testStarvationGuardForLargeTable() throws Exception { + // When a single partition's bucket count exceeds the per-round budget, the + // starvation guard ensures at least one partition is dropped per round so the + // table cannot be permanently blocked. + ZonedDateTime startTime = + LocalDateTime.parse("2099-01-01T10:00:00").atZone(ZoneId.systemDefault()); + ManualClock clock = new ManualClock(startTime.toInstant().toEpochMilli()); + ManuallyTriggeredScheduledExecutorService periodicExecutor = + new ManuallyTriggeredScheduledExecutorService(); + + // numBuckets per partition = 4; budget = 2 buckets (< 1 partition's 4 buckets). + // Without the starvation guard the table would never drop. + AutoPartitionManager manager = + createDropConfigManager(clock, periodicExecutor, 0, 2, () -> 0); + manager.start(); + + TableInfo table = createHourPartitionedTable(108L, 2, 1); + TablePath tablePath = table.getTablePath(); + manager.addAutoPartitionTable(table, true); + periodicExecutor.triggerNonPeriodicScheduledTasks(); + + List expired = + Arrays.asList("2024090101", "2024090102", "2024090103", "2024090104", "2024090105"); + manuallyAddPartitions(manager, table, expired); + + // round 1: budget=2 buckets but numBucketsPerPartition=4 -> starvation guard drops 1 + periodicExecutor.triggerPeriodicScheduledTasks(); + assertThat(expiredStillIn(tablePath, expired)).hasSize(4); + + // round 2: same situation -> drops 1 more + periodicExecutor.triggerPeriodicScheduledTasks(); + assertThat(expiredStillIn(tablePath, expired)).hasSize(3); + } + + @Test + void testDropDisablesAllCapsWhenZero() throws Exception { + ZonedDateTime startTime = + LocalDateTime.parse("2099-01-01T10:00:00").atZone(ZoneId.systemDefault()); + ManualClock clock = new ManualClock(startTime.toInstant().toEpochMilli()); + ManuallyTriggeredScheduledExecutorService periodicExecutor = + new ManuallyTriggeredScheduledExecutorService(); + + // both caps disabled (non-positive) -> unlimited budget, no queue gate + AutoPartitionManager manager = + createDropConfigManager(clock, periodicExecutor, 0, 0, () -> 9999); + manager.start(); + + TableInfo table = createHourPartitionedTable(107L, 2, 1); + TablePath tablePath = table.getTablePath(); + manager.addAutoPartitionTable(table, true); + periodicExecutor.triggerNonPeriodicScheduledTasks(); + + List expired = + Arrays.asList( + "2024090101", + "2024090102", + "2024090103", + "2024090104", + "2024090105", + "2024090106", + "2024090107", + "2024090108", + "2024090109", + "2024090110", + "2024090111", + "2024090112"); + manuallyAddPartitions(manager, table, expired); + + // single round drops all 12 despite a high queue size, because all caps are disabled + periodicExecutor.triggerPeriodicScheduledTasks(); + assertThat(expiredStillIn(tablePath, expired)).isEmpty(); + } + + // -------- helpers for the tests above ---------------------------------------------------- + + private AutoPartitionManager createDropConfigManager( + ManualClock clock, + ManuallyTriggeredScheduledExecutorService periodicExecutor, + int queueThreshold, + int maxBucketsPerRound, + IntSupplier queueSizeProvider) { + Configuration config = new Configuration(); + config.set(ConfigOptions.AUTO_PARTITION_DROP_QUEUE_BACKPRESSURE_THRESHOLD, queueThreshold); + config.set(ConfigOptions.AUTO_PARTITION_DROP_MAX_BUCKETS_PER_ROUND, maxBucketsPerRound); + return new AutoPartitionManager( + new TestingServerMetadataCache(3), + metadataManager, + config, + clock, + periodicExecutor, + queueSizeProvider); + } + + private void manuallyAddPartitions( + AutoPartitionManager manager, TableInfo table, List partitionNames) + throws Exception { + int replicaFactor = table.getTableConfig().getReplicationFactor(); + Map bucketAssignments = + generateAssignment( + table.getNumBuckets(), + replicaFactor, + new TabletServerInfo[] { + new TabletServerInfo(0, "rack0"), + new TabletServerInfo(1, "rack1"), + new TabletServerInfo(2, "rack2") + }) + .getBucketAssignments(); + long tableId = table.getTableId(); + PartitionAssignment partitionAssignment = + new PartitionAssignment(tableId, bucketAssignments); + for (String name : partitionNames) { + metadataManager.createPartition( + table.getTablePath(), + tableId, + partitionAssignment, + fromPartitionName(table.getPartitionKeys(), name), + false); + manager.addPartition(tableId, name); + } + } + + private Set expiredStillIn(TablePath tablePath, List expected) + throws Exception { + Set present = + new HashSet<>(zookeeperClient.getPartitionRegistrations(tablePath).keySet()); + present.retainAll(expected); + return present; + } + + private TableInfo createHourPartitionedTable( + long tableId, int partitionRetentionNum, int partitionPreCreateNum) throws Exception { + TablePath tablePath = TablePath.of("db", "test_drop_" + tableId + "_" + UUID.randomUUID()); + TableDescriptor descriptor = + TableDescriptor.builder() + .schema( + Schema.newBuilder() + .column("id", DataTypes.INT()) + .column("dt", DataTypes.STRING()) + .primaryKey("id", "dt") + .build()) + .comment("hour-partitioned table for drop budget tests") + .distributedBy(4) + .partitionedBy("dt") + .property(ConfigOptions.TABLE_REPLICATION_FACTOR, 3) + .property(ConfigOptions.TABLE_AUTO_PARTITION_ENABLED, true) + .property(ConfigOptions.TABLE_AUTO_PARTITION_KEY, "dt") + .property( + ConfigOptions.TABLE_AUTO_PARTITION_TIME_UNIT, + AutoPartitionTimeUnit.HOUR) + .property( + ConfigOptions.TABLE_AUTO_PARTITION_NUM_RETENTION, + partitionRetentionNum) + .property( + ConfigOptions.TABLE_AUTO_PARTITION_NUM_PRECREATE, + partitionPreCreateNum) + .build(); + long now = System.currentTimeMillis(); + TableInfo info = TableInfo.of(tablePath, tableId, 1, descriptor, remoteDataDir, now, now); + TableRegistration registration = + TableRegistration.newTable(tableId, remoteDataDir, descriptor); + zookeeperClient.registerTable(tablePath, registration); + return info; + } + private static class TestParams { final AutoPartitionTimeUnit timeUnit; final boolean multiplePartitionKeys; diff --git a/website/docs/maintenance/configuration.md b/website/docs/maintenance/configuration.md index 2f77fb5676..579ff8e226 100644 --- a/website/docs/maintenance/configuration.md +++ b/website/docs/maintenance/configuration.md @@ -43,6 +43,8 @@ during the Fluss cluster working. | plugin.classloader.parent-first-patterns.additional | List<String> | (None) | A (semicolon-separated) list of patterns that specifies which classes should always be resolved through the plugin parent ClassLoader first. A pattern is a simple prefix that is checked against the fully qualified class name. These patterns are appended to `classloader.parent-first-patterns.default`. | | plugin.classloader.parent-first-patterns.default | String | java.,
org.apache.fluss.,
javax.annotation.,
org.slf4j,
org.apache.log4j,
org.apache.logging,
org.apache.commons.logging,
ch.qos.logback | A (semicolon-separated) list of patterns that specifies which classes should always be resolved through the plugin parent ClassLoader first. A pattern is a simple prefix that is checked against the fully qualified class name. This setting should generally not be modified. | | auto-partition.check.interval | Duration | 10min | The interval of auto partition check. The default value is 10 minutes. | +| auto-partition.drop.queue-backpressure-threshold | Integer | 1000 | When the coordinator event queue size reaches this threshold, auto partition will skip the drop step for the current round and retry in the next `auto-partition.check.interval`. This prevents overwhelming the coordinator at midnight day rotation when one partition drop fans out into thousands of bucket/replica deletion events. Set to a non-positive value to disable backpressure (always drop). | +| auto-partition.drop.max-buckets-per-round | Integer | 1000 | Per-round drop budget across all auto-partition tables, measured in number of bucket-deletion events. Because dropping one partition fans out into `numBuckets * replicationFactor` deletion events on the coordinator queue, accounting in buckets gives uniform protection regardless of each table's bucket count. To avoid starving tables whose single-partition bucket count exceeds the budget, every table is allowed at least one partition drop per round even if it temporarily overshoots the budget. Remaining over-quota drops will be processed in the next `auto-partition.check.interval`. Set to a non-positive value to disable the cap (drop without limit). | | allow.create.log.tables | Boolean | true | Whether to allow creation of log tables. When set to false, attempts to create log tables (tables without primary key) will be rejected. The default value is true. | | allow.create.kv.tables | Boolean | true | Whether to allow creation of kv tables (primary key tables). When set to false, attempts to create kv tables (tables with primary key) will be rejected. The default value is true. | | max.partition.num | Integer | 1000 | Limits the maximum number of partitions that can be created for a partitioned table to avoid creating too many partitions. | diff --git a/website/docs/table-design/data-distribution/partitioning.md b/website/docs/table-design/data-distribution/partitioning.md index ffeef84f6e..3bb8fcc901 100644 --- a/website/docs/table-design/data-distribution/partitioning.md +++ b/website/docs/table-design/data-distribution/partitioning.md @@ -78,9 +78,22 @@ The time unit for the automatic partition table `auto-partition.time-unit` can t ### Fluss Cluster Configuration Below are the configuration items related to Fluss cluster and automatic partitioning. -| Option | Type | Default | Description | -|-------------------------------|------------------|------------|------------------------------------------------| -| auto-partition.check.interval | Duration | 10 minutes | The interval of auto partition check. The time interval for automatic partition checking is set to 10 minutes by default, meaning that it checks the table partition status every 10 minutes to see if it meets the automatic partitioning criteria. If it does not meet the criteria, partitions will be automatically created or deleted. | +| Option | Type | Default | Description | +|--------------------------------------------------|----------|---------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| auto-partition.check.interval | Duration | 10min | The interval of auto partition check. The cluster periodically checks every auto-partitioned table to see whether new partitions need to be pre-created or expired partitions need to be dropped. | +| auto-partition.drop.queue-backpressure-threshold | Integer | 1000 | Queue-aware backpressure gate. When the coordinator event queue size reaches this threshold, auto partition skips the drop step for the current round and retries in the next interval. Non-positive value disables the gate. | +| auto-partition.drop.max-buckets-per-round | Integer | 1000 | Per-round drop budget shared across all auto-partition tables, measured in bucket-deletion events. Each partition drop consumes `numBuckets` from the budget. Tables whose single-partition bucket count exceeds the remaining budget are still allowed at least one drop per round (starvation guard). Non-positive value disables the cap. | + +### Drop Throttling and Backpressure +Dropping an auto-partition fans out into `numBuckets * replicationFactor` bucket-deletion events on the coordinator event queue. At day-rotation boundaries, many tables may have partitions to drop simultaneously, which can flood the queue and starve normal coordinator work (leader election, metadata updates, etc.). + +Fluss applies three layers of protection at every check round, in this order: + +1. **Queue-aware backpressure** (`auto-partition.drop.queue-backpressure-threshold`) — if the coordinator event queue is already heavily backed up, the entire drop step is skipped this round and retried in the next `auto-partition.check.interval`. Pre-creation of new partitions is unaffected. +2. **Per-round bucket budget** (`auto-partition.drop.max-buckets-per-round`) — caps the total number of bucket-deletion events emitted per round across all tables. Accounting in buckets (rather than partitions) gives uniform protection regardless of each table's bucket count. +3. **Starvation guard** — a table whose single-partition bucket count exceeds the remaining budget is still allowed to drop at least one partition per round, so very large tables (high `numBuckets`) cannot be permanently blocked. The guard is automatically self-limiting: any overshoot drives the shared budget to zero, which suppresses subsequent tables for the rest of the round. + +Leftover partitions that cannot be processed within the current round simply remain expired and are picked up in the next round; correctness is unaffected. Operators with very large clusters or extreme bucket counts may tune both budgets upward, or set them to a non-positive value to disable throttling entirely. ## Dynamic Partitioning