Skip to content

Commit ad7a6f1

Browse files
committed
fix: per-context bloom filter metadata to avoid race conditions
- Store expected_elements and num_hashes in per-context maps instead of globals - Update set_local_bloom_bits to write to per-context maps - Update getters to take context_id parameter - clear_bloom_filter now also erases local_bloom_bits and metadata maps - Update BloomFilterBits handler to use per-context getters
1 parent a73926b commit ad7a6f1

2 files changed

Lines changed: 21 additions & 10 deletions

File tree

include/common/cluster_manager.hpp

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -286,8 +286,8 @@ class ClusterManager {
286286
size_t expected_elements, size_t num_hashes) {
287287
const std::scoped_lock<std::mutex> lock(mutex_);
288288
local_bloom_bits_[context_id] = std::move(bits);
289-
local_expected_elements_ = expected_elements;
290-
local_num_hashes_ = num_hashes;
289+
local_expected_elements_map_[context_id] = expected_elements;
290+
local_num_hashes_map_[context_id] = num_hashes;
291291
}
292292

293293
/**
@@ -305,17 +305,25 @@ class ClusterManager {
305305
/**
306306
* @brief Get expected_elements for local bloom filter
307307
*/
308-
[[nodiscard]] size_t get_local_expected_elements() const {
308+
[[nodiscard]] size_t get_local_expected_elements(const std::string& context_id) const {
309309
const std::scoped_lock<std::mutex> lock(mutex_);
310-
return local_expected_elements_;
310+
auto it = local_expected_elements_map_.find(context_id);
311+
if (it != local_expected_elements_map_.end()) {
312+
return it->second;
313+
}
314+
return 0;
311315
}
312316

313317
/**
314318
* @brief Get num_hashes for local bloom filter
315319
*/
316-
[[nodiscard]] size_t get_local_num_hashes() const {
320+
[[nodiscard]] size_t get_local_num_hashes(const std::string& context_id) const {
317321
const std::scoped_lock<std::mutex> lock(mutex_);
318-
return local_num_hashes_;
322+
auto it = local_num_hashes_map_.find(context_id);
323+
if (it != local_num_hashes_map_.end()) {
324+
return it->second;
325+
}
326+
return 0;
319327
}
320328

321329
/**
@@ -324,6 +332,9 @@ class ClusterManager {
324332
void clear_bloom_filter(const std::string& context_id) {
325333
const std::scoped_lock<std::mutex> lock(mutex_);
326334
bloom_filters_.erase(context_id);
335+
local_bloom_bits_.erase(context_id);
336+
local_expected_elements_map_.erase(context_id);
337+
local_num_hashes_map_.erase(context_id);
327338
}
328339

329340
private:
@@ -352,8 +363,8 @@ class ClusterManager {
352363
std::unordered_map<std::string, BloomFilterEntry> bloom_filters_;
353364
/* context_id -> local bloom filter bits (for aggregation during distributed build) */
354365
std::unordered_map<std::string, std::vector<uint8_t>> local_bloom_bits_;
355-
size_t local_expected_elements_ = 0;
356-
size_t local_num_hashes_ = 0;
366+
std::unordered_map<std::string, size_t> local_expected_elements_map_;
367+
std::unordered_map<std::string, size_t> local_num_hashes_map_;
357368
mutable std::mutex mutex_;
358369
};
359370

src/main.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -529,8 +529,8 @@ int main(int argc, char* argv[]) {
529529
reply_args.filter_data =
530530
cluster_manager->get_local_bloom_bits(args.context_id);
531531
reply_args.expected_elements =
532-
cluster_manager->get_local_expected_elements();
533-
reply_args.num_hashes = cluster_manager->get_local_num_hashes();
532+
cluster_manager->get_local_expected_elements(args.context_id);
533+
reply_args.num_hashes = cluster_manager->get_local_num_hashes(args.context_id);
534534

535535
auto resp_p = reply_args.serialize();
536536
cloudsql::network::RpcHeader resp_h;

0 commit comments

Comments
 (0)