From 13de2b4a4883af6c4a6cf3460b465d936eb6591c Mon Sep 17 00:00:00 2001 From: German Perov Date: Wed, 10 Jun 2026 15:46:17 +0300 Subject: [PATCH] feat(samovar): fastest segment is coordinator --- tea/common/config.cpp | 2 + tea/common/config.h | 2 + tea/gpext/tea_reader.cpp | 52 +++++++++++++--------- tea/samovar/network_layer/redis_client.cpp | 24 ++++++++++ tea/samovar/network_layer/redis_client.h | 4 ++ tea/samovar/network_layer/samovar_client.h | 2 + tea/samovar/planner.cpp | 30 ++++++++++++- tea/samovar/single_queue_client.cpp | 18 ++++++-- tea/samovar/single_queue_client.h | 7 ++- tea/samovar/ut/redis_test.cpp | 6 +-- test/config/tea-config-schema.json | 7 ++- 11 files changed, 122 insertions(+), 32 deletions(-) diff --git a/tea/common/config.cpp b/tea/common/config.cpp index 6b06720d..898fe794 100644 --- a/tea/common/config.cpp +++ b/tea/common/config.cpp @@ -441,6 +441,7 @@ arrow::Status ReadValues(Source* src, Config* config, std::string_view section_p Get(src, section_prefix, "metadata_access", "default_schema", &config->meta_access.default_schema); Get(src, section_prefix, "samovar", "use_samovar", &config->samovar_config.turn_on_samovar); + Get(src, section_prefix, "samovar", "enable_setnx_coordinator", &config->samovar_config.enable_setnx_coordinator); // SyncBackoff = MetadataBackoff by default, but params can be overrided GetBackoffInfo(src, &config->samovar_config.metadata_backoff, section_prefix, ""); @@ -472,6 +473,7 @@ arrow::Status ReadValues(Source* src, Config* config, std::string_view section_p &config->samovar_config.max_time_before_processing_ms); Get(src, section_prefix, "samovar", "need_sync_on_init", &config->samovar_config.need_sync_on_init); + Get(src, section_prefix, "samovar", "sync_segments", &config->samovar_config.sync_segments); Get(src, section_prefix, "samovar", "allow_static_balancing", &config->samovar_config.allow_static_balancing); Get(src, section_prefix, "samovar", "first_slice_to_sleep", &config->samovar_config.first_slice_to_sleep); diff --git a/tea/common/config.h b/tea/common/config.h index 429375a7..427779a1 100644 --- a/tea/common/config.h +++ b/tea/common/config.h @@ -143,6 +143,7 @@ struct BackoffInfo { struct SamovarConfig { bool turn_on_samovar = false; + bool enable_setnx_coordinator = false; BackoffInfo metadata_backoff; BackoffInfo sync_backoff; @@ -165,6 +166,7 @@ struct SamovarConfig { std::chrono::milliseconds max_time_before_processing_ms = std::chrono::milliseconds(0); bool need_sync_on_init = true; + std::vector sync_segments; bool allow_static_balancing = true; int32_t queue_push_batch_size = 1000; diff --git a/tea/gpext/tea_reader.cpp b/tea/gpext/tea_reader.cpp index 4f2b37ed..da66a069 100644 --- a/tea/gpext/tea_reader.cpp +++ b/tea/gpext/tea_reader.cpp @@ -921,10 +921,9 @@ std::shared_ptr GetTableMetadataNonNull(TeaContextPtr return table_metadata; } -std::shared_ptr SamovarMakePlan(TeaContextPtr tea_ctx, - tea::Reader::SerializedFilter filter, - std::string queue_name, int segment_id, - int segment_count) { +std::shared_ptr SamovarMakePlan( + TeaContextPtr tea_ctx, tea::Reader::SerializedFilter filter, std::string queue_name, int segment_id, + int segment_count, std::shared_ptr samovar_client) { TEA_LOG("I am samovar coordinator"); tea::PlannerStats& stats = get::PlannerStats(tea_ctx); @@ -956,8 +955,10 @@ std::shared_ptr SamovarMakePlan(TeaContextPtr t // SinglenodeMetadataParsing mode uses its own validation which is performed after applying min/max filters ValidateFilesCountInDistributedMode(config.config, manifest_files_queue); - std::shared_ptr samovar_client = - CreateSamovarClient(tea_ctx, queue_name, segment_id, segment_count, tea::samovar::SamovarRole::kFollower); + if (!samovar_client) { + samovar_client = + CreateSamovarClient(tea_ctx, queue_name, segment_id, segment_count, tea::samovar::SamovarRole::kFollower); + } TEA_LOG("Samovar: filling manifests queue"); auto maybe_stats = tea::samovar::FillSamovarWithManifests(get::Config(tea_ctx), schema, manifest_files_queue, segment_count, samovar_client); @@ -1000,8 +1001,10 @@ std::shared_ptr SamovarMakePlan(TeaContextPtr t ValidateAllMetadata(get::Config(tea_ctx), all_meta); - std::shared_ptr samovar_client = - CreateSamovarClient(tea_ctx, queue_name, segment_id, segment_count, tea::samovar::SamovarRole::kFollower); + if (!samovar_client) { + samovar_client = + CreateSamovarClient(tea_ctx, queue_name, segment_id, segment_count, tea::samovar::SamovarRole::kFollower); + } tea::UpdatePlannerStats(stats, *metrics); @@ -1054,13 +1057,9 @@ void TeaContextPlanExternal(TeaContextPtr tea_ctx, const ExternalScanParams* par const bool from_samovar = get::SamovarConfig(tea_ctx).turn_on_samovar; std::shared_ptr samovar_client; + bool is_samovar_coordinator = false; if (from_samovar) { const std::string queue_name = make_samovar_queue_name(); - - const auto target_coordinator = - tea::samovar::GetCoordinator(get::SessionId(tea_ctx), get::Source(tea_ctx), params->segment_count); - const bool is_coordinator = params->segment_id == target_coordinator; - const auto& cfg = get::Config(tea_ctx).samovar_config; const int slice_id = params->slice_id; const int first_slice_to_sleep = cfg.first_slice_to_sleep; @@ -1075,11 +1074,24 @@ void TeaContextPlanExternal(TeaContextPtr tea_ctx, const ExternalScanParams* par } } - if (is_coordinator) { - samovar_client = - SamovarMakePlan(tea_ctx, filter, make_samovar_queue_name(), params->segment_id, params->segment_count); + if (cfg.enable_setnx_coordinator) { + samovar_client = CreateSamovarClient(tea_ctx, queue_name, params->segment_id, params->segment_count, + tea::samovar::SamovarRole::kFollower); + is_samovar_coordinator = samovar_client->TryClaimCoordinator(params->segment_id); + TEA_LOG("Samovar coordinator is selected via SETNX. Current segment " + std::to_string(params->segment_id) + + (is_samovar_coordinator ? " won coordinator role" : " is follower")); } else { - TEA_LOG("Samovar coordinator for query is " + std::to_string(target_coordinator)); + const auto target_coordinator = + tea::samovar::GetCoordinator(get::SessionId(tea_ctx), get::Source(tea_ctx), params->segment_count); + is_samovar_coordinator = params->segment_id == target_coordinator; + if (!is_samovar_coordinator) { + TEA_LOG("Samovar coordinator for query is " + std::to_string(target_coordinator)); + } + } + + if (is_samovar_coordinator) { + samovar_client = + SamovarMakePlan(tea_ctx, filter, queue_name, params->segment_id, params->segment_count, samovar_client); } } @@ -1087,14 +1099,10 @@ void TeaContextPlanExternal(TeaContextPtr tea_ctx, const ExternalScanParams* par if (from_samovar) { const std::string queue_name = make_samovar_queue_name(); - const auto target_coordinator = - tea::samovar::GetCoordinator(get::SessionId(tea_ctx), get::Source(tea_ctx), params->segment_count); - const bool is_coordinator = params->segment_id == target_coordinator; - // Followers should wait for some time (at least 3x the average s3 request latency) since no progress is // impossible until the coordinator writes the metadata to Samovar. // The coordinator does not have to wait because the metadata has already been written by him. - if (!is_coordinator) { + if (!is_samovar_coordinator) { const auto& config = get::Config(tea_ctx); if (config.samovar_config.wait_before_processing) { std::chrono::milliseconds sleep_time = diff --git a/tea/samovar/network_layer/redis_client.cpp b/tea/samovar/network_layer/redis_client.cpp index 1544b1fb..b65c7ff9 100644 --- a/tea/samovar/network_layer/redis_client.cpp +++ b/tea/samovar/network_layer/redis_client.cpp @@ -51,6 +51,7 @@ RedisClient::RedisClient(const std::vector& endpoints, std::chrono::mi if (!TryConnect(endpoints[i])) { TEA_LOG("Non available redis host " + endpoints[i].host + ":" + std::to_string(endpoints[i].port)); } else { + connected_endpoint_index_ = i; chosen_checkpoint_ = endpoints[i]; TEA_LOG("Redis to processing - " + endpoints[i].host + ":" + std::to_string(endpoints[i].port)); break; @@ -115,6 +116,8 @@ int64_t RedisClient::GetRequestCount() const { return requests_count_; } int64_t RedisClient::GetErrorCount() const { return error_count_; } +size_t RedisClient::GetConnectedEndpointIndex() const { return connected_endpoint_index_; } + void SamovarRedisClient::PushQueue(const std::string& queue_name, const std::vector& elements) { std::vector args{"LPUSH", queue_name}; args.insert(args.end(), elements.begin(), elements.end()); @@ -164,6 +167,23 @@ void SamovarRedisClient::SetCell(const std::string& cell_name, const std::string } } +bool SamovarRedisClient::SetCellIfNotExists(const std::string& cell_name, const std::string& message, + std::chrono::seconds ttl) { + auto reply = + underground_client_->SendRequest({"SET", cell_name, message, "NX", "EX", std::to_string(ttl.count())}).Get(); + if (ErrorOnMessage(reply)) { + throw std::runtime_error("Can not set cell if not exists " + cell_name + ": " + + underground_client_->GetErrorMessage()); + } + if (reply->type == REDIS_REPLY_NIL) { + return false; + } + if (reply->type == REDIS_REPLY_STATUS) { + return true; + } + throw std::runtime_error("Can not set cell if not exists " + cell_name + ": unexpected response type"); +} + std::optional SamovarRedisClient::GetCell(const std::string& cell_name) { auto reply = underground_client_->SendRequest({"GET", cell_name}); auto reply_repr = reply.Get(); @@ -300,4 +320,8 @@ bool SamovarRedisClient::ContainsInSet(const std::string& set_key, const std::st return reply->integer; } +size_t SamovarRedisClient::GetConnectedEndpointIndex() const { + return underground_client_->GetConnectedEndpointIndex(); +} + } // namespace tea::samovar diff --git a/tea/samovar/network_layer/redis_client.h b/tea/samovar/network_layer/redis_client.h index 55fa7a86..aa85535b 100644 --- a/tea/samovar/network_layer/redis_client.h +++ b/tea/samovar/network_layer/redis_client.h @@ -45,6 +45,7 @@ class RedisClient { DurationTicks GetTotalResponseDurationTicks() const; int64_t GetRequestCount() const; int64_t GetErrorCount() const; + size_t GetConnectedEndpointIndex() const; protected: bool TryConnect(const Endpoint& endpoint); @@ -54,6 +55,7 @@ class RedisClient { DurationTicks sum_time_response_ = 0; int64_t requests_count_ = 0; int64_t error_count_ = 0; + size_t connected_endpoint_index_ = 0; std::chrono::milliseconds request_timeout_; std::chrono::milliseconds connection_timeout_; }; @@ -67,6 +69,7 @@ class SamovarRedisClient : public ISamovarClient { std::vector PopQueue(const std::string& queue_name, int num_elements) override; void SetCell(const std::string& cell_name, const std::string& message, std::chrono::seconds ttl) override; + bool SetCellIfNotExists(const std::string& cell_name, const std::string& message, std::chrono::seconds ttl) override; std::optional GetCell(const std::string& cell_name) override; void SetNumericCell(const std::string& cell_name, int value, std::chrono::seconds ttl) override; @@ -86,6 +89,7 @@ class SamovarRedisClient : public ISamovarClient { void AddIntoSet(const std::string& set_key, const std::string& value) override; void RemoveFromSet(const std::string& set_key, const std::string& value) override; bool ContainsInSet(const std::string& set_key, const std::string& value) override; + size_t GetConnectedEndpointIndex() const; /// Note: methods below are useful only for tests. std::vector GetAllKeys(); diff --git a/tea/samovar/network_layer/samovar_client.h b/tea/samovar/network_layer/samovar_client.h index 61b6d744..546c9f51 100644 --- a/tea/samovar/network_layer/samovar_client.h +++ b/tea/samovar/network_layer/samovar_client.h @@ -41,6 +41,8 @@ class ISamovarClient { virtual std::vector PopQueue(const std::string& queue_name, int num_elements) = 0; virtual void SetCell(const std::string& cell_name, const std::string& message, std::chrono::seconds ttl) = 0; + virtual bool SetCellIfNotExists(const std::string& cell_name, const std::string& message, + std::chrono::seconds ttl) = 0; virtual std::optional GetCell(const std::string& cell_name) = 0; virtual void SetNumericCell(const std::string& cell_name, int value, std::chrono::seconds ttl) = 0; diff --git a/tea/samovar/planner.cpp b/tea/samovar/planner.cpp index 3dccbe71..db8d7f99 100644 --- a/tea/samovar/planner.cpp +++ b/tea/samovar/planner.cpp @@ -46,14 +46,39 @@ namespace tea::samovar { +namespace { +int GetSyncSegmentsOnInit(const SamovarConfig& config, int segment_count, size_t endpoint_index) { + if (!config.enable_setnx_coordinator) { + return segment_count; + } + + if (config.sync_segments.size() != config.endpoints.size()) { + throw std::runtime_error("Samovar config error: sync_segments size must match number of endpoints"); + } + if (endpoint_index >= config.sync_segments.size()) { + throw std::runtime_error("Samovar internal error: endpoint index is out of bounds for sync_segments"); + } + + const int sync_segments = config.sync_segments[endpoint_index]; + if (sync_segments <= 0 || sync_segments > segment_count) { + throw std::runtime_error("Samovar config error: sync_segments value must be in [1, segment_count]"); + } + + return sync_segments; +} +} // namespace + std::shared_ptr MakeSamovarDataClient(const SamovarConfig& config, const std::string& queue_name, int segment_id, int segment_count, SamovarRole role, const CancelToken& cancel_token) { auto sync_backoff = CreateBackoff(config.sync_backoff, cancel_token); auto metadata_backoff = CreateBackoff(config.metadata_backoff, cancel_token); - std::shared_ptr samovar_client = + auto redis_client = std::make_shared(config.endpoints, config.request_timeout, config.connection_timeout); + const int sync_segments_on_init = + GetSyncSegmentsOnInit(config, segment_count, redis_client->GetConnectedEndpointIndex()); + std::shared_ptr samovar_client = redis_client; auto batch_size_scheduler = std::make_shared(config.batch_size); auto batcher = std::make_shared(samovar_client, batch_size_scheduler); @@ -63,7 +88,8 @@ std::shared_ptr MakeSamovarDataClient(const SamovarConfig& co case BalancerType::kOneQueue: { samovar_data_client_ = std::make_shared( samovar_client, batcher, config.ttl_seconds, queue_name, segment_count, config.compressor_name, role, - sync_backoff, metadata_backoff, config.need_sync_on_init, config.queue_push_batch_size); + sync_backoff, metadata_backoff, config.need_sync_on_init, config.queue_push_batch_size, + sync_segments_on_init); break; } default: diff --git a/tea/samovar/single_queue_client.cpp b/tea/samovar/single_queue_client.cpp index cd135665..df0e5530 100644 --- a/tea/samovar/single_queue_client.cpp +++ b/tea/samovar/single_queue_client.cpp @@ -35,7 +35,7 @@ SingleQueueClient::SingleQueueClient(std::shared_ptr client, std std::chrono::seconds ttl_seconds, const std::string& queue_id, int segment_count, const std::string& compressor_name, SamovarRole role, std::shared_ptr sync_backoff, std::shared_ptr metadata_backoff, - bool need_sync_on_init, uint32_t queue_push_batch_size) + bool need_sync_on_init, uint32_t queue_push_batch_size, int sync_segments_on_init) : client_(client), batcher_(batcher), ttl_seconds_(ttl_seconds), @@ -44,6 +44,7 @@ SingleQueueClient::SingleQueueClient(std::shared_ptr client, std role_(role), metadata_backoff_(metadata_backoff), need_sync_on_init_(need_sync_on_init), + sync_segments_on_init_(sync_segments_on_init), sync_backoff_(sync_backoff), segment_count_(segment_count), queue_push_batch_size_(queue_push_batch_size) { @@ -117,7 +118,7 @@ const samovar::ScanMetadata& SingleQueueClient::GetPlannedMetadata() { if (role_ == SamovarRole::kFollower && need_sync_on_init_) { ScopedTimerTicks timer(total_sync_time_); - SyncSegments(client_, GetInitScanCell(), segment_count_, sync_backoff_, "sync_segments"); + SyncSegments(client_, GetInitScanCell(), sync_segments_on_init_, sync_backoff_, "sync_segments"); } samovar::ScanMetadata result_metadata; @@ -196,6 +197,13 @@ std::string SingleQueueClient::GetMetadataCell() { return *metadata_cell_; } +std::string SingleQueueClient::GetCoordinatorCell() { + if (!coordinator_cell_) { + coordinator_cell_ = coordinator_prefix + queue_id_; + } + return *coordinator_cell_; +} + std::string SingleQueueClient::GetManifestsSyncScanCell() { return manifest_sync_prefix + queue_id_; } std::string SingleQueueClient::GetManifestCell() { return manifest_queue_prefix + queue_id_; } @@ -225,6 +233,10 @@ int64_t SingleQueueClient::GetMetricValue(SamovarMetrics metric) const { } } +bool SingleQueueClient::TryClaimCoordinator(int segment_id) { + return client_->SetCellIfNotExists(GetCoordinatorCell(), std::to_string(segment_id), ttl_seconds_); +} + SingleQueueClient::~SingleQueueClient() { if (role_ == SamovarRole::kFollower) { try { @@ -236,7 +248,7 @@ SingleQueueClient::~SingleQueueClient() { } std::vector SingleQueueClient::AllCells() { - return {queue_id_, GetMetadataCell(), GetInitScanCell(), GetManifestsSyncScanCell(), + return {queue_id_, GetMetadataCell(), GetCoordinatorCell(), GetInitScanCell(), GetManifestsSyncScanCell(), GetCheckpointCell(), GetFileListCell(), GetManifestCell()}; } diff --git a/tea/samovar/single_queue_client.h b/tea/samovar/single_queue_client.h index ef9164fb..9d9165f1 100644 --- a/tea/samovar/single_queue_client.h +++ b/tea/samovar/single_queue_client.h @@ -31,7 +31,7 @@ class SingleQueueClient { std::chrono::seconds ttl_seconds, const std::string& queue_id, int segment_count, const std::string& compressor_name, SamovarRole role, std::shared_ptr sync_backoff, std::shared_ptr metadata_backoff, - bool need_sync_on_init, uint32_t queue_push_batch_size); + bool need_sync_on_init, uint32_t queue_push_batch_size, int sync_segments_on_init); std::optional GetNextDataEntry(); std::optional GetNextManifest(); @@ -50,6 +50,7 @@ class SingleQueueClient { std::string GetQueueId() const { return queue_id_; } int64_t GetMetricValue(SamovarMetrics metric) const; + bool TryClaimCoordinator(int segment_id); ~SingleQueueClient(); @@ -71,6 +72,7 @@ class SingleQueueClient { std::string queue_id_; static constexpr const char* metadata_prefix = "/samovar_meta"; + static constexpr const char* coordinator_prefix = "/coordinator"; static constexpr const char* file_list_prefix = "/file_list"; static constexpr const char* init_scan_prefix = "/init_scan"; static constexpr const char* checkpoint_prefix = "/checkpoint"; @@ -81,6 +83,7 @@ class SingleQueueClient { std::optional checkpoint_cell_; std::optional metadata_cell_; std::optional file_list_cell_; + std::optional coordinator_cell_; std::optional cached_result_metadata; std::optional file_list; @@ -88,6 +91,7 @@ class SingleQueueClient { std::string GetInitScanCell(); std::string GetCheckpointCell(); std::string GetMetadataCell(); + std::string GetCoordinatorCell(); std::string GetFileListCell(); std::string GetManifestCell(); std::string GetManifestsSyncScanCell(); @@ -98,6 +102,7 @@ class SingleQueueClient { std::shared_ptr metadata_backoff_; bool need_sync_on_init_ = false; + int sync_segments_on_init_ = 1; std::shared_ptr sync_backoff_; DurationTicks total_sync_time_ = 0; diff --git a/tea/samovar/ut/redis_test.cpp b/tea/samovar/ut/redis_test.cpp index e79ef5e7..e0390478 100644 --- a/tea/samovar/ut/redis_test.cpp +++ b/tea/samovar/ut/redis_test.cpp @@ -176,7 +176,7 @@ TEST(RedisClient, Test1) { auto batcher = std::make_shared(redis_client, batch_size_scheduler); auto client = SingleQueueClient(redis_client, batcher, std::chrono::seconds(std::numeric_limits::max()), GetQueueName(), 1, std::string(compression::kIdentityCompressorName), - SamovarRole::kCoordinator, backoff, backoff, true, 1); + SamovarRole::kCoordinator, backoff, backoff, true, 1, 1); client.FillFilesQueue({}, {}, {}); EXPECT_FALSE(client.GetNextDataEntry()); @@ -209,7 +209,7 @@ TEST(RedisClient, MultiThreading) { auto client = SingleQueueClient(redis_client, batcher, std::chrono::seconds(std::numeric_limits::max()), GetQueueName(test_iter), num_segments, std::string(compression::kIdentityCompressorName), - SamovarRole::kCoordinator, backoff, backoff, true, 1); + SamovarRole::kCoordinator, backoff, backoff, true, 1, num_segments); if (segment_id == 0) { samovar::ScanMetadata scan_metadata; @@ -369,7 +369,7 @@ TEST(RedisClient, FailServer) { client = std::make_shared( redis_client, batcher, std::chrono::seconds(std::numeric_limits::max()), GetQueueName(), num_segments, std::string(compression::kIdentityCompressorName), SamovarRole::kCoordinator, backoff, - backoff, true, 1); + backoff, true, 1, num_segments); } catch (const std::runtime_error& ex) { std::lock_guard lock(kill_mutex); EXPECT_TRUE(was_killed); diff --git a/test/config/tea-config-schema.json b/test/config/tea-config-schema.json index 692a5b01..958b218b 100644 --- a/test/config/tea-config-schema.json +++ b/test/config/tea-config-schema.json @@ -79,13 +79,18 @@ "type": "object", "properties": { "use_samovar": { "type": "boolean" }, + "enable_setnx_coordinator": { "type": "boolean" }, "backoff_type": { "type": "string", "enum": ["exp", "linear", "none"] }, "exponentail_backoff_sleep_coef": { "type": "number", "minimum": 0 }, "exponentail_backoff_limit": { "type": "integer", "minimum": 0 }, "limit_retries": { "type": "integer", "minimum": 0 }, "samovar": { "type": "string", "format": "hostname" }, "split_type": { "type": "string", "enum": ["offsets", "ranges", "hash"] }, - "ttl": { "type": "integer", "minimum": 0 } + "ttl": { "type": "integer", "minimum": 0 }, + "sync_segments": { + "type": "array", + "items": { "type": "integer", "minimum": 1 } + } }, "required": [ "use_samovar",