Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions tea/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,7 @@ arrow::Status ReadValues(Source* src, Config* config, std::string_view section_p
Get(src, section_prefix, "metadata_access", "default_schema", &config->meta_access.default_schema);

Get(src, section_prefix, "samovar", "use_samovar", &config->samovar_config.turn_on_samovar);
Get(src, section_prefix, "samovar", "enable_setnx_coordinator", &config->samovar_config.enable_setnx_coordinator);

// SyncBackoff = MetadataBackoff by default, but params can be overrided
GetBackoffInfo(src, &config->samovar_config.metadata_backoff, section_prefix, "");
Expand Down Expand Up @@ -472,6 +473,7 @@ arrow::Status ReadValues(Source* src, Config* config, std::string_view section_p
&config->samovar_config.max_time_before_processing_ms);

Get(src, section_prefix, "samovar", "need_sync_on_init", &config->samovar_config.need_sync_on_init);
Get(src, section_prefix, "samovar", "sync_segments", &config->samovar_config.sync_segments);
Get(src, section_prefix, "samovar", "allow_static_balancing", &config->samovar_config.allow_static_balancing);

Get(src, section_prefix, "samovar", "first_slice_to_sleep", &config->samovar_config.first_slice_to_sleep);
Expand Down
2 changes: 2 additions & 0 deletions tea/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ struct BackoffInfo {

struct SamovarConfig {
bool turn_on_samovar = false;
bool enable_setnx_coordinator = false;

BackoffInfo metadata_backoff;
BackoffInfo sync_backoff;
Expand All @@ -165,6 +166,7 @@ struct SamovarConfig {
std::chrono::milliseconds max_time_before_processing_ms = std::chrono::milliseconds(0);

bool need_sync_on_init = true;
std::vector<int> sync_segments;
bool allow_static_balancing = true;

int32_t queue_push_batch_size = 1000;
Expand Down
52 changes: 30 additions & 22 deletions tea/gpext/tea_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -921,10 +921,9 @@ std::shared_ptr<iceberg::TableMetadataV2> GetTableMetadataNonNull(TeaContextPtr
return table_metadata;
}

std::shared_ptr<tea::samovar::SingleQueueClient> SamovarMakePlan(TeaContextPtr tea_ctx,
tea::Reader::SerializedFilter filter,
std::string queue_name, int segment_id,
int segment_count) {
std::shared_ptr<tea::samovar::SingleQueueClient> SamovarMakePlan(
TeaContextPtr tea_ctx, tea::Reader::SerializedFilter filter, std::string queue_name, int segment_id,
int segment_count, std::shared_ptr<tea::samovar::SingleQueueClient> samovar_client) {
TEA_LOG("I am samovar coordinator");

tea::PlannerStats& stats = get::PlannerStats(tea_ctx);
Expand Down Expand Up @@ -956,8 +955,10 @@ std::shared_ptr<tea::samovar::SingleQueueClient> SamovarMakePlan(TeaContextPtr t
// SinglenodeMetadataParsing mode uses its own validation which is performed after applying min/max filters
ValidateFilesCountInDistributedMode(config.config, manifest_files_queue);

std::shared_ptr<tea::samovar::SingleQueueClient> samovar_client =
CreateSamovarClient(tea_ctx, queue_name, segment_id, segment_count, tea::samovar::SamovarRole::kFollower);
if (!samovar_client) {
samovar_client =
CreateSamovarClient(tea_ctx, queue_name, segment_id, segment_count, tea::samovar::SamovarRole::kFollower);
}
TEA_LOG("Samovar: filling manifests queue");
auto maybe_stats = tea::samovar::FillSamovarWithManifests(get::Config(tea_ctx), schema, manifest_files_queue,
segment_count, samovar_client);
Expand Down Expand Up @@ -1000,8 +1001,10 @@ std::shared_ptr<tea::samovar::SingleQueueClient> SamovarMakePlan(TeaContextPtr t

ValidateAllMetadata(get::Config(tea_ctx), all_meta);

std::shared_ptr<tea::samovar::SingleQueueClient> samovar_client =
CreateSamovarClient(tea_ctx, queue_name, segment_id, segment_count, tea::samovar::SamovarRole::kFollower);
if (!samovar_client) {
samovar_client =
CreateSamovarClient(tea_ctx, queue_name, segment_id, segment_count, tea::samovar::SamovarRole::kFollower);
}

tea::UpdatePlannerStats(stats, *metrics);

Expand Down Expand Up @@ -1054,13 +1057,9 @@ void TeaContextPlanExternal(TeaContextPtr tea_ctx, const ExternalScanParams* par

const bool from_samovar = get::SamovarConfig(tea_ctx).turn_on_samovar;
std::shared_ptr<tea::samovar::SingleQueueClient> samovar_client;
bool is_samovar_coordinator = false;
if (from_samovar) {
const std::string queue_name = make_samovar_queue_name();

const auto target_coordinator =
tea::samovar::GetCoordinator(get::SessionId(tea_ctx), get::Source(tea_ctx), params->segment_count);
const bool is_coordinator = params->segment_id == target_coordinator;

const auto& cfg = get::Config(tea_ctx).samovar_config;
const int slice_id = params->slice_id;
const int first_slice_to_sleep = cfg.first_slice_to_sleep;
Expand All @@ -1075,26 +1074,35 @@ void TeaContextPlanExternal(TeaContextPtr tea_ctx, const ExternalScanParams* par
}
}

if (is_coordinator) {
samovar_client =
SamovarMakePlan(tea_ctx, filter, make_samovar_queue_name(), params->segment_id, params->segment_count);
if (cfg.enable_setnx_coordinator) {
samovar_client = CreateSamovarClient(tea_ctx, queue_name, params->segment_id, params->segment_count,
tea::samovar::SamovarRole::kFollower);
is_samovar_coordinator = samovar_client->TryClaimCoordinator(params->segment_id);
TEA_LOG("Samovar coordinator is selected via SETNX. Current segment " + std::to_string(params->segment_id) +
(is_samovar_coordinator ? " won coordinator role" : " is follower"));
} else {
TEA_LOG("Samovar coordinator for query is " + std::to_string(target_coordinator));
const auto target_coordinator =
tea::samovar::GetCoordinator(get::SessionId(tea_ctx), get::Source(tea_ctx), params->segment_count);
is_samovar_coordinator = params->segment_id == target_coordinator;
if (!is_samovar_coordinator) {
TEA_LOG("Samovar coordinator for query is " + std::to_string(target_coordinator));
}
}

if (is_samovar_coordinator) {
samovar_client =
SamovarMakePlan(tea_ctx, filter, queue_name, params->segment_id, params->segment_count, samovar_client);
}
}

auto maybe_plan_meta = [&]() -> ResultType {
if (from_samovar) {
const std::string queue_name = make_samovar_queue_name();

const auto target_coordinator =
tea::samovar::GetCoordinator(get::SessionId(tea_ctx), get::Source(tea_ctx), params->segment_count);
const bool is_coordinator = params->segment_id == target_coordinator;

// Followers should wait for some time (at least 3x the average s3 request latency) since no progress is
// impossible until the coordinator writes the metadata to Samovar.
// The coordinator does not have to wait because the metadata has already been written by him.
if (!is_coordinator) {
if (!is_samovar_coordinator) {
const auto& config = get::Config(tea_ctx);
if (config.samovar_config.wait_before_processing) {
std::chrono::milliseconds sleep_time =
Expand Down
24 changes: 24 additions & 0 deletions tea/samovar/network_layer/redis_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ RedisClient::RedisClient(const std::vector<Endpoint>& endpoints, std::chrono::mi
if (!TryConnect(endpoints[i])) {
TEA_LOG("Non available redis host " + endpoints[i].host + ":" + std::to_string(endpoints[i].port));
} else {
connected_endpoint_index_ = i;
chosen_checkpoint_ = endpoints[i];
TEA_LOG("Redis to processing - " + endpoints[i].host + ":" + std::to_string(endpoints[i].port));
break;
Expand Down Expand Up @@ -115,6 +116,8 @@ int64_t RedisClient::GetRequestCount() const { return requests_count_; }

int64_t RedisClient::GetErrorCount() const { return error_count_; }

size_t RedisClient::GetConnectedEndpointIndex() const { return connected_endpoint_index_; }

void SamovarRedisClient::PushQueue(const std::string& queue_name, const std::vector<std::string>& elements) {
std::vector<std::string> args{"LPUSH", queue_name};
args.insert(args.end(), elements.begin(), elements.end());
Expand Down Expand Up @@ -164,6 +167,23 @@ void SamovarRedisClient::SetCell(const std::string& cell_name, const std::string
}
}

bool SamovarRedisClient::SetCellIfNotExists(const std::string& cell_name, const std::string& message,
std::chrono::seconds ttl) {
auto reply =
underground_client_->SendRequest({"SET", cell_name, message, "NX", "EX", std::to_string(ttl.count())}).Get();
if (ErrorOnMessage(reply)) {
throw std::runtime_error("Can not set cell if not exists " + cell_name + ": " +
underground_client_->GetErrorMessage());
}
if (reply->type == REDIS_REPLY_NIL) {
return false;
}
if (reply->type == REDIS_REPLY_STATUS) {
return true;
}
throw std::runtime_error("Can not set cell if not exists " + cell_name + ": unexpected response type");
}

std::optional<std::string> SamovarRedisClient::GetCell(const std::string& cell_name) {
auto reply = underground_client_->SendRequest({"GET", cell_name});
auto reply_repr = reply.Get();
Expand Down Expand Up @@ -300,4 +320,8 @@ bool SamovarRedisClient::ContainsInSet(const std::string& set_key, const std::st
return reply->integer;
}

size_t SamovarRedisClient::GetConnectedEndpointIndex() const {
return underground_client_->GetConnectedEndpointIndex();
}

} // namespace tea::samovar
4 changes: 4 additions & 0 deletions tea/samovar/network_layer/redis_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ class RedisClient {
DurationTicks GetTotalResponseDurationTicks() const;
int64_t GetRequestCount() const;
int64_t GetErrorCount() const;
size_t GetConnectedEndpointIndex() const;

protected:
bool TryConnect(const Endpoint& endpoint);
Expand All @@ -54,6 +55,7 @@ class RedisClient {
DurationTicks sum_time_response_ = 0;
int64_t requests_count_ = 0;
int64_t error_count_ = 0;
size_t connected_endpoint_index_ = 0;
std::chrono::milliseconds request_timeout_;
std::chrono::milliseconds connection_timeout_;
};
Expand All @@ -67,6 +69,7 @@ class SamovarRedisClient : public ISamovarClient {
std::vector<std::string> PopQueue(const std::string& queue_name, int num_elements) override;

void SetCell(const std::string& cell_name, const std::string& message, std::chrono::seconds ttl) override;
bool SetCellIfNotExists(const std::string& cell_name, const std::string& message, std::chrono::seconds ttl) override;
std::optional<std::string> GetCell(const std::string& cell_name) override;

void SetNumericCell(const std::string& cell_name, int value, std::chrono::seconds ttl) override;
Expand All @@ -86,6 +89,7 @@ class SamovarRedisClient : public ISamovarClient {
void AddIntoSet(const std::string& set_key, const std::string& value) override;
void RemoveFromSet(const std::string& set_key, const std::string& value) override;
bool ContainsInSet(const std::string& set_key, const std::string& value) override;
size_t GetConnectedEndpointIndex() const;

/// Note: methods below are useful only for tests.
std::vector<std::string> GetAllKeys();
Expand Down
2 changes: 2 additions & 0 deletions tea/samovar/network_layer/samovar_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ class ISamovarClient {
virtual std::vector<std::string> PopQueue(const std::string& queue_name, int num_elements) = 0;

virtual void SetCell(const std::string& cell_name, const std::string& message, std::chrono::seconds ttl) = 0;
virtual bool SetCellIfNotExists(const std::string& cell_name, const std::string& message,
std::chrono::seconds ttl) = 0;
virtual std::optional<std::string> GetCell(const std::string& cell_name) = 0;

virtual void SetNumericCell(const std::string& cell_name, int value, std::chrono::seconds ttl) = 0;
Expand Down
30 changes: 28 additions & 2 deletions tea/samovar/planner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,39 @@

namespace tea::samovar {

namespace {
int GetSyncSegmentsOnInit(const SamovarConfig& config, int segment_count, size_t endpoint_index) {
if (!config.enable_setnx_coordinator) {
return segment_count;
}

if (config.sync_segments.size() != config.endpoints.size()) {
throw std::runtime_error("Samovar config error: sync_segments size must match number of endpoints");
}
if (endpoint_index >= config.sync_segments.size()) {
throw std::runtime_error("Samovar internal error: endpoint index is out of bounds for sync_segments");
}

const int sync_segments = config.sync_segments[endpoint_index];
if (sync_segments <= 0 || sync_segments > segment_count) {
throw std::runtime_error("Samovar config error: sync_segments value must be in [1, segment_count]");
}

return sync_segments;
}
} // namespace

std::shared_ptr<SingleQueueClient> MakeSamovarDataClient(const SamovarConfig& config, const std::string& queue_name,
int segment_id, int segment_count, SamovarRole role,
const CancelToken& cancel_token) {
auto sync_backoff = CreateBackoff(config.sync_backoff, cancel_token);
auto metadata_backoff = CreateBackoff(config.metadata_backoff, cancel_token);

std::shared_ptr<ISamovarClient> samovar_client =
auto redis_client =
std::make_shared<SamovarRedisClient>(config.endpoints, config.request_timeout, config.connection_timeout);
const int sync_segments_on_init =
GetSyncSegmentsOnInit(config, segment_count, redis_client->GetConnectedEndpointIndex());
std::shared_ptr<ISamovarClient> samovar_client = redis_client;
auto batch_size_scheduler = std::make_shared<ConstantBatchSizeScheduler>(config.batch_size);
auto batcher = std::make_shared<Batcher>(samovar_client, batch_size_scheduler);

Expand All @@ -63,7 +88,8 @@ std::shared_ptr<SingleQueueClient> MakeSamovarDataClient(const SamovarConfig& co
case BalancerType::kOneQueue: {
samovar_data_client_ = std::make_shared<SingleQueueClient>(
samovar_client, batcher, config.ttl_seconds, queue_name, segment_count, config.compressor_name, role,
sync_backoff, metadata_backoff, config.need_sync_on_init, config.queue_push_batch_size);
sync_backoff, metadata_backoff, config.need_sync_on_init, config.queue_push_batch_size,
sync_segments_on_init);
break;
}
default:
Expand Down
18 changes: 15 additions & 3 deletions tea/samovar/single_queue_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ SingleQueueClient::SingleQueueClient(std::shared_ptr<ISamovarClient> client, std
std::chrono::seconds ttl_seconds, const std::string& queue_id, int segment_count,
const std::string& compressor_name, SamovarRole role,
std::shared_ptr<IBackoff> sync_backoff, std::shared_ptr<IBackoff> metadata_backoff,
bool need_sync_on_init, uint32_t queue_push_batch_size)
bool need_sync_on_init, uint32_t queue_push_batch_size, int sync_segments_on_init)
: client_(client),
batcher_(batcher),
ttl_seconds_(ttl_seconds),
Expand All @@ -44,6 +44,7 @@ SingleQueueClient::SingleQueueClient(std::shared_ptr<ISamovarClient> client, std
role_(role),
metadata_backoff_(metadata_backoff),
need_sync_on_init_(need_sync_on_init),
sync_segments_on_init_(sync_segments_on_init),
sync_backoff_(sync_backoff),
segment_count_(segment_count),
queue_push_batch_size_(queue_push_batch_size) {
Expand Down Expand Up @@ -117,7 +118,7 @@ const samovar::ScanMetadata& SingleQueueClient::GetPlannedMetadata() {
if (role_ == SamovarRole::kFollower && need_sync_on_init_) {
ScopedTimerTicks timer(total_sync_time_);

SyncSegments(client_, GetInitScanCell(), segment_count_, sync_backoff_, "sync_segments");
SyncSegments(client_, GetInitScanCell(), sync_segments_on_init_, sync_backoff_, "sync_segments");
}

samovar::ScanMetadata result_metadata;
Expand Down Expand Up @@ -196,6 +197,13 @@ std::string SingleQueueClient::GetMetadataCell() {
return *metadata_cell_;
}

std::string SingleQueueClient::GetCoordinatorCell() {
if (!coordinator_cell_) {
coordinator_cell_ = coordinator_prefix + queue_id_;
}
return *coordinator_cell_;
}

std::string SingleQueueClient::GetManifestsSyncScanCell() { return manifest_sync_prefix + queue_id_; }
std::string SingleQueueClient::GetManifestCell() { return manifest_queue_prefix + queue_id_; }

Expand Down Expand Up @@ -225,6 +233,10 @@ int64_t SingleQueueClient::GetMetricValue(SamovarMetrics metric) const {
}
}

bool SingleQueueClient::TryClaimCoordinator(int segment_id) {
return client_->SetCellIfNotExists(GetCoordinatorCell(), std::to_string(segment_id), ttl_seconds_);
}

SingleQueueClient::~SingleQueueClient() {
if (role_ == SamovarRole::kFollower) {
try {
Expand All @@ -236,7 +248,7 @@ SingleQueueClient::~SingleQueueClient() {
}

std::vector<std::string> SingleQueueClient::AllCells() {
return {queue_id_, GetMetadataCell(), GetInitScanCell(), GetManifestsSyncScanCell(),
return {queue_id_, GetMetadataCell(), GetCoordinatorCell(), GetInitScanCell(), GetManifestsSyncScanCell(),
GetCheckpointCell(), GetFileListCell(), GetManifestCell()};
}

Expand Down
7 changes: 6 additions & 1 deletion tea/samovar/single_queue_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class SingleQueueClient {
std::chrono::seconds ttl_seconds, const std::string& queue_id, int segment_count,
const std::string& compressor_name, SamovarRole role,
std::shared_ptr<IBackoff> sync_backoff, std::shared_ptr<IBackoff> metadata_backoff,
bool need_sync_on_init, uint32_t queue_push_batch_size);
bool need_sync_on_init, uint32_t queue_push_batch_size, int sync_segments_on_init);

std::optional<samovar::AnnotatedDataEntry> GetNextDataEntry();
std::optional<samovar::ManifestList> GetNextManifest();
Expand All @@ -50,6 +50,7 @@ class SingleQueueClient {
std::string GetQueueId() const { return queue_id_; }

int64_t GetMetricValue(SamovarMetrics metric) const;
bool TryClaimCoordinator(int segment_id);

~SingleQueueClient();

Expand All @@ -71,6 +72,7 @@ class SingleQueueClient {
std::string queue_id_;

static constexpr const char* metadata_prefix = "/samovar_meta";
static constexpr const char* coordinator_prefix = "/coordinator";
static constexpr const char* file_list_prefix = "/file_list";
static constexpr const char* init_scan_prefix = "/init_scan";
static constexpr const char* checkpoint_prefix = "/checkpoint";
Expand All @@ -81,13 +83,15 @@ class SingleQueueClient {
std::optional<std::string> checkpoint_cell_;
std::optional<std::string> metadata_cell_;
std::optional<std::string> file_list_cell_;
std::optional<std::string> coordinator_cell_;

std::optional<samovar::ScanMetadata> cached_result_metadata;
std::optional<samovar::FileList> file_list;

std::string GetInitScanCell();
std::string GetCheckpointCell();
std::string GetMetadataCell();
std::string GetCoordinatorCell();
std::string GetFileListCell();
std::string GetManifestCell();
std::string GetManifestsSyncScanCell();
Expand All @@ -98,6 +102,7 @@ class SingleQueueClient {
std::shared_ptr<IBackoff> metadata_backoff_;

bool need_sync_on_init_ = false;
int sync_segments_on_init_ = 1;
std::shared_ptr<IBackoff> sync_backoff_;
DurationTicks total_sync_time_ = 0;

Expand Down
Loading
Loading