Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions be/src/agent/task_worker_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1205,9 +1205,12 @@ void report_tablet_callback(StorageEngine& engine, const ClusterInfo* cluster_in
engine.tablet_manager()->get_partitions_visible_version(&partitions_version);
request.__set_partitions_version(std::move(partitions_version));

auto* metrics = DorisMetrics::instance();
int64_t max_compaction_score =
std::max(DorisMetrics::instance()->tablet_cumulative_max_compaction_score->value(),
DorisMetrics::instance()->tablet_base_max_compaction_score->value());
std::max(metrics->tablet_cumulative_max_compaction_score->value(),
metrics->tablet_base_max_compaction_score->value());
max_compaction_score = std::max(max_compaction_score,
metrics->tablet_time_series_max_compaction_score->value());
request.__set_tablet_max_compaction_score(max_compaction_score);
request.__set_report_version(report_version);

Expand Down
26 changes: 18 additions & 8 deletions be/src/cloud/cloud_storage_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -645,9 +645,12 @@ bool CloudStorageEngine::register_index_change_compaction(

std::vector<CloudTabletSPtr> CloudStorageEngine::_generate_cloud_compaction_tasks(
CompactionType compaction_type, bool check_score) {
DCHECK(compaction_type == CompactionType::BASE_COMPACTION ||
compaction_type == CompactionType::CUMULATIVE_COMPACTION);
std::vector<std::shared_ptr<CloudTablet>> tablets_compaction;

int64_t max_compaction_score = 0;
CompactionScoreStats score_stats;
bool got_score_stats = false;
std::unordered_set<int64_t> tablet_preparing_cumu_compaction;
std::unordered_map<int64_t, std::vector<std::shared_ptr<CloudCumulativeCompaction>>>
submitted_cumu_compactions;
Expand Down Expand Up @@ -724,22 +727,29 @@ std::vector<CloudTabletSPtr> CloudStorageEngine::_generate_cloud_compaction_task
do {
std::vector<CloudTabletSPtr> tablets;
auto st = tablet_mgr().get_topn_tablets_to_compact(n, compaction_type, filter_out, &tablets,
&max_compaction_score);
&score_stats);
if (!st.ok()) {
LOG(WARNING) << "failed to get tablets to compact, err=" << st;
break;
}
got_score_stats = true;
if (!need_pick_tablet) break;
tablets_compaction = std::move(tablets);
} while (false);

if (max_compaction_score > 0) {
if (compaction_type == CompactionType::BASE_COMPACTION) {
if (got_score_stats && score_stats.scanned) {
if (compaction_type == CompactionType::BASE_COMPACTION && score_stats.max_score > 0) {
DorisMetrics::instance()->tablet_base_max_compaction_score->set_value(
max_compaction_score);
} else {
DorisMetrics::instance()->tablet_cumulative_max_compaction_score->set_value(
max_compaction_score);
score_stats.max_score);
} else if (compaction_type == CompactionType::CUMULATIVE_COMPACTION) {
if (check_score || score_stats.size_based_max_score > 0) {
DorisMetrics::instance()->tablet_cumulative_max_compaction_score->set_value(
score_stats.size_based_max_score);
}
if (check_score || score_stats.time_series_max_score > 0) {
DorisMetrics::instance()->tablet_time_series_max_compaction_score->set_value(
score_stats.time_series_max_score);
}
}
}

Expand Down
5 changes: 5 additions & 0 deletions be/src/cloud/cloud_storage_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,11 @@ class CloudStorageEngine final : public BaseStorageEngine {
// Overwrites the stored startup time point (_startup_timepoint) with `tp`.
void set_startup_timepoint(const std::chrono::time_point<std::chrono::system_clock>& tp) {
_startup_timepoint = tp;
}

// Test hook: forwards both arguments unchanged to _generate_cloud_compaction_tasks()
// and returns the tablets it selected, so tests can drive task generation directly.
std::vector<CloudTabletSPtr> generate_cloud_compaction_tasks_for_test(
CompactionType compaction_type, bool check_score) {
return _generate_cloud_compaction_tasks(compaction_type, check_score);
}
#endif

private:
Expand Down
21 changes: 16 additions & 5 deletions be/src/cloud/cloud_tablet_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include "common/status.h"
#include "cpp/sync_point.h"
#include "runtime/memory/cache_policy.h"
#include "storage/compaction/cumulative_compaction_time_series_policy.h"
#include "util/debug_points.h"
#include "util/lru_cache.h"
#include "util/stack_util.h"
Expand Down Expand Up @@ -426,10 +427,11 @@ void CloudTabletMgr::sync_tablets(const CountDownLatch& stop_latch) {

Status CloudTabletMgr::get_topn_tablets_to_compact(
int n, CompactionType compaction_type, const std::function<bool(CloudTablet*)>& filter_out,
std::vector<std::shared_ptr<CloudTablet>>* tablets, int64_t* max_score) {
std::vector<std::shared_ptr<CloudTablet>>* tablets, CompactionScoreStats* score_stats) {
DCHECK(compaction_type == CompactionType::BASE_COMPACTION ||
compaction_type == CompactionType::CUMULATIVE_COMPACTION);
*max_score = 0;
*score_stats = {};
score_stats->scanned = true;
int64_t max_score_tablet_id = 0;
// clang-format off
auto score = [compaction_type](CloudTablet* t) {
Expand Down Expand Up @@ -489,9 +491,18 @@ Status CloudTabletMgr::get_topn_tablets_to_compact(

int64_t s = score(t.get());
if (s <= 0) { continue; }
if (s > *max_score) {
if (s > score_stats->max_score) {
max_score_tablet_id = t->tablet_id();
*max_score = s;
score_stats->max_score = s;
}
if (compaction_type == CompactionType::CUMULATIVE_COMPACTION) {
int64_t* policy_max_score =
t->tablet_meta()->compaction_policy() == CUMULATIVE_TIME_SERIES_POLICY
? &score_stats->time_series_max_score
: &score_stats->size_based_max_score;
if (s > *policy_max_score) {
*policy_max_score = s;
}
}

if (filter_out(t.get())) { ++num_filtered; continue; }
Expand All @@ -506,7 +517,7 @@ Status CloudTabletMgr::get_topn_tablets_to_compact(
LOG_EVERY_N(INFO, 1000) << "get_topn_compaction_score, n=" << n << " type=" << compaction_type
<< " num_tablets=" << weak_tablets.size() << " num_skipped=" << num_skipped
<< " num_disabled=" << num_disabled << " num_filtered=" << num_filtered
<< " max_score=" << *max_score << " max_score_tablet=" << max_score_tablet_id
<< " max_score=" << score_stats->max_score << " max_score_tablet=" << max_score_tablet_id
<< " tablets=[" << [&buf] { std::stringstream ss; for (auto& i : buf) ss << i.first->tablet_id() << ":" << i.second << ","; return ss.str(); }() << "]"
;
// clang-format on
Expand Down
4 changes: 2 additions & 2 deletions be/src/cloud/cloud_tablet_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,13 @@ class CloudTabletMgr {
* @param filter_out a filter takes a tablet and return bool to check
* whether skipping the tablet, true for skip
* @param tablets output param
* @param max_score output param, max score of existed tablets
* @param score_stats output param, max scores of existing tablets
* @return status of this call
*/
Status get_topn_tablets_to_compact(int n, CompactionType compaction_type,
const std::function<bool(CloudTablet*)>& filter_out,
std::vector<std::shared_ptr<CloudTablet>>* tablets,
int64_t* max_score);
CompactionScoreStats* score_stats);

/**
* Gets tablets info and total tablet num that are reported
Expand Down
2 changes: 2 additions & 0 deletions be/src/common/metrics/doris_metrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(process_fd_num_limit_soft, MetricUnit::NOUNIT
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(process_fd_num_limit_hard, MetricUnit::NOUNIT);

DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(tablet_cumulative_max_compaction_score, MetricUnit::NOUNIT);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(tablet_time_series_max_compaction_score, MetricUnit::NOUNIT);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(tablet_base_max_compaction_score, MetricUnit::NOUNIT);

DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(all_rowsets_num, MetricUnit::NOUNIT);
Expand Down Expand Up @@ -376,6 +377,7 @@ DorisMetrics::DorisMetrics() : _metric_registry(_s_registry_name) {
INT_GAUGE_METRIC_REGISTER(_server_metric_entity, process_fd_num_limit_hard);

INT_GAUGE_METRIC_REGISTER(_server_metric_entity, tablet_cumulative_max_compaction_score);
INT_GAUGE_METRIC_REGISTER(_server_metric_entity, tablet_time_series_max_compaction_score);
INT_GAUGE_METRIC_REGISTER(_server_metric_entity, tablet_base_max_compaction_score);

INT_GAUGE_METRIC_REGISTER(_server_metric_entity, all_rowsets_num);
Expand Down
5 changes: 3 additions & 2 deletions be/src/common/metrics/doris_metrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -155,9 +155,10 @@ class DorisMetrics {
IntGauge* process_fd_num_limit_hard = nullptr;

// the max compaction score of all tablets.
// Record base and cumulative scores separately, because
// we need to get the larger of the two.
// Record base, size-based cumulative and time-series cumulative scores separately,
// because we need to get the largest of them.
IntGauge* tablet_cumulative_max_compaction_score = nullptr;
IntGauge* tablet_time_series_max_compaction_score = nullptr;
IntGauge* tablet_base_max_compaction_score = nullptr;

IntGauge* all_rowsets_num = nullptr;
Expand Down
7 changes: 7 additions & 0 deletions be/src/storage/data_dir.h
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,13 @@ class DataDir {
(double)_disk_capacity_bytes;
}

#ifdef BE_TEST
// Test-only setter (compiled only under BE_TEST): overwrites the stored
// _disk_capacity_bytes and _available_bytes with the given values.
void set_capacity_for_test(size_t disk_capacity_bytes, size_t available_bytes) {
_disk_capacity_bytes = disk_capacity_bytes;
_available_bytes = available_bytes;
}
#endif

// Move tablet to trash.
Status move_to_trash(const std::string& tablet_path);

Expand Down
7 changes: 7 additions & 0 deletions be/src/storage/olap_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,13 @@ using TabletUid = UniqueId;

// Kinds of compaction. The numeric values are assigned explicitly and must stay stable.
enum CompactionType {
    BASE_COMPACTION = 1,
    CUMULATIVE_COMPACTION = 2,
    FULL_COMPACTION = 3,
};

// Maximum compaction scores gathered while scanning tablets for compaction candidates.
// All fields start at zero/false, so a default-constructed value means "nothing scanned".
struct CompactionScoreStats {
    // Highest score seen among all scored tablets, regardless of compaction policy.
    int64_t max_score {0};
    // Highest cumulative-compaction score among tablets not using the time-series policy
    // (as filled by CloudTabletMgr::get_topn_tablets_to_compact).
    int64_t size_based_max_score {0};
    // Highest cumulative-compaction score among tablets using the time-series policy.
    int64_t time_series_max_score {0};
    // Set to true by the scan that fills this struct; false means no scan ran.
    bool scanned {false};
};

enum DataDirType {
SPILL_DISK_DIR,
OLAP_DATA_DIR,
Expand Down
78 changes: 51 additions & 27 deletions be/src/storage/olap_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -150,11 +150,12 @@ bool CompactionSubmitRegistry::has_compaction_task(DataDir* dir, CompactionType

std::vector<TabletSharedPtr> CompactionSubmitRegistry::pick_topn_tablets_for_compaction(
TabletManager* tablet_mgr, DataDir* data_dir, CompactionType compaction_type,
const CumuCompactionPolicyTable& cumu_compaction_policies, uint32_t* disk_max_score) {
const CumuCompactionPolicyTable& cumu_compaction_policies,
CompactionScoreStats* disk_score_stats) {
// non-lock, used in snapshot
return tablet_mgr->find_best_tablets_to_compaction(compaction_type, data_dir,
_get_tablet_set(data_dir, compaction_type),
disk_max_score, cumu_compaction_policies);
disk_score_stats, cumu_compaction_policies);
}

bool CompactionSubmitRegistry::insert(TabletSharedPtr tablet, CompactionType compaction_type) {
Expand Down Expand Up @@ -937,7 +938,7 @@ bool need_generate_compaction_tasks(int task_cnt_per_disk, int thread_per_disk,
return true;
}

int get_concurrent_per_disk(int max_score, int thread_per_disk) {
int get_concurrent_per_disk(int64_t max_score, int thread_per_disk) {
if (!config::enable_compaction_priority_scheduling) {
return thread_per_disk;
}
Expand Down Expand Up @@ -976,11 +977,14 @@ bool has_free_compaction_slot(CompactionSubmitRegistry* registry, DataDir* dir,

std::vector<TabletSharedPtr> StorageEngine::_generate_compaction_tasks(
CompactionType compaction_type, std::vector<DataDir*>& data_dirs, bool check_score) {
DCHECK(compaction_type == CompactionType::BASE_COMPACTION ||
compaction_type == CompactionType::CUMULATIVE_COMPACTION);
TEST_SYNC_POINT_RETURN_WITH_VALUE("olap_server::_generate_compaction_tasks.return_empty",
std::vector<TabletSharedPtr> {});
_update_cumulative_compaction_policy();
std::vector<TabletSharedPtr> tablets_compaction;
uint32_t max_compaction_score = 0;
CompactionScoreStats max_score_stats;
bool skipped_capacity_limited_dir = false;

std::random_device rd;
std::mt19937 g(rd());
Expand All @@ -1001,35 +1005,55 @@ std::vector<TabletSharedPtr> StorageEngine::_generate_compaction_tasks(

// Even if need_pick_tablet is false, we still need to call find_best_tablets_to_compaction(),
// so that we can update the max_compaction_score metric.
if (!data_dir->reach_capacity_limit(0)) {
uint32_t disk_max_score = 0;
auto tablets = compaction_registry_snapshot.pick_topn_tablets_for_compaction(
_tablet_manager.get(), data_dir, compaction_type,
_cumulative_compaction_policies, &disk_max_score);
int concurrent_num =
get_concurrent_per_disk(disk_max_score, disk_compaction_slot_num(*data_dir));
need_pick_tablet = need_generate_compaction_tasks(
executing_task_num, concurrent_num, compaction_type,
!compaction_registry_snapshot.has_compaction_task(
data_dir, CompactionType::CUMULATIVE_COMPACTION));
for (const auto& tablet : tablets) {
if (tablet != nullptr) {
if (need_pick_tablet) {
tablets_compaction.emplace_back(tablet);
}
max_compaction_score = std::max(max_compaction_score, disk_max_score);
if (data_dir->reach_capacity_limit(0)) {
skipped_capacity_limited_dir = true;
continue;
}

CompactionScoreStats disk_score_stats;
auto tablets = compaction_registry_snapshot.pick_topn_tablets_for_compaction(
_tablet_manager.get(), data_dir, compaction_type, _cumulative_compaction_policies,
&disk_score_stats);
max_score_stats.scanned = max_score_stats.scanned || disk_score_stats.scanned;
max_score_stats.max_score = std::max(max_score_stats.max_score, disk_score_stats.max_score);
max_score_stats.size_based_max_score = std::max(max_score_stats.size_based_max_score,
disk_score_stats.size_based_max_score);
max_score_stats.time_series_max_score = std::max(max_score_stats.time_series_max_score,
disk_score_stats.time_series_max_score);
int concurrent_num = get_concurrent_per_disk(disk_score_stats.max_score,
disk_compaction_slot_num(*data_dir));
need_pick_tablet = need_generate_compaction_tasks(
executing_task_num, concurrent_num, compaction_type,
!compaction_registry_snapshot.has_compaction_task(
data_dir, CompactionType::CUMULATIVE_COMPACTION));
for (const auto& tablet : tablets) {
if (tablet != nullptr) {
if (need_pick_tablet) {
tablets_compaction.emplace_back(tablet);
}
}
}
}

if (max_compaction_score > 0) {
if (compaction_type == CompactionType::BASE_COMPACTION) {
if (max_score_stats.scanned) {
if (compaction_type == CompactionType::BASE_COMPACTION && max_score_stats.max_score > 0) {
DorisMetrics::instance()->tablet_base_max_compaction_score->set_value(
max_compaction_score);
} else {
DorisMetrics::instance()->tablet_cumulative_max_compaction_score->set_value(
max_compaction_score);
max_score_stats.max_score);
} else if (compaction_type == CompactionType::CUMULATIVE_COMPACTION) {
auto update_policy_score = [skipped_capacity_limited_dir, check_score](IntGauge* metric,
int64_t score) {
if (skipped_capacity_limited_dir) {
if (score > metric->value()) {
metric->set_value(score);
}
} else if (check_score || score > 0) {
metric->set_value(score);
}
};
update_policy_score(DorisMetrics::instance()->tablet_cumulative_max_compaction_score,
max_score_stats.size_based_max_score);
update_policy_score(DorisMetrics::instance()->tablet_time_series_max_compaction_score,
max_score_stats.time_series_max_score);
}
}
return tablets_compaction;
Expand Down
14 changes: 13 additions & 1 deletion be/src/storage/storage_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,8 @@ class CompactionSubmitRegistry {

std::vector<TabletSharedPtr> pick_topn_tablets_for_compaction(
TabletManager* tablet_mgr, DataDir* data_dir, CompactionType compaction_type,
const CumuCompactionPolicyTable& cumu_compaction_policies, uint32_t* disk_max_score);
const CumuCompactionPolicyTable& cumu_compaction_policies,
CompactionScoreStats* disk_score_stats);

private:
TabletSet& _get_tablet_set(DataDir* dir, CompactionType compaction_type);
Expand Down Expand Up @@ -381,6 +382,17 @@ class StorageEngine final : public BaseStorageEngine {

int64_t get_compaction_num_per_round() const { return _compaction_num_per_round; }

#ifdef BE_TEST
// Test-only hook (compiled only under BE_TEST): forwards all arguments unchanged to
// _generate_compaction_tasks() and returns the tablets it selected.
std::vector<TabletSharedPtr> generate_compaction_tasks_for_test(
CompactionType compaction_type, std::vector<DataDir*>& data_dirs, bool check_score) {
return _generate_compaction_tasks(compaction_type, data_dirs, check_score);
}

// Test-only accessor (compiled only under BE_TEST): returns a mutable reference to
// the engine's _compaction_submit_registry so tests can inspect or modify it.
CompactionSubmitRegistry& compaction_submit_registry_for_test() {
return _compaction_submit_registry;
}
#endif

private:
// Instance should be inited from `static open()`
// MUST NOT be called in other circumstances.
Expand Down
Loading
Loading