diff --git a/regression-test/suites/compaction/test_time_series_compaction_policy.groovy b/regression-test/suites/compaction/test_time_series_compaction_policy.groovy index 7f3aa2abf81770..b970926c93b079 100644 --- a/regression-test/suites/compaction/test_time_series_compaction_policy.groovy +++ b/regression-test/suites/compaction/test_time_series_compaction_policy.groovy @@ -36,6 +36,34 @@ suite("test_time_series_compaction_polciy", "p0") { return rowsetCount } + // Manually POST cumulative compaction to every tablet. Fire-and-forget; + // per-tablet errors (e.g. CUMULATIVE_NO_SUITABLE_VERSION on a tablet with + // nothing to merge) are expected and surface via the polling assertion. + def trigger_cumulative_all = { tabletsList -> + for (def tablet in tabletsList) { + def be_host = backendId_to_backendIP["${tablet.BackendId}"] + def be_port = backendId_to_backendHttpPort["${tablet.BackendId}"] + curl("POST", "http://${be_host}:${be_port}/api/compaction/run?tablet_id=${tablet.TabletId}&compact_type=cumulative") + } + } + + // Poll get_rowset_count until count <= target or timeout. Returns the last + // observed count. Avoids the run_status race in trigger_and_wait_compaction + // for time_series mode (BE may have queued the task but not yet acquired + // the cumulative lock when first polled). + def wait_rowset_count_le = { tabletsList, target, timeoutSec -> + long deadline = System.currentTimeMillis() + timeoutSec * 1000 + int last = -1 + while (System.currentTimeMillis() < deadline) { + last = get_rowset_count.call(tabletsList) + if (last <= target) { + return last + } + Thread.sleep(1000) + } + return last + } + sql """ DROP TABLE IF EXISTS ${tableName}; """ sql """ CREATE TABLE ${tableName} ( @@ -133,21 +161,19 @@ suite("test_time_series_compaction_polciy", "p0") { assert (rowsetCount == 34 * replicaNum) // trigger cumulative compactions for all tablets in table - trigger_and_wait_compaction(tableName, "cumulative") - // after cumulative compaction, there is only 26 rowset. // 5 consecutive empty versions are merged into one empty version // 34 - 2*4 = 26 - rowsetCount = get_rowset_count.call(tablets); - assert (rowsetCount == 26 * replicaNum) + trigger_cumulative_all.call(tablets) + rowsetCount = wait_rowset_count_le.call(tablets, 26 * replicaNum, 60) + assert (rowsetCount == 26 * replicaNum) : "expected ${26 * replicaNum} rowsets, got ${rowsetCount}" // trigger cumulative compactions for all tablets in ${tableName} - trigger_and_wait_compaction(tableName, "cumulative") - // after cumulative compaction, there is only 22 rowset. // 26 - 4 = 22 - rowsetCount = get_rowset_count.call(tablets); - assert (rowsetCount == 22 * replicaNum) + trigger_cumulative_all.call(tablets) + rowsetCount = wait_rowset_count_le.call(tablets, 22 * replicaNum, 60) + assert (rowsetCount == 22 * replicaNum) : "expected ${22 * replicaNum} rowsets, got ${rowsetCount}" qt_sql_2 """ select count() from ${tableName}""" if (isCloudMode()) { @@ -156,11 +182,10 @@ suite("test_time_series_compaction_polciy", "p0") { sql """ alter table ${tableName} set ("time_series_compaction_file_count_threshold"="10")""" sql """sync""" // trigger cumulative compactions for all tablets in ${tableName} - trigger_and_wait_compaction(tableName, "cumulative") - // after cumulative compaction, there is only 11 rowset. - rowsetCount = get_rowset_count.call(tablets); - assert (rowsetCount == 11 * replicaNum) + trigger_cumulative_all.call(tablets) + rowsetCount = wait_rowset_count_le.call(tablets, 11 * replicaNum, 60) + assert (rowsetCount == 11 * replicaNum) : "expected ${11 * replicaNum} rowsets, got ${rowsetCount}" qt_sql_3 """ select count() from ${tableName}""" sql """ DROP TABLE IF EXISTS ${tableName}; """