Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,34 @@ suite("test_time_series_compaction_polciy", "p0") {
return rowsetCount
}

// Manually POST cumulative compaction to every tablet. Fire-and-forget;
// per-tablet errors (e.g. CUMULATIVE_NO_SUITABLE_VERSION on a tablet with
// nothing to merge) are expected and surface via the polling assertion.
def trigger_cumulative_all = { tabletsList ->
for (def tablet in tabletsList) {
def be_host = backendId_to_backendIP["${tablet.BackendId}"]
def be_port = backendId_to_backendHttpPort["${tablet.BackendId}"]
curl("POST", "http://${be_host}:${be_port}/api/compaction/run?tablet_id=${tablet.TabletId}&compact_type=cumulative")
}
}

// Poll get_rowset_count until count <= target or timeout. Returns the last
// observed count. Avoids the run_status race in trigger_and_wait_compaction
// for time_series mode (BE may have queued the task but not yet acquired
// the cumulative lock when first polled).
def wait_rowset_count_le = { tabletsList, target, timeoutSec ->
long deadline = System.currentTimeMillis() + timeoutSec * 1000
int last = -1
while (System.currentTimeMillis() < deadline) {
last = get_rowset_count.call(tabletsList)
if (last <= target) {
return last
}
Thread.sleep(1000)
}
return last
}

sql """ DROP TABLE IF EXISTS ${tableName}; """
sql """
CREATE TABLE ${tableName} (
Expand Down Expand Up @@ -133,21 +161,19 @@ suite("test_time_series_compaction_polciy", "p0") {
assert (rowsetCount == 34 * replicaNum)

// trigger cumulative compactions for all tablets in table
trigger_and_wait_compaction(tableName, "cumulative")

// after cumulative compaction, there is only 26 rowset.
// 5 consecutive empty versions are merged into one empty version
// 34 - 2*4 = 26
rowsetCount = get_rowset_count.call(tablets);
assert (rowsetCount == 26 * replicaNum)
trigger_cumulative_all.call(tablets)
rowsetCount = wait_rowset_count_le.call(tablets, 26 * replicaNum, 60)
assert (rowsetCount == 26 * replicaNum) : "expected ${26 * replicaNum} rowsets, got ${rowsetCount}"

// trigger cumulative compactions for all tablets in ${tableName}
trigger_and_wait_compaction(tableName, "cumulative")

// after cumulative compaction, there is only 22 rowset.
// 26 - 4 = 22
rowsetCount = get_rowset_count.call(tablets);
assert (rowsetCount == 22 * replicaNum)
trigger_cumulative_all.call(tablets)
rowsetCount = wait_rowset_count_le.call(tablets, 22 * replicaNum, 60)
assert (rowsetCount == 22 * replicaNum) : "expected ${22 * replicaNum} rowsets, got ${rowsetCount}"

qt_sql_2 """ select count() from ${tableName}"""
if (isCloudMode()) {
Expand All @@ -156,11 +182,10 @@ suite("test_time_series_compaction_polciy", "p0") {
sql """ alter table ${tableName} set ("time_series_compaction_file_count_threshold"="10")"""
sql """sync"""
// trigger cumulative compactions for all tablets in ${tableName}
trigger_and_wait_compaction(tableName, "cumulative")

// after cumulative compaction, there is only 11 rowset.
rowsetCount = get_rowset_count.call(tablets);
assert (rowsetCount == 11 * replicaNum)
trigger_cumulative_all.call(tablets)
rowsetCount = wait_rowset_count_le.call(tablets, 11 * replicaNum, 60)
assert (rowsetCount == 11 * replicaNum) : "expected ${11 * replicaNum} rowsets, got ${rowsetCount}"
qt_sql_3 """ select count() from ${tableName}"""

sql """ DROP TABLE IF EXISTS ${tableName}; """
Expand Down
Loading