diff --git a/contrib/tiflash-columnar-hub/hub-runtime/ffi/src/RaftStoreProxyFFI/ProxyFFI.h b/contrib/tiflash-columnar-hub/hub-runtime/ffi/src/RaftStoreProxyFFI/ProxyFFI.h index ce4e60648a8..96775020bee 100644 --- a/contrib/tiflash-columnar-hub/hub-runtime/ffi/src/RaftStoreProxyFFI/ProxyFFI.h +++ b/contrib/tiflash-columnar-hub/hub-runtime/ffi/src/RaftStoreProxyFFI/ProxyFFI.h @@ -228,6 +228,10 @@ struct SSTReaderInterfaces { struct CloudStorageEngineInterfaces { bool (*fn_get_keyspace_encryption)(RaftStoreProxyPtr, uint32_t); RawCppStringPtr (*fn_get_master_key)(RaftStoreProxyPtr); + RustStrWithViewVec (*fn_get_region_bucket_keys)(uint64_t, uint64_t, + RaftStoreProxyPtr); + void (*fn_clear_shared_snap_access_by_start_ts)(uint64_t, + RaftStoreProxyPtr); ColumnarReaderPtr (*fn_get_columnar_reader)(uint64_t, uint64_t, uint64_t, BaseBuffView, BaseBuffView, BaseBuffView, BaseBuffView, diff --git a/contrib/tiflash-columnar-hub/hub-runtime/src/cloud_helper.rs b/contrib/tiflash-columnar-hub/hub-runtime/src/cloud_helper.rs index dbbcb525000..dc924d8948c 100644 --- a/contrib/tiflash-columnar-hub/hub-runtime/src/cloud_helper.rs +++ b/contrib/tiflash-columnar-hub/hub-runtime/src/cloud_helper.rs @@ -16,7 +16,10 @@ use std::{ fs, ops::Deref, path::{Path, PathBuf}, - sync::Arc, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, Mutex, Weak, + }, time::{Duration, UNIX_EPOCH}, }; @@ -48,7 +51,7 @@ use kvproto::{ coprocessor::DelegateResponse, metapb::{Peer, Store}, }; -use pd_client::PdClient; +use pd_client::{BucketStat, PdClient}; use protobuf::Message; use quick_cache::{ sync::{Cache, DefaultLifecycle}, @@ -114,11 +117,27 @@ pub struct CloudEngineBackends { pub txn_chunk_mgr: TxnChunkManager, } +#[derive(Clone)] +struct RegionBucketCacheEntry { + region_ver: u64, + keys: Vec>, +} + +impl From<&BucketStat> for RegionBucketCacheEntry { + fn from(bucket_stat: &BucketStat) -> Self { + Self { + region_ver: bucket_stat.meta.region_epoch.get_version(), + keys: 
bucket_stat.meta.keys.clone(), + } + } +} + #[derive(Clone)] pub struct PdClientWithCache { pd_client: Arc, store_cache: Arc>, // store_id -> Store region_cache: Arc>, // region_id -> Peer + region_bucket_cache: Arc>, // region_id -> bucket keys } impl PdClientWithCache { @@ -127,6 +146,7 @@ impl PdClientWithCache { pd_client, store_cache: Arc::new(DashMap::new()), region_cache: Arc::new(DashMap::new()), + region_bucket_cache: Arc::new(DashMap::new()), } } @@ -171,11 +191,37 @@ impl PdClientWithCache { pub fn evict_region_cache(&self, region_id: u64) { self.region_cache.remove(&region_id); + self.region_bucket_cache.remove(&region_id); } pub fn get_security_mgr(&self) -> Arc { self.pd_client.get_security_mgr() } + + pub fn get_region_bucket_keys(&self, region_id: u64, region_ver: u64) -> Vec> { + if let Some(bucket_entry) = self.region_bucket_cache.get(&region_id) { + match bucket_entry.region_ver.cmp(&region_ver) { + std::cmp::Ordering::Equal => return bucket_entry.keys.clone(), + std::cmp::Ordering::Greater => return Vec::new(), + std::cmp::Ordering::Less => {} + } + } + + let Ok(Some(bucket_stat)) = + futures::executor::block_on(self.pd_client.get_buckets_async(region_id)) + else { + self.region_bucket_cache.remove(&region_id); + return Vec::new(); + }; + let bucket_entry = RegionBucketCacheEntry::from(&bucket_stat); + let bucket_keys = if bucket_entry.region_ver == region_ver { + bucket_entry.keys.clone() + } else { + Vec::new() + }; + self.region_bucket_cache.insert(region_id, bucket_entry); + bucket_keys + } } #[derive(Clone)] @@ -191,6 +237,7 @@ pub struct CloudHelper { block_cache: BlockCache, snapshot_cache: SnapCache, snapshot_cache_capable_stores: Arc>, + shared_snap_access_cache: SharedSnapAccessCache, meta_file_cache: Arc, MetaFileCacheWeighter>>, schema_files: Arc>, runtime: Arc, @@ -238,6 +285,7 @@ impl CloudHelper { ); let snapshot_cache = SnapCache::new(SNAPSHOT_CACHE_SIZE, SNAPSHOT_CACHE_CAP); let snapshot_cache_capable_stores = Arc::new(DashMap::new()); + let 
shared_snap_access_cache = SharedSnapAccessCache::new(); // Create a long-lived HTTP client for connection reuse let http_client = { @@ -257,6 +305,7 @@ impl CloudHelper { block_cache, snapshot_cache, snapshot_cache_capable_stores, + shared_snap_access_cache, meta_file_cache, schema_files: Arc::new(DashMap::new()), runtime, @@ -372,6 +421,7 @@ impl CloudHelper { let vector_index_cache = self.vector_index_cache.clone(); let snap_cache = self.snapshot_cache.clone(); let snap_cache_capable_stores = self.snapshot_cache_capable_stores.clone(); + let shared_snap_access_cache = self.shared_snap_access_cache.clone(); let meta_file_cache = self.meta_file_cache.clone(); let columnar_file_cache = self.columnar_file_cache.clone(); let fts_cache = self.fts_cache.clone(); @@ -380,7 +430,8 @@ impl CloudHelper { let tables_clone = tables.clone(); let fts_query_info_clone = fts_query_info.clone(); self.runtime.spawn(async move { - let snap = request_snapshot_from_leader( + let snap = get_or_request_shared_snapshot( + shared_snap_access_cache, pd_client, http_client, dfs, @@ -398,7 +449,7 @@ impl CloudHelper { shard_id, shard_ver, start_ts, - &tables_clone, + tables_clone, &master_key, fts_query_info_clone, ) @@ -444,6 +495,24 @@ impl CloudHelper { } } } + + pub fn get_region_bucket_keys(&self, region_id: u64, region_ver: u64) -> Vec> { + self.pd_client.get_region_bucket_keys(region_id, region_ver) + } + + pub fn clear_shared_snap_access_by_start_ts(&self, start_ts: u64) { + if start_ts == 0 { + return; + } + + let (cleared_cache_entries, in_flight_loader_entries) = + self.shared_snap_access_cache.remove_by_start_ts(start_ts); + + info!( + "clear shared snapaccess by start_ts, start_ts: {}, cleared_cache_entries: {}, in_flight_loader_entries: {}", + start_ts, cleared_cache_entries, in_flight_loader_entries + ); + } } fn collect_ia_meta_files(meta_paths: &[PathBuf]) -> std::io::Result> { @@ -646,7 +715,7 @@ async fn request_snapshot_from_leader( continue; } if 
delegate_resp.get_region_error().has_epoch_not_match() { - // Return epoch not match error to TiDB to retry. + // Return epoch not match error to caller to retry new plan. error!( "{} request_snapshot_from_leader failed, epoch not match, {:?}", tag, @@ -866,3 +935,323 @@ impl quick_cache::Weighter for SnapWeighter { val.snap.len() as u64 } } + +#[derive(Clone)] +pub struct SharedSnapAccessCache { + core: Arc, +} + +impl Deref for SharedSnapAccessCache { + type Target = SharedSnapAccessCacheCore; + fn deref(&self) -> &Self::Target { + &self.core + } +} + +impl SharedSnapAccessCache { + pub fn new() -> Self { + Self { + core: Arc::new(SharedSnapAccessCacheCore::new()), + } + } +} + +#[derive(Clone)] +pub struct SharedSnapAccessCacheCore { + groups: Arc>>, +} + +impl SharedSnapAccessCacheCore { + pub fn new() -> Self { + Self { + groups: Arc::new(DashMap::new()), + } + } + + pub fn get(&self, key: &SharedSnapAccessKey) -> Option> { + let group = self.groups.get(&key.start_ts).map(|entry| entry.clone())?; + let _state_guard = group.state_lock.lock().unwrap(); + if group.is_terminal() { + return None; + } + group.entries.get(key).map(|entry| entry.clone()) + } + + pub fn insert(&self, key: SharedSnapAccessKey, entry: Weak) { + let Some(group) = self.groups.get(&key.start_ts).map(|entry| entry.clone()) else { + return; + }; + let _state_guard = group.state_lock.lock().unwrap(); + if group.is_terminal() { + return; + } + group.entries.insert(key, entry); + } + + pub fn get_loader(&self, key: &SharedSnapAccessKey) -> Option>> { + let group = match self.groups.entry(key.start_ts) { + dashmap::mapref::entry::Entry::Occupied(entry) => entry.get().clone(), + dashmap::mapref::entry::Entry::Vacant(entry) => { + entry.insert(Arc::new(SharedSnapAccessGroup::new())).clone() + } + }; + let _state_guard = group.state_lock.lock().unwrap(); + if group.is_terminal() { + return None; + } + let loader = group + .loaders + .entry(key.clone()) + .or_insert_with(|| 
Arc::new(tokio::sync::Mutex::new(()))) + .clone(); + Some(loader) + } + + pub fn remove_loader(&self, key: &SharedSnapAccessKey) -> bool { + let Some(group) = self.groups.get(&key.start_ts).map(|entry| entry.clone()) else { + return false; + }; + let _state_guard = group.state_lock.lock().unwrap(); + let removed = group.loaders.remove(key).is_some(); + drop(_state_guard); + self.try_remove_empty_group(key.start_ts, &group); + removed + } + + pub fn remove_entry(&self, key: &SharedSnapAccessKey) -> bool { + let Some(group) = self.groups.get(&key.start_ts).map(|entry| entry.clone()) else { + return false; + }; + let _state_guard = group.state_lock.lock().unwrap(); + let removed = group.entries.remove(key).is_some(); + drop(_state_guard); + self.try_remove_empty_group(key.start_ts, &group); + removed + } + + pub fn remove_by_start_ts(&self, start_ts: u64) -> (usize, usize) { + let Some(group) = self.groups.get(&start_ts).map(|entry| entry.clone()) else { + return (0, 0); + }; + let _state_guard = group.state_lock.lock().unwrap(); + group.mark_terminal(); + let removed_entries = group.entries.len(); + let in_flight_loaders = group.loaders.len(); + group.entries.clear(); + drop(_state_guard); + self.try_remove_empty_group(start_ts, &group); + (removed_entries, in_flight_loaders) + } + + fn try_remove_empty_group(&self, start_ts: u64, group: &Arc) { + let _state_guard = group.state_lock.lock().unwrap(); + if group.entries.is_empty() && group.loaders.is_empty() { + if let Some(entry) = self.groups.get(&start_ts) { + if Arc::ptr_eq(entry.value(), group) { + drop(entry); + let _ = self.groups.remove(&start_ts); + } + } + } + } +} + +pub struct SharedSnapAccessGroup { + entries: DashMap>, + loaders: DashMap>>, + terminal: AtomicBool, + state_lock: Mutex<()>, +} + +impl SharedSnapAccessGroup { + pub fn new() -> Self { + Self { + entries: DashMap::new(), + loaders: DashMap::new(), + terminal: AtomicBool::new(false), + state_lock: Mutex::new(()), + } + } + + fn 
is_terminal(&self) -> bool { + self.terminal.load(Ordering::Acquire) + } + + fn mark_terminal(&self) { + self.terminal.store(true, Ordering::Release); + } +} + +#[derive(Clone, Eq, PartialEq, Hash)] +pub struct SharedSnapAccessKey { + pub shard_id: u64, + pub shard_ver: u64, + pub start_ts: u64, + pub start_table_id: i64, + pub end_table_id: i64, + pub prepare_all: bool, +} + +impl SharedSnapAccessKey { + pub fn new( + shard_id: u64, + shard_ver: u64, + start_ts: u64, + start_table_id: i64, + end_table_id: i64, + prepare_all: bool, + ) -> Self { + Self { + shard_id, + shard_ver, + start_ts, + start_table_id, + end_table_id, + prepare_all, + } + } +} + +fn upgrade_shared_snap_access( + cache: &SharedSnapAccessCache, + key: &SharedSnapAccessKey, +) -> Option { + let core = match cache.get(key) { + Some(core) => match core.upgrade() { + Some(core) => core, + None => { + cache.remove_entry(key); + return None; + } + }, + None => return None, + }; + Some(SnapAccess { core }) +} + +async fn get_or_request_shared_snapshot( + shared_snap_access_cache: SharedSnapAccessCache, + pd_client: Arc, + http_client: security::HttpClient, + dfs: Arc, + ia_ctx: IaCtx, + vector_index_cache: VectorIndexCache, + columnar_file_cache: ColumnarFileCache, + snap_cache: SnapCache, + snap_cache_capable_stores: Arc>, + meta_file_cache: Arc, MetaFileCacheWeighter>>, + schema_files: Arc>, + txn_chunk_manager: TxnChunkManager, + block_cache: BlockCache, + fts_cache: FtsCache, + fts_delta_cache: FtsDeltaCache, + shard_id: u64, + shard_ver: u64, + start_ts: u64, + tables: Vec, + master_key: &MasterKey, + fts_query_info: tipb::FtsQueryInfo, +) -> Result { + let start_table_id = tables[0].table_id; + let end_table_id = tables[tables.len() - 1].table_id; + let prepare_all = fts_query_info.get_query_type() != tipb::FtsQueryType::FtsQueryTypeInvalid; + let key = SharedSnapAccessKey::new( + shard_id, + shard_ver, + start_ts, + start_table_id, + end_table_id, + prepare_all, + ); + + if let Some(snap) = 
upgrade_shared_snap_access(&shared_snap_access_cache, &key) { + info!( + "reuse shared snapaccess directly, shard_id: {}, shard_ver: {}, start_ts: {}, start_table_id: {}, end_table_id: {}", + shard_id, shard_ver, start_ts, start_table_id, end_table_id + ); + return Ok(snap); + } + + let Some(loader) = shared_snap_access_cache.get_loader(&key) else { + return Err(format!("shared snapaccess evicted, start_ts: {}", start_ts).into()); + }; + let _guard = loader.lock().await; + + if let Some(snap) = upgrade_shared_snap_access(&shared_snap_access_cache, &key) { + info!( + "reuse shared snapaccess after wait, shard_id: {}, shard_ver: {}, start_ts: {}, start_table_id: {}, end_table_id: {}", + shard_id, shard_ver, start_ts, start_table_id, end_table_id + ); + return Ok(snap); + } + + info!( + "load shared snapaccess, shard_id: {}, shard_ver: {}, start_ts: {}, start_table_id: {}, end_table_id: {}", + shard_id, shard_ver, start_ts, start_table_id, end_table_id + ); + let snap = request_snapshot_from_leader( + pd_client, + http_client, + dfs, + ia_ctx, + vector_index_cache, + columnar_file_cache, + snap_cache, + snap_cache_capable_stores, + meta_file_cache, + schema_files, + txn_chunk_manager, + block_cache, + fts_cache, + fts_delta_cache, + shard_id, + shard_ver, + start_ts, + &tables, + master_key, + fts_query_info, + ) + .await; + + if let Ok(ref snap_access) = snap { + shared_snap_access_cache.insert(key.clone(), Arc::downgrade(&snap_access.core)); + } + shared_snap_access_cache.remove_loader(&key); + snap +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn shared_snap_access_eviction_is_sticky_for_in_flight_loader() { + let cache = SharedSnapAccessCache::new(); + let key = SharedSnapAccessKey::new(1, 2, 3, 4, 5, false); + + let loader = cache + .get_loader(&key) + .expect("active group should create loader"); + cache.insert(key.clone(), Weak::new()); + assert!(cache + .groups + .get(&key.start_ts) + .is_some_and(|group| group.entries.contains_key(&key))); + + 
let (removed_entries, in_flight_loaders) = cache.remove_by_start_ts(key.start_ts); + assert_eq!(removed_entries, 1); + assert_eq!(in_flight_loaders, 1); + assert!(cache.get(&key).is_none()); + assert!(cache.get_loader(&key).is_none()); + + cache.insert(key.clone(), Weak::new()); + assert!(cache + .groups + .get(&key.start_ts) + .is_some_and(|group| group.entries.is_empty())); + + drop(loader); + assert!(cache.remove_loader(&key)); + assert!(cache.groups.get(&key.start_ts).is_none()); + } +} diff --git a/contrib/tiflash-columnar-hub/hub-runtime/src/columnar_impls.rs b/contrib/tiflash-columnar-hub/hub-runtime/src/columnar_impls.rs index 6316c2107a6..6ba02daf5ad 100644 --- a/contrib/tiflash-columnar-hub/hub-runtime/src/columnar_impls.rs +++ b/contrib/tiflash-columnar-hub/hub-runtime/src/columnar_impls.rs @@ -18,10 +18,10 @@ use kvengine::{CloudColumnarReaders, TableCtx}; use protobuf::{parse_from_bytes, Message}; use crate::{ - build_from_string, + build_from_string, build_from_vec_string, interfaces_ffi::{ BaseBuffView, ColumnarReaderErrorType, ColumnarReaderPtr, RaftStoreProxyPtr, RawRustPtr, - RawVoidPtr, RustStrWithView, + RawVoidPtr, RustStrWithView, RustStrWithViewVec, }, RawRustPtrType, }; @@ -73,6 +73,31 @@ impl From for ColumnarReaderPtr { } } +pub unsafe extern "C" fn ffi_get_region_bucket_keys( + region_id: u64, + region_ver: u64, + hub_ptr: RaftStoreProxyPtr, +) -> RustStrWithViewVec { + let hub = hub_ptr.as_ref(); + let bucket_keys = hub + .cloud_helper + .get_region_bucket_keys(region_id, region_ver); + if bucket_keys.is_empty() { + RustStrWithViewVec::default() + } else { + build_from_vec_string(bucket_keys) + } +} + +pub unsafe extern "C" fn ffi_clear_shared_snap_access_by_start_ts( + start_ts: u64, + hub_ptr: RaftStoreProxyPtr, +) { + let hub = hub_ptr.as_ref(); + hub.cloud_helper + .clear_shared_snap_access_by_start_ts(start_ts); +} + pub unsafe extern "C" fn ffi_make_columnar_reader( shard_id: u64, shard_ver: u64, diff --git 
a/contrib/tiflash-columnar-hub/hub-runtime/src/interfaces.rs b/contrib/tiflash-columnar-hub/hub-runtime/src/interfaces.rs index a52c1b0233e..8c76a251986 100644 --- a/contrib/tiflash-columnar-hub/hub-runtime/src/interfaces.rs +++ b/contrib/tiflash-columnar-hub/hub-runtime/src/interfaces.rs @@ -355,6 +355,16 @@ pub mod root { arg1: root::DB::RaftStoreProxyPtr, ) -> root::DB::RawCppStringPtr, >, + pub fn_get_region_bucket_keys: ::std::option::Option< + unsafe extern "C" fn( + arg1: u64, + arg2: u64, + arg3: root::DB::RaftStoreProxyPtr, + ) -> root::DB::RustStrWithViewVec, + >, + pub fn_clear_shared_snap_access_by_start_ts: ::std::option::Option< + unsafe extern "C" fn(arg1: u64, arg2: root::DB::RaftStoreProxyPtr), + >, pub fn_get_columnar_reader: ::std::option::Option< unsafe extern "C" fn( arg1: u64, diff --git a/contrib/tiflash-columnar-hub/hub-runtime/src/run.rs b/contrib/tiflash-columnar-hub/hub-runtime/src/run.rs index bbafede8796..95b9777ae21 100644 --- a/contrib/tiflash-columnar-hub/hub-runtime/src/run.rs +++ b/contrib/tiflash-columnar-hub/hub-runtime/src/run.rs @@ -55,6 +55,7 @@ use tikv_util::{ use crate::{ cloud_helper::{CloudEngineBackends, CloudHelper}, columnar_impls::{ + ffi_clear_shared_snap_access_by_start_ts, ffi_get_region_bucket_keys, ffi_make_columnar_reader, ffi_physical_table_id, ffi_read_block, ffi_read_column, ffi_read_handle, ffi_read_version, }, @@ -1147,6 +1148,8 @@ fn build_hub_ffi_helper(hub: &ColumnarHub) -> RaftStoreProxyFFIHelper { cloud_storage_engine_interfaces: CloudStorageEngineInterfaces { fn_get_keyspace_encryption: Some(ffi_get_keyspace_encryption), fn_get_master_key: Some(ffi_get_master_key), + fn_get_region_bucket_keys: Some(ffi_get_region_bucket_keys), + fn_clear_shared_snap_access_by_start_ts: Some(ffi_clear_shared_snap_access_by_start_ts), fn_get_columnar_reader: Some(ffi_make_columnar_reader), fn_read_block: Some(ffi_read_block), fn_read_handle: Some(ffi_read_handle), diff --git a/dbms/src/Server/Server.cpp 
b/dbms/src/Server/Server.cpp index 8d02a30d9ae..fa68182c0c9 100644 --- a/dbms/src/Server/Server.cpp +++ b/dbms/src/Server/Server.cpp @@ -1069,6 +1069,7 @@ try LOG_INFO(log, "Init S3 GC Manager"); global_context->getTMTContext().initS3GCManager(proxy_machine.getProxyHelper()); // Initialize the thread pool of storage before the storage engine is initialized. + if (!disagg_opt.use_columnar) { LOG_INFO(log, "dt_enable_read_thread {}", global_context->getSettingsRef().dt_enable_read_thread); // `DMFileReaderPool` should be constructed before and destructed after `SegmentReaderPoolManager`. diff --git a/dbms/src/Storages/StorageDisaggregatedColumnar.cpp b/dbms/src/Storages/StorageDisaggregatedColumnar.cpp index c83f976ddb3..fbc93f9d195 100644 --- a/dbms/src/Storages/StorageDisaggregatedColumnar.cpp +++ b/dbms/src/Storages/StorageDisaggregatedColumnar.cpp @@ -16,8 +16,8 @@ #if ENABLE_NEXT_GEN_COLUMNAR #include #include +#include #include -#include #include #include #include @@ -40,6 +40,7 @@ #include #include #include +#include #include #include #include @@ -57,6 +58,7 @@ #include #include +#include namespace DB { @@ -65,8 +67,277 @@ namespace ErrorCodes extern const int COLUMNAR_SNAPSHOT_ERROR; } // namespace ErrorCodes +struct RNColumnarReaderSharedContext +{ + using ClearSharedSnapAccessByStartTsFn = void (*)(uint64_t, RaftStoreProxyPtr); + + struct StartTsClearRegistry + { + enum class UnregisterResult + { + NotRegistered, + NotLastOwner, + LastOwner, + }; + + std::mutex mutex; + std::unordered_map ref_counts; + + void registerStartTs(UInt64 start_ts) + { + if (start_ts == 0) + return; + auto guard = std::lock_guard(mutex); + ++ref_counts[start_ts]; + } + + UnregisterResult unregisterStartTs(UInt64 start_ts) + { + if (start_ts == 0) + return UnregisterResult::NotRegistered; + + auto guard = std::lock_guard(mutex); + auto it = ref_counts.find(start_ts); + if (it == ref_counts.end() || it->second == 0) + return UnregisterResult::NotRegistered; + --it->second; + if 
(it->second != 0) + return UnregisterResult::NotLastOwner; + + ref_counts.erase(it); + return UnregisterResult::LastOwner; + } + }; + + static StartTsClearRegistry & getStartTsClearRegistry() + { + static StartTsClearRegistry registry; + return registry; + } + + LoggerPtr log; + const Context * context = nullptr; + UInt64 start_ts = 0; + DM::ColumnDefinesPtr column_defines; + int extra_table_id_index = -1; + TableID logical_table_id = 0; + String executor_id; + String table_scan_data; + String filter_conditions_data; + String table_info_data; + String ann_query_info_data; + String fts_query_info_data; + RaftStoreProxyPtr proxy_ptr{}; + ClearSharedSnapAccessByStartTsFn clear_shared_snap_access_by_start_ts = nullptr; + std::shared_ptr output_lock = std::make_shared(); + bool registered_for_start_ts = false; + + ~RNColumnarReaderSharedContext() noexcept + { + if (!registered_for_start_ts) + return; + + auto unregister_result = getStartTsClearRegistry().unregisterStartTs(start_ts); + if (unregister_result != StartTsClearRegistry::UnregisterResult::LastOwner) + return; + + if (proxy_ptr.inner == nullptr || clear_shared_snap_access_by_start_ts == nullptr) + return; + + try + { + clear_shared_snap_access_by_start_ts(start_ts, proxy_ptr); + } + catch (...) 
+ { + LOG_WARNING(log, "clear shared snapaccess cache failed, start_ts={}", start_ts); + } + } +}; + +size_t getRNColumnarSourceNum(size_t num_streams, size_t reader_count) +{ + return std::min(std::max(1, num_streams), reader_count); +} + namespace { +using ColumnarPhysicalTableRanges = std::vector>; +using BucketSplitUnit = std::pair; + +void normalizeTimestampCompareDateTimeLiteralToUTC(tipb::Expr & expr, const TimezoneInfo & timezone_info); + +struct BucketSplitResult +{ + bool has_bucket_split = false; + std::vector units; +}; + +struct RegionReaderPlan +{ + RegionID region_id; + pingcap::kv::RegionVerID region_ver_id; + ColumnarPhysicalTableRanges physical_table_ranges; + std::vector bucket_units; +}; + +bool isBucketBoundaryInsideRange(const String & bucket_key, const pingcap::coprocessor::KeyRange & range) +{ + if (bucket_key.empty()) + return false; + if (!range.start_key.empty() && bucket_key <= range.start_key) + return false; + if (!range.end_key.empty() && bucket_key >= range.end_key) + return false; + return true; +} + +BucketSplitResult splitRangesByBucketKeys( + const ColumnarPhysicalTableRanges & physical_table_ranges, + const std::vector & bucket_keys) +{ + BucketSplitResult result; + if (bucket_keys.size() <= 2) + return result; + + for (const auto & [table_id, ranges] : physical_table_ranges) + { + for (const auto & range : ranges) + { + String current_start = range.start_key; + bool current_range_split = false; + for (const auto & bucket_key : bucket_keys) + { + String normalized_bucket_key; + try + { + // Bucket boundaries from PD are TiKV encoded keys. Empty region boundaries and + // malformed non-empty keys are both possible invalid split points, and length + // checks alone cannot validate TiKV memcomparable encoding markers/padding. + // Skip only the bad boundary so the original range is still covered by a + // coarser reader plan. 
+ const auto decoded_bucket_key + = RecordKVFormat::decodeTiKVKey(TiKVKey(bucket_key.data(), bucket_key.size())); + normalized_bucket_key.assign(decoded_bucket_key.data(), decoded_bucket_key.size()); + } + catch (...) + { + continue; + } + if (!isBucketBoundaryInsideRange(normalized_bucket_key, range)) + continue; + result.units.emplace_back( + table_id, + pingcap::coprocessor::KeyRange{current_start, normalized_bucket_key}); + current_start = std::move(normalized_bucket_key); + current_range_split = true; + } + if (!range.end_key.empty() && current_start >= range.end_key) + continue; + result.units.emplace_back(table_id, pingcap::coprocessor::KeyRange{current_start, range.end_key}); + result.has_bucket_split = result.has_bucket_split || current_range_split; + } + } + return result; +} + +std::vector getRegionBucketKeysFromColumnar(const Context & context, RegionID region_id, UInt64 region_ver) +{ + const Context & global_ctx = context.getGlobalContext(); + const TiFlashRaftProxyHelper * proxy_helper = global_ctx.getSharedContextDisagg()->getColumnarProxyHelper(); + if (proxy_helper == nullptr || proxy_helper->cloud_storage_engine_interfaces.fn_get_region_bucket_keys == nullptr) + return {}; + + RustStrWithViewVec bucket_keys = proxy_helper->cloud_storage_engine_interfaces.fn_get_region_bucket_keys( + region_id, + region_ver, + proxy_helper->proxy_ptr); + SCOPE_EXIT({ + if (bucket_keys.inner.ptr != nullptr) + RustGcHelper::instance().gcRustPtr(bucket_keys.inner.ptr, bucket_keys.inner.type); + }); + + std::vector res; + res.reserve(static_cast(bucket_keys.len)); + for (size_t i = 0; i < bucket_keys.len; ++i) + res.emplace_back(bucket_keys.buffs[i].data, bucket_keys.buffs[i].len); + return res; +} + +std::vector buildRegionReaderPlansFromPhysicalTableRanges( + const LoggerPtr & log, + const Context & context, + const ColumnarPhysicalTableRanges & physical_table_ranges) +{ + std::vector region_reader_plans; + if (physical_table_ranges.empty()) + return 
region_reader_plans; + + pingcap::kv::Cluster * cluster = context.getTMTContext().getKVCluster(); + pingcap::kv::Backoffer bo(pingcap::kv::copBuildTaskMaxBackoff); + auto & region_cache = cluster->region_cache; + + std::unordered_map plan_index_by_region_id; + region_reader_plans.reserve(physical_table_ranges.size()); + + for (const auto & [physical_table_id, ranges] : physical_table_ranges) + { + const auto locations = pingcap::coprocessor::details::splitKeyRangesByLocations(region_cache, bo, ranges); + for (const auto & location : locations) + { + const auto & region = location.location.region; + auto it = plan_index_by_region_id.find(region.id); + if (it == plan_index_by_region_id.end()) + { + plan_index_by_region_id.emplace(region.id, region_reader_plans.size()); + region_reader_plans.push_back(RegionReaderPlan{ + .region_id = region.id, + .region_ver_id = region, + .physical_table_ranges + = ColumnarPhysicalTableRanges{std::make_tuple(physical_table_id, location.ranges)}, + }); + continue; + } + + auto & plan = region_reader_plans[it->second]; + if (plan.region_ver_id != region) + { + region_cache->dropRegion(plan.region_ver_id); + region_cache->dropRegion(region); + LOG_WARNING( + log, + "build RegionReaderPlan failed region_id={}, epoch not match {}", + region.id, + region.toString()); + throw RegionException( + RegionException::UnavailableRegions{region.id}, + RegionException::RegionReadStatus::EPOCH_NOT_MATCH, + region.toString().c_str()); + } + plan.physical_table_ranges.push_back(std::make_tuple(physical_table_id, location.ranges)); + } + } + + return region_reader_plans; +} + +std::vector buildReaderPlansFromRegionReaderPlans( + const std::vector & region_reader_plans) +{ + std::vector reader_plans; + reader_plans.reserve(region_reader_plans.size()); + for (const auto & plan : region_reader_plans) + { + reader_plans.push_back(RNColumnarReaderPlan{ + .region_id = plan.region_id, + .region_ver = plan.region_ver_id.ver, + .region_conf_ver = 
plan.region_ver_id.conf_ver, + .physical_table_ranges = plan.physical_table_ranges, + }); + } + return reader_plans; +} + std::vector> genGeneratedColumnInfosForDisaggregatedRead( const TiDBTableScan & table_scan) { @@ -109,10 +380,101 @@ std::tuple genColumnDefinesForDisaggregatedReadThroug return {std::move(column_defines), extra_table_id_index}; } -bool isProxyFilterComparableExpr(tipb::ScalarFuncSig sig) +std::shared_ptr buildColumnarReaderSharedContext( + const LoggerPtr & log, + const Context & context, + UInt64 start_ts, + const TiDBTableScan & table_scan, + const FilterConditions & filter_conditions) +{ + auto shared_context = std::make_shared(); + shared_context->log = log; + shared_context->context = &context; + shared_context->start_ts = start_ts; + RNColumnarReaderSharedContext::getStartTsClearRegistry().registerStartTs(start_ts); + shared_context->registered_for_start_ts = true; + shared_context->logical_table_id = table_scan.getLogicalTableID(); + shared_context->executor_id = table_scan.getTableScanExecutorID(); + const TiFlashRaftProxyHelper * proxy_helper + = context.getGlobalContext().getSharedContextDisagg()->getColumnarProxyHelper(); + if (proxy_helper != nullptr) + { + shared_context->proxy_ptr = proxy_helper->proxy_ptr; + shared_context->clear_shared_snap_access_by_start_ts + = proxy_helper->cloud_storage_engine_interfaces.fn_clear_shared_snap_access_by_start_ts; + } + std::tie(shared_context->column_defines, shared_context->extra_table_id_index) + = genColumnDefinesForDisaggregatedReadThroughColumnar(table_scan); + + auto table_scan_pb = *table_scan.getTableScanPB(); + const auto & timezone_info = context.getTimezoneInfo(); + if (table_scan_pb.tp() == tipb::TypePartitionTableScan) + { + auto * pushed_down_filters + = table_scan_pb.mutable_partition_table_scan()->mutable_pushed_down_filter_conditions(); + for (int i = 0; i < pushed_down_filters->size(); ++i) + normalizeTimestampCompareDateTimeLiteralToUTC(*pushed_down_filters->Mutable(i), 
timezone_info); + } + else + { + auto * pushed_down_filters = table_scan_pb.mutable_tbl_scan()->mutable_pushed_down_filter_conditions(); + for (int i = 0; i < pushed_down_filters->size(); ++i) + normalizeTimestampCompareDateTimeLiteralToUTC(*pushed_down_filters->Mutable(i), timezone_info); + } + shared_context->table_scan_data = table_scan_pb.SerializeAsString(); + + auto conditions = filter_conditions.conditions; + for (int i = 0; i < conditions.size(); ++i) + normalizeTimestampCompareDateTimeLiteralToUTC(*conditions.Mutable(i), timezone_info); + for (const auto & condition : conditions) + { + auto data = condition.SerializeAsString(); + uint32_t len = data.size(); + shared_context->filter_conditions_data.append(reinterpret_cast(&len), sizeof(len)); + shared_context->filter_conditions_data.append(data.data(), data.size()); + } + + tipb::TableInfo table_info; + bool is_partition_scan = table_scan.isPartitionTableScan(); + const auto & tidb_columns = table_scan.getColumns(); + const auto should_skip_column_for_columnar_table_info = [&](ColumnID column_id) { + if (column_id == MutSup::extra_table_id_col_id) + return true; + for (const auto & ci : tidb_columns) + { + if (ci.id == column_id && ci.hasGeneratedColumnFlag()) + return true; + } + return false; + }; + if (is_partition_scan) + { + for (const auto & column : table_scan_pb.partition_table_scan().columns()) + { + if (should_skip_column_for_columnar_table_info(column.column_id())) + continue; + *table_info.add_columns() = column; + } + } + else + { + for (const auto & column : table_scan_pb.tbl_scan().columns()) + { + if (should_skip_column_for_columnar_table_info(column.column_id())) + continue; + *table_info.add_columns() = column; + } + } + shared_context->table_info_data = table_info.SerializeAsString(); + shared_context->ann_query_info_data = table_scan.getANNQueryInfo().SerializeAsString(); + shared_context->fts_query_info_data = table_scan.getFTSQueryInfo().SerializeAsString(); + return shared_context; +} 
+ +bool isColumnarFilterComparableExpr(tipb::ScalarFuncSig sig) { - // Keep this aligned with proxy columnar filter supported signatures: - // `contrib/tiflash-proxy/components/kvengine/src/table/columnar/filter.rs`. + // Keep this aligned with kvengine columnar filter supported signatures: + // `components/kvengine/src/table/columnar/filter.rs`. switch (sig) { case tipb::ScalarFuncSig::LTInt: @@ -176,9 +538,9 @@ void normalizeTimestampCompareDateTimeLiteralToUTC(tipb::Expr & expr, const Time if (!isFunctionExpr(expr)) return; - // Only normalize for comparison expressions that proxy filter supports. + // Only normalize for comparison expressions that columnar filter supports. // Keep recursion so nested comparisons under AND/OR/NOT still work. - if (isScalarFunctionExpr(expr) && isProxyFilterComparableExpr(expr.sig())) + if (isScalarFunctionExpr(expr) && isColumnarFilterComparableExpr(expr.sig())) { bool has_timestamp_column = false; bool only_column_or_literal = true; @@ -197,9 +559,9 @@ void normalizeTimestampCompareDateTimeLiteralToUTC(tipb::Expr & expr, const Time } } - // Proxy filter parser only supports simple column-literal expressions. + // Columnar filter parser only supports simple column-literal expressions. // If a timestamp column is compared with a datetime literal, normalize the - // datetime literal from session timezone to UTC before passing to proxy. + // datetime literal from session timezone to UTC before passing to columnar. if (has_timestamp_column && only_column_or_literal && column_ref_count == 1) { static const auto & time_zone_utc = DateLUT::instance("UTC"); @@ -231,7 +593,7 @@ void StorageDisaggregated::filterConditionsWithPushedDownFilters( DAGExpressionAnalyzer & analyzer, DAGPipeline & pipeline) { - // Proxy columnar reader uses late-materialization filters only to reduce packs loaded from disk. + // Columnar reader uses late-materialization filters only to reduce packs loaded from disk. 
// It does not guarantee that all rows failing those filters are removed, so merge them into // FilterConditions and re-apply them in the TiFlash pipeline for correctness. FilterConditions conditions(filter_conditions.executor_id, filter_conditions.conditions); @@ -249,7 +611,7 @@ void StorageDisaggregated::filterConditionsWithPushedDownFilters( PipelineExecGroupBuilder & group_builder, DAGExpressionAnalyzer & analyzer) { - // Proxy columnar reader uses late-materialization filters only to reduce packs loaded from disk. + // Columnar reader uses late-materialization filters only to reduce packs loaded from disk. // It does not guarantee that all rows failing those filters are removed, so merge them into // FilterConditions and re-apply them in the TiFlash pipeline for correctness. FilterConditions conditions(filter_conditions.executor_id, filter_conditions.conditions); @@ -267,7 +629,7 @@ BlockInputStreams StorageDisaggregated::readThroughColumnar(const Context & cont const UInt64 start_ts = sender_target_mpp_task_id.gather_id.query_id.start_ts; auto [remote_table_ranges, region_num] = buildRemoteTableRanges(); const auto generated_column_infos = genGeneratedColumnInfosForDisaggregatedRead(table_scan); - auto read_proxy_tasks = RNProxyReadTask::buildProxyReadTaskWithBackoff( + auto read_columnar_tasks = RNColumnarReadTask::buildColumnarReadTaskWithBackoff( log, context, start_ts, @@ -275,12 +637,12 @@ BlockInputStreams StorageDisaggregated::readThroughColumnar(const Context & cont filter_conditions, remote_table_ranges, num_streams); - for (auto & task : read_proxy_tasks) + for (auto & task : read_columnar_tasks) { auto streams = task->getInputStreams(); pipeline.streams.insert(pipeline.streams.end(), streams.begin(), streams.end()); } - // Avoid reading generated columns from proxy, generate placeholders locally. + // Avoid reading generated columns from columnar, generate placeholders locally. 
executeGeneratedColumnPlaceholder(generated_column_infos, log, pipeline); NamesAndTypes source_columns; source_columns.reserve(table_scan.getColumnSize()); @@ -291,7 +653,7 @@ BlockInputStreams StorageDisaggregated::readThroughColumnar(const Context & cont } analyzer = std::make_unique(std::move(source_columns), context); - // Handle duration/timestamp cast for proxy path. + // Handle duration/timestamp cast for columnar path. // We still execute pushed-down filters on RN side, so timestamp columns in those filters // must also be converted from UTC to session timezone. extraCast(*analyzer, pipeline, /*include_pushed_down_filter_columns=*/true); @@ -309,7 +671,7 @@ void StorageDisaggregated::readThroughColumnar( { const UInt64 start_ts = sender_target_mpp_task_id.gather_id.query_id.start_ts; auto [remote_table_ranges, region_num] = buildRemoteTableRanges(); - auto read_proxy_tasks = RNProxyReadTask::buildProxyReadTaskWithBackoff( + auto read_columnar_tasks = RNColumnarReadTask::buildColumnarReadTaskWithBackoff( log, context, start_ts, @@ -317,20 +679,25 @@ void StorageDisaggregated::readThroughColumnar( filter_conditions, remote_table_ranges, num_streams); - auto [column_defines, extra_table_id_index] = genColumnDefinesForDisaggregatedReadThroughColumnar(table_scan); - for (auto & task : read_proxy_tasks) + const auto generated_column_infos = genGeneratedColumnInfosForDisaggregatedRead(table_scan); + if (!read_columnar_tasks.empty()) { - group_builder.addConcurrency(RNProxySourceOp::create({ - .context = context, - .debug_tag = log->identifier(), - .exec_context = exec_context, - .columns_to_read = *column_defines, - .task = task, - .extra_table_id_index = extra_table_id_index, - })); + auto & task_pool = read_columnar_tasks.front(); + const size_t source_num = task_pool->getSourceNum(); + LOG_INFO( + log, + "use shared columnar reader task pool, reader_num={}, source_num={}", + task_pool->getReaderCount(), + source_num); + for (size_t i = 0; i < source_num; ++i) + 
{ + group_builder.addConcurrency(RNColumnarSourceOp::create({ + .exec_context = exec_context, + .task = task_pool, + })); + } } - const auto generated_column_infos = genGeneratedColumnInfosForDisaggregatedRead(table_scan); executeGeneratedColumnPlaceholder(exec_context, group_builder, generated_column_infos, log); NamesAndTypes source_columns; @@ -340,50 +707,20 @@ void StorageDisaggregated::readThroughColumnar( source_columns.emplace_back(col.name, col.type); analyzer = std::make_unique(std::move(source_columns), context); - // Handle duration/timestamp cast for proxy path. + // Handle duration/timestamp cast for columnar path. extraCast(exec_context, group_builder, *analyzer, /*include_pushed_down_filter_columns=*/true); // Handle filter filterConditionsWithPushedDownFilters(exec_context, group_builder, *analyzer); } -// RNProxyReaderPtr -RNProxyReaderPtr RNProxyReader::createProxyReader( - const LoggerPtr & log, - const Context & context, - RegionID region_id, - RegionVersion region_ver, - UInt64 region_conf_ver, - const std::vector> & physical_table_ranges, - UInt64 start_ts, - const TiDBTableScan & table_scan, - const FilterConditions & filter_conditions, - std::mutex & output_lock) +ColumnarReaderPtr createColumnarReader( + const RNColumnarReaderSharedContext & shared_context, + const RNColumnarReaderPlan & reader_plan) { - auto table_scan_pb = *table_scan.getTableScanPB(); - const auto & timezone_info = context.getTimezoneInfo(); - if (table_scan_pb.tp() == tipb::TypePartitionTableScan) - { - auto * pushed_down_filters - = table_scan_pb.mutable_partition_table_scan()->mutable_pushed_down_filter_conditions(); - for (int i = 0; i < pushed_down_filters->size(); ++i) - normalizeTimestampCompareDateTimeLiteralToUTC(*pushed_down_filters->Mutable(i), timezone_info); - } - else - { - auto * pushed_down_filters = table_scan_pb.mutable_tbl_scan()->mutable_pushed_down_filter_conditions(); - for (int i = 0; i < pushed_down_filters->size(); ++i) - 
normalizeTimestampCompareDateTimeLiteralToUTC(*pushed_down_filters->Mutable(i), timezone_info); - } - auto table_scan_data = table_scan_pb.SerializeAsString(); - auto table_scan_view = BaseBuffView{table_scan_data.data(), table_scan_data.size()}; - auto conditions = filter_conditions.conditions; - for (int i = 0; i < conditions.size(); ++i) - normalizeTimestampCompareDateTimeLiteralToUTC(*conditions.Mutable(i), timezone_info); - // Copy pushed down filters to filter_conditions to make filterConditions works properly. - // Proxy columnar reader use pushed down filters to reduce packs load from disk and has no - // guarantee to filter all useless data, so we rely on the filterConditions to filter data. + const auto & log = shared_context.log; + const auto & context = *shared_context.context; String tables_range_data; - for (const auto & [physical_table_id, ranges] : physical_table_ranges) + for (const auto & [physical_table_id, ranges] : reader_plan.physical_table_ranges) { tables_range_data.append(reinterpret_cast(&physical_table_id), sizeof(physical_table_id)); @@ -403,64 +740,22 @@ RNProxyReaderPtr RNProxyReader::createProxyReader( tables_range_data.append(ranges_data.data(), ranges_data.size()); } auto tables_range_view = BaseBuffView{tables_range_data.data(), tables_range_data.size()}; - String filter_conditions_data; - for (const auto & condition : conditions) - { - auto data = condition.SerializeAsString(); - uint32_t len = data.size(); - filter_conditions_data.append(reinterpret_cast(&len), sizeof(len)); - filter_conditions_data.append(data.data(), data.size()); - } - tipb::TableInfo table_info; - bool is_partition_scan = table_scan.isPartitionTableScan(); - const auto & tidb_columns = table_scan.getColumns(); - const auto should_skip_column_for_columnar_table_info = [&](ColumnID column_id) { - // _tidb_tid is filled locally by TiFlash, consistent with genColumnDefinesForDisaggregatedRead(). 
- if (column_id == MutSup::extra_table_id_col_id) - return true; - // Generated columns are not stored in kvengine; executeGeneratedColumnPlaceholder fills them later. - for (const auto & ci : tidb_columns) - { - if (ci.id == column_id && ci.hasGeneratedColumnFlag()) - return true; - } - return false; - }; - if (is_partition_scan) - { - for (const auto & column : table_scan_pb.partition_table_scan().columns()) - { - if (should_skip_column_for_columnar_table_info(column.column_id())) - continue; - *table_info.add_columns() = column; - } - } - else - { - for (const auto & column : table_scan_pb.tbl_scan().columns()) - { - if (should_skip_column_for_columnar_table_info(column.column_id())) - continue; - *table_info.add_columns() = column; - } - } - auto table_info_data = table_info.SerializeAsString(); - auto columns = BaseBuffView{table_info_data.data(), table_info_data.size()}; - auto filter_conditions_view = BaseBuffView{filter_conditions_data.data(), filter_conditions_data.size()}; - const auto & ann_query_info_pb = table_scan.getANNQueryInfo(); - const auto & fts_query_info_pb = table_scan.getFTSQueryInfo(); - auto ann_query_info_data = ann_query_info_pb.SerializeAsString(); - auto fts_query_info_data = fts_query_info_pb.SerializeAsString(); - auto ann_query_info_view = BaseBuffView{ann_query_info_data.data(), ann_query_info_data.size()}; - auto fts_query_info_view = BaseBuffView{fts_query_info_data.data(), fts_query_info_data.size()}; + auto columns = BaseBuffView{shared_context.table_info_data.data(), shared_context.table_info_data.size()}; + auto filter_conditions_view + = BaseBuffView{shared_context.filter_conditions_data.data(), shared_context.filter_conditions_data.size()}; + auto table_scan_view = BaseBuffView{shared_context.table_scan_data.data(), shared_context.table_scan_data.size()}; + auto ann_query_info_view + = BaseBuffView{shared_context.ann_query_info_data.data(), shared_context.ann_query_info_data.size()}; + auto fts_query_info_view + = 
BaseBuffView{shared_context.fts_query_info_data.data(), shared_context.fts_query_info_data.size()}; const Context & global_ctx = context.getGlobalContext(); auto * cluster = global_ctx.getTMTContext().getKVCluster(); const TiFlashRaftProxyHelper * proxy_helper = global_ctx.getSharedContextDisagg()->getColumnarProxyHelper(); RUNTIME_CHECK_MSG(proxy_helper != nullptr, "columnar proxy helper is not initialized"); ColumnarReaderPtr columnar_reader = proxy_helper->cloud_storage_engine_interfaces.fn_get_columnar_reader( - region_id, - region_ver, - start_ts, + reader_plan.region_id, + reader_plan.region_ver, + shared_context.start_ts, std::move(tables_range_view), std::move(columns), std::move(table_scan_view), @@ -468,13 +763,14 @@ RNProxyReaderPtr RNProxyReader::createProxyReader( std::move(ann_query_info_view), std::move(fts_query_info_view), proxy_helper->proxy_ptr); - bool reader_transferred = false; + bool reader_returned = false; SCOPE_EXIT({ - if (!reader_transferred) + if (!reader_returned && columnar_reader.inner.ptr != nullptr) RustGcHelper::instance().gcRustPtr(columnar_reader.inner.ptr, columnar_reader.inner.type); }); SCOPE_EXIT({ - if (!reader_transferred && columnar_reader.error_type != ColumnarReaderErrorType::OK) + if (!reader_returned && columnar_reader.error_type != ColumnarReaderErrorType::OK + && columnar_reader.error.inner.ptr != nullptr) RustGcHelper::instance().gcRustPtr(columnar_reader.error.inner.ptr, columnar_reader.error.inner.type); }); if (columnar_reader.error_type == ColumnarReaderErrorType::RegionError) @@ -482,26 +778,25 @@ RNProxyReaderPtr RNProxyReader::createProxyReader( auto error_msg = String(columnar_reader.error.buff.data, columnar_reader.error.buff.len); errorpb::Error region_error; region_error.ParseFromString(error_msg); - auto region_ver_id = pingcap::kv::RegionVerID(region_id, region_conf_ver, region_ver); + auto region_ver_id + = pingcap::kv::RegionVerID(reader_plan.region_id, reader_plan.region_conf_ver, 
reader_plan.region_ver); // Refresh region cache and throw an exception for retrying. if (region_error.has_epoch_not_match()) { RegionException::UnavailableRegions unavailable_regions; String region_id_ver; // region_id:region_ver:conf_ver - std::unordered_set retry_regions; for (const auto & region : region_error.epoch_not_match().current_regions()) { unavailable_regions.insert(region.id()); - retry_regions.insert(region.id()); - region_id_ver = std::to_string(region.id()) + ":" + std::to_string(region_ver) + ":" + region_id_ver = std::to_string(region.id()) + ":" + std::to_string(reader_plan.region_ver) + ":" + std::to_string(region.region_epoch().conf_ver()); } - auto guard = std::lock_guard(output_lock); + auto guard = std::lock_guard(*shared_context.output_lock); cluster->region_cache->dropRegion(region_ver_id); LOG_WARNING( log, "create columnar reader failed region_id={}, epoch not match {}", - std::to_string(region_id), + std::to_string(reader_plan.region_id), region_ver_id.toString()); throw RegionException( std::move(unavailable_regions), @@ -511,17 +806,15 @@ RNProxyReaderPtr RNProxyReader::createProxyReader( else { RegionException::UnavailableRegions unavailable_regions; - std::unordered_set retry_regions; auto err_region_id = 0; if (region_error.has_region_not_found()) { err_region_id = region_error.region_not_found().region_id(); unavailable_regions.insert(err_region_id); - retry_regions.insert(err_region_id); LOG_WARNING( log, "create columnar reader failed region_id={}, region not found {}", - std::to_string(region_id), + std::to_string(reader_plan.region_id), std::to_string(err_region_id)); } else @@ -529,15 +822,15 @@ RNProxyReaderPtr RNProxyReader::createProxyReader( LOG_WARNING( log, "create columnar reader failed region_id={}, {}", - std::to_string(region_id), + std::to_string(reader_plan.region_id), region_error.ShortDebugString()); } - auto guard = std::lock_guard(output_lock); + auto guard = std::lock_guard(*shared_context.output_lock); 
cluster->region_cache->dropRegion(region_ver_id); throw RegionException( std::move(unavailable_regions), RegionException::RegionReadStatus::NOT_FOUND, - std::to_string(region_id).c_str()); + std::to_string(reader_plan.region_id).c_str()); } } else if (columnar_reader.error_type == ColumnarReaderErrorType::LockedError) @@ -549,8 +842,8 @@ RNProxyReaderPtr RNProxyReader::createProxyReader( pingcap::kv::Backoffer bo(pingcap::kv::copNextMaxBackoff); std::vector pushed; std::vector locks{std::make_shared(lock_info)}; - auto guard = std::lock_guard(output_lock); - auto before_expired = cluster->lock_resolver->resolveLocks(bo, start_ts, locks, pushed); + auto guard = std::lock_guard(*shared_context.output_lock); + auto before_expired = cluster->lock_resolver->resolveLocks(bo, shared_context.start_ts, locks, pushed); LOG_WARNING(log, "Finished resolve locks, before_expired={}", before_expired); throw Exception("lock error", ErrorCodes::COLUMNAR_SNAPSHOT_ERROR); } @@ -572,23 +865,320 @@ RNProxyReaderPtr RNProxyReader::createProxyReader( throw Exception(ErrorCodes::COLUMNAR_SNAPSHOT_ERROR, "{}", error_msg); } - // Create input stream. 
- auto [column_defines, extra_table_id_index] = genColumnDefinesForDisaggregatedReadThroughColumnar(table_scan); - BlockInputStreamPtr input_stream = RNProxyInputStream::create({ - .context = context, - .debug_tag = log->identifier(), - .columns_to_read = *column_defines, - .reader = columnar_reader, - .extra_table_id_index = extra_table_id_index, - .table_id = table_scan.getLogicalTableID(), - .executor_id = table_scan.getTableScanExecutorID(), + reader_returned = true; + return columnar_reader; +} + +// RNColumnarReadTask +RNColumnarReaderWork::~RNColumnarReaderWork() +{ + if (reader.has_value() && reader->inner.ptr != nullptr) + RustGcHelper::instance().gcRustPtr(reader->inner.ptr, reader->inner.type); +} + +RNColumnarReadTask::RNColumnarReadTask( + std::vector reader_plans, + size_t source_num_, + std::shared_ptr shared_reader_context_) + : reader_count(reader_plans.size()) + , source_num(source_num_) + , shared_reader_context(std::move(shared_reader_context_)) +{ + RUNTIME_CHECK(source_num > 0); + RUNTIME_CHECK(source_num <= reader_count, source_num, reader_count); + for (auto & reader_plan : reader_plans) + pending_reader_works.push_back(std::make_shared(std::move(reader_plan))); +} + +size_t RNColumnarReadTask::getReaderCount() const +{ + return reader_count; +} + +size_t RNColumnarReadTask::getSourceNum() const +{ + return source_num; +} + +const Context & RNColumnarReadTask::getContext() const +{ + return *shared_reader_context->context; +} + +const LoggerPtr & RNColumnarReadTask::getLog() const +{ + return shared_reader_context->log; +} + +const DM::ColumnDefines & RNColumnarReadTask::getColumnsToRead() const +{ + return *shared_reader_context->column_defines; +} + +int RNColumnarReadTask::getExtraTableIDIndex() const +{ + return shared_reader_context->extra_table_id_index; +} + +TableID RNColumnarReadTask::getLogicalTableID() const +{ + return shared_reader_context->logical_table_id; +} + +const String & RNColumnarReadTask::getExecutorID() const +{ + 
return shared_reader_context->executor_id; +} + +void RNColumnarReadTask::replaceReaderWork( + const RNColumnarReaderWorkPtr & reader_work, + std::vector replanned_reader_plans) +{ + RUNTIME_CHECK(reader_work != nullptr); + RUNTIME_CHECK(!replanned_reader_plans.empty()); + + reader_work->plan = std::move(replanned_reader_plans.front()); + if (replanned_reader_plans.size() == 1) + return; + + // If the original range now spans multiple regions, enqueue the remaining partitions for + // other sources. These ranges are produced by re-splitting the failed work's own key ranges. + auto queue_guard = std::lock_guard(pending_reader_works_mutex); + for (auto it = replanned_reader_plans.rbegin(); it != replanned_reader_plans.rend() - 1; ++it) + pending_reader_works.push_front(std::make_shared(*it)); +} + +#ifdef DBMS_PUBLIC_GTEST +void RNColumnarReadTask::replaceReaderWorkForTest( + const RNColumnarReaderWorkPtr & reader_work, + std::vector replanned_reader_plans) +{ + replaceReaderWork(reader_work, std::move(replanned_reader_plans)); +} +#endif + +ColumnarReaderPtr RNColumnarReadTask::createColumnarReaderWithBackoff(const RNColumnarReaderWorkPtr & reader_work) +{ + RUNTIME_CHECK(reader_work != nullptr); + pingcap::kv::Backoffer bo(pingcap::kv::copNextMaxBackoff); + while (true) + { + try + { + const auto & reader_plan = reader_work->plan; + LOG_INFO( + getLog(), + "materialize columnar reader for tables in region, region_id={}, table_num={}", + reader_plan.region_id, + reader_plan.physical_table_ranges.size()); + return createColumnarReader(*shared_reader_context, reader_plan); + } + catch (RegionException & e) + { + if (e.status == RegionException::RegionReadStatus::EPOCH_NOT_MATCH + || e.status == RegionException::RegionReadStatus::NOT_FOUND) + { + try + { + // Replan only the key ranges owned by this failed work. Dropping the stale + // region cache happens before this exception, so this locate pass can pick up + // the latest region epoch and split layout. 
+ auto replanned_region_reader_plans = buildRegionReaderPlansFromPhysicalTableRanges( + getLog(), + getContext(), + reader_work->plan.physical_table_ranges); + auto replanned_reader_plans = buildReaderPlansFromRegionReaderPlans(replanned_region_reader_plans); + const auto replanned_reader_plan_count = replanned_reader_plans.size(); + replaceReaderWork(reader_work, std::move(replanned_reader_plans)); + LOG_WARNING( + getLog(), + "replanned columnar reader work after region error, old_error={}, new_region_id={}, " + "split_count={}", + e.message(), + reader_work->plan.region_id, + replanned_reader_plan_count); + } + catch (const std::exception & replan_e) + { + LOG_WARNING(getLog(), "replan columnar reader work failed, {}", replan_e.what()); + } + } + LOG_WARNING(getLog(), "create columnar reader failed, backoff and retry, {}", e.message()); + bo.backoff(pingcap::kv::boRegionMiss, pingcap::Exception(e.message(), e.code())); + } + catch (Exception & e) + { + if (e.code() != ErrorCodes::COLUMNAR_SNAPSHOT_ERROR) + throw; + LOG_WARNING(getLog(), "create proxy reader failed, backoff and retry, {}", e.message()); + bo.backoff(pingcap::kv::boRegionMiss, pingcap::Exception(e.message(), e.code())); + } + } +} + +ColumnarReaderPtr RNColumnarReadTask::getOrCreateReader(const RNColumnarReaderWorkPtr & reader_work) +{ + RUNTIME_CHECK(reader_work != nullptr); + + bool should_create_inline = false; + while (true) + { + { + std::unique_lock lock(reader_work->mutex); + switch (reader_work->state) + { + case RNColumnarReaderMaterializeState::Ready: + { + auto reader = std::move(reader_work->reader); + reader_work->reader.reset(); + reader_work->exception = nullptr; + reader_work->state = RNColumnarReaderMaterializeState::Consumed; + return reader.value(); + } + case RNColumnarReaderMaterializeState::Failed: + std::rethrow_exception(reader_work->exception); + case RNColumnarReaderMaterializeState::Consumed: + throw Exception( + ErrorCodes::LOGICAL_ERROR, + "columnar reader work for 
region {} is already consumed", + reader_work->plan.region_id); + case RNColumnarReaderMaterializeState::Creating: + reader_work->cv.wait(lock, [&] { + return reader_work->state != RNColumnarReaderMaterializeState::Creating; + }); + continue; + case RNColumnarReaderMaterializeState::NotStarted: + reader_work->state = RNColumnarReaderMaterializeState::Creating; + should_create_inline = true; + break; + } + } + break; + } + + RUNTIME_CHECK(should_create_inline); + try + { + auto reader = createColumnarReaderWithBackoff(reader_work); + { + auto guard = std::lock_guard(reader_work->mutex); + reader_work->reader.reset(); + reader_work->exception = nullptr; + reader_work->state = RNColumnarReaderMaterializeState::Consumed; + } + reader_work->cv.notify_all(); + return reader; + } + catch (...) + { + { + auto guard = std::lock_guard(reader_work->mutex); + reader_work->reader.reset(); + reader_work->exception = std::current_exception(); + reader_work->state = RNColumnarReaderMaterializeState::Failed; + } + reader_work->cv.notify_all(); + throw; + } +} + +void RNColumnarReadTask::prefetchPendingWork() +{ + RNColumnarReaderWorkPtr reader_work; + { + auto guard = std::lock_guard(pending_reader_works_mutex); + if (pending_reader_works.empty()) + return; + reader_work = pending_reader_works.front(); + } + + prefetchReaderWork(reader_work); +} + +void RNColumnarReadTask::prefetchReaderWork(const RNColumnarReaderWorkPtr & reader_work) +{ + RUNTIME_CHECK(reader_work != nullptr); + + { + auto guard = std::lock_guard(reader_work->mutex); + if (reader_work->state != RNColumnarReaderMaterializeState::NotStarted) + return; + reader_work->state = RNColumnarReaderMaterializeState::Creating; + } + + LOG_INFO(getLog(), "materialize columnar reader asynchronously, region_id={}", reader_work->plan.region_id); + newThreadManager()->scheduleThenDetach(true, "PrefetchRNColumnarReader", [self = shared_from_this(), reader_work] { + try + { + auto reader = 
self->createColumnarReaderWithBackoff(reader_work); + { + auto guard = std::lock_guard(reader_work->mutex); + if (reader_work->state == RNColumnarReaderMaterializeState::Consumed) + return; + reader_work->reader.emplace(std::move(reader)); + reader_work->exception = nullptr; + reader_work->state = RNColumnarReaderMaterializeState::Ready; + } + } + catch (...) + { + { + auto guard = std::lock_guard(reader_work->mutex); + if (reader_work->state == RNColumnarReaderMaterializeState::Consumed) + return; + reader_work->reader.reset(); + reader_work->exception = std::current_exception(); + reader_work->state = RNColumnarReaderMaterializeState::Failed; + } + } + reader_work->cv.notify_all(); + }); +} + +std::optional RNColumnarReadTask::tryAcquireReaderWork() +{ + RNColumnarReaderWorkPtr reader_work; + { + auto guard = std::lock_guard(pending_reader_works_mutex); + if (pending_reader_works.empty()) + return std::nullopt; + reader_work = pending_reader_works.front(); + pending_reader_works.pop_front(); + } + prefetchPendingWork(); + return reader_work; +} + +BlockInputStreamPtr RNColumnarReadTask::createInputStream(const RNColumnarReaderWorkPtr & reader_work) +{ + RUNTIME_CHECK(reader_work != nullptr); + return RNColumnarInputStream::create({ + .context = getContext(), + .log = getLog(), + .task = shared_from_this(), + .reader_work = reader_work, + .columns_to_read = getColumnsToRead(), + .extra_table_id_index = getExtraTableIDIndex(), + .table_id = getLogicalTableID(), + .executor_id = getExecutorID(), + }); +} + +BlockInputStreamPtr RNColumnarReadTask::createSharedInputStream() +{ + return RNColumnarInputStream::create({ + .context = getContext(), + .log = getLog(), + .task = shared_from_this(), + .reader_work = nullptr, + .columns_to_read = getColumnsToRead(), + .extra_table_id_index = getExtraTableIDIndex(), + .table_id = getLogicalTableID(), + .executor_id = getExecutorID(), }); - reader_transferred = true; - return std::make_shared(input_stream); } -// RNProxyReadTask 
-std::vector RNProxyReadTask::buildProxyReadTaskWithBackoff( +std::vector RNColumnarReadTask::buildColumnarReadTaskWithBackoff( const LoggerPtr & log, const Context & context, UInt64 start_ts, @@ -597,13 +1187,13 @@ std::vector RNProxyReadTask::buildProxyReadTaskWithBackoff( const std::vector & remote_table_ranges, unsigned num_streams) { - std::vector tasks; + std::vector tasks; pingcap::kv::Backoffer bo(pingcap::kv::copNextMaxBackoff); while (true) { try { - tasks = RNProxyReadTask::buildProxyReadTask( + tasks = RNColumnarReadTask::buildColumnarReadTask( log, context, start_ts, @@ -615,21 +1205,21 @@ std::vector RNProxyReadTask::buildProxyReadTaskWithBackoff( } catch (RegionException & e) { - LOG_WARNING(log, "buildProxyReadTask failed, backoff and retry, {}", e.message()); + LOG_WARNING(log, "buildColumnarReadTask failed, backoff and retry, {}", e.message()); bo.backoff(pingcap::kv::boRegionMiss, pingcap::Exception(e.message(), e.code())); } catch (Exception & e) { if (e.code() != ErrorCodes::COLUMNAR_SNAPSHOT_ERROR) throw; - LOG_WARNING(log, "buildProxyReadTask failed, backoff and retry, {}", e.message()); + LOG_WARNING(log, "buildColumnarReadTask failed, backoff and retry, {}", e.message()); bo.backoff(pingcap::kv::boRegionMiss, pingcap::Exception(e.message(), e.code())); } } return tasks; } -std::vector RNProxyReadTask::buildProxyReadTask( +std::vector RNColumnarReadTask::buildColumnarReadTask( const LoggerPtr & log, const Context & context, UInt64 start_ts, @@ -642,168 +1232,142 @@ std::vector RNProxyReadTask::buildProxyReadTask( auto scan_context = std::make_shared(dag_context->getKeyspaceID(), dag_context->getResourceGroupName()); dag_context->scan_context_map[table_scan.getTableScanExecutorID()] = scan_context; + auto shared_reader_context + = buildColumnarReaderSharedContext(log, context, start_ts, table_scan, filter_conditions); - std::vector tasks; - // Collect all regions in the table scan. 
- std::unordered_map>> - all_remote_regions_by_region; - std::unordered_map region_ver_ids; - - std::vector physical_table_ids; - std::vector ranges_for_each_physical_table; - physical_table_ids.reserve(remote_table_ranges.size()); - ranges_for_each_physical_table.reserve(remote_table_ranges.size()); + std::vector tasks; + ColumnarPhysicalTableRanges physical_table_ranges; + physical_table_ranges.reserve(remote_table_ranges.size()); for (const auto & remote_table_range : remote_table_ranges) + physical_table_ranges.emplace_back(remote_table_range.first, remote_table_range.second); + + auto region_reader_plans = buildRegionReaderPlansFromPhysicalTableRanges(log, context, physical_table_ranges); + const auto region_num = static_cast(region_reader_plans.size()); + const auto physical_table_num = static_cast(physical_table_ranges.size()); + const bool enable_bucket_parallel = !table_scan.keepOrder() && num_streams > region_num; + size_t total_max_reader_num = region_num; + for (auto & plan : region_reader_plans) { - physical_table_ids.emplace_back(remote_table_range.first); - ranges_for_each_physical_table.emplace_back(remote_table_range.second); - } - pingcap::kv::Cluster * cluster = context.getTMTContext().getKVCluster(); - pingcap::kv::Backoffer bo(pingcap::kv::copBuildTaskMaxBackoff); - auto & region_cache = cluster->region_cache; - for (auto idx = 0; idx < static_cast(ranges_for_each_physical_table.size()); idx++) - { - const auto physical_table_id = physical_table_ids[idx]; - const auto ranges = ranges_for_each_physical_table[idx]; - const auto locations = pingcap::coprocessor::details::splitKeyRangesByLocations(region_cache, bo, ranges); - for (const auto & location : locations) + if (enable_bucket_parallel) { - // If the region_ver_ids already exists, compare the value with location.location.region. - // If the value is not equal, drop cache and retry. 
- const auto & region = location.location.region; - if (auto it = region_ver_ids.find(region.id); it != region_ver_ids.end() && it->second != region) + auto bucket_keys = getRegionBucketKeysFromColumnar(context, plan.region_id, plan.region_ver_id.ver); + auto split_result = splitRangesByBucketKeys(plan.physical_table_ranges, bucket_keys); + if (split_result.has_bucket_split && split_result.units.size() > 1) { - region_cache->dropRegion(it->second); - region_cache->dropRegion(region); - region_ver_ids.erase(it); - LOG_WARNING( - log, - "buildProxyReadTask failed region_id={}, epoch not match {}", - region.id, - region.toString()); - throw RegionException( - RegionException::UnavailableRegions{region.id}, - RegionException::RegionReadStatus::EPOCH_NOT_MATCH, - region.toString().c_str()); + total_max_reader_num += split_result.units.size() - 1; + plan.bucket_units = std::move(split_result.units); } - all_remote_regions_by_region[region.id].push_back(std::make_tuple(physical_table_id, location.ranges)); - region_ver_ids[region.id] = region; - LOG_DEBUG( - log, - "buildProxyReadTask, physical_table_id={}, region_ver_id={}", - physical_table_id, - region.toString()); } } - unsigned region_num = all_remote_regions_by_region.size(); - unsigned physical_table_num = physical_table_ids.size(); - unsigned real_num_streams = std::min(num_streams, region_num); - // Regions per RNProxyReader, it should be ceil of region_num / real_num_streams. - // `regions_per_reader` is the ceil of the division, so the concurrency may be less than `real_num_streams`. 
- unsigned regions_per_reader = (region_num + real_num_streams - 1) / real_num_streams; LOG_INFO( log, - "region_num={}, table_num={}, num_streams={}, real_num_streams={}, regions_per_reader={}", + "region_num={}, table_num={}, num_streams={}, keep_order={}, bucket_parallel={}, planned_reader_num={}", region_num, physical_table_num, num_streams, - real_num_streams, - regions_per_reader); - unsigned reader_idx = 0; - std::vector all_readers; - std::mutex output_lock; - auto thread_manager = newThreadManager(); - - for (const auto & [region_id, physical_table_ranges] : all_remote_regions_by_region) - { - auto region_ver = region_ver_ids[region_id].ver; - auto region_conf_ver = region_ver_ids[region_id].conf_ver; - thread_manager->schedule( - true, - "createProxyReader", - [log, - &context, - region_id, - region_ver, - region_conf_ver, - physical_table_ranges, - start_ts, - &table_scan, - &filter_conditions, - &output_lock, - &all_readers] { - LOG_INFO( - log, - "create proxy reader for tables in region, region_id={}, table_num={}", - region_id, - physical_table_ranges.size()); - auto reader_ptr = RNProxyReader::createProxyReader( - log, - context, - region_id, - region_ver, - region_conf_ver, - physical_table_ranges, - start_ts, - table_scan, - filter_conditions, - output_lock); - { - std::lock_guard lock(output_lock); - all_readers.push_back(reader_ptr); - } - }); - } + table_scan.keepOrder(), + enable_bucket_parallel, + total_max_reader_num); - thread_manager->wait(); + std::vector all_reader_plans; + all_reader_plans.reserve(total_max_reader_num); - std::vector readers; - for (auto & reader : all_readers) + for (const auto & plan : region_reader_plans) { - ++reader_idx; - readers.push_back(reader); - if (reader_idx == regions_per_reader) + if (plan.bucket_units.empty()) { - reader_idx = 0; - tasks.push_back(std::make_shared(std::move(readers))); - readers.clear(); + all_reader_plans.push_back(RNColumnarReaderPlan{ + .region_id = plan.region_id, + .region_ver = 
plan.region_ver_id.ver, + .region_conf_ver = plan.region_ver_id.conf_ver, + .physical_table_ranges = plan.physical_table_ranges, + }); + } + else + { + for (const auto & [table_id, range] : plan.bucket_units) + { + all_reader_plans.push_back(RNColumnarReaderPlan{ + .region_id = plan.region_id, + .region_ver = plan.region_ver_id.ver, + .region_conf_ver = plan.region_ver_id.conf_ver, + .physical_table_ranges + = ColumnarPhysicalTableRanges{std::make_tuple(table_id, pingcap::coprocessor::KeyRanges{range})}, + }); + } } } - if (!readers.empty()) - { - tasks.push_back(std::make_shared(std::move(readers))); - } - + if (all_reader_plans.empty()) + return tasks; + tasks.push_back(std::make_shared( + std::move(all_reader_plans), + getRNColumnarSourceNum(num_streams, total_max_reader_num), + shared_reader_context)); return tasks; } -BlockInputStreams RNProxyReadTask::getInputStreams() const +BlockInputStreams RNColumnarReadTask::getInputStreams() { BlockInputStreams streams; - streams.reserve(proxy_readers.size()); - for (const auto & reader : proxy_readers) + streams.reserve(source_num); + for (size_t worker_index = 0; worker_index < source_num; ++worker_index) { - streams.push_back(reader->getInputStream()); + streams.push_back(createSharedInputStream()); } return streams; } -// RNProxyInputStream -RNProxyInputStream::~RNProxyInputStream() +// RNColumnarInputStream +bool RNColumnarInputStream::ensureReader() { - SCOPE_EXIT({ RustGcHelper::instance().gcRustPtr(reader.inner.ptr, reader.inner.type); }); + if (reader.has_value()) + return true; + + if (fixed_reader_work != nullptr) + { + current_reader_work = fixed_reader_work; + reader.emplace(task->getOrCreateReader(fixed_reader_work)); + return true; + } + + auto next_reader_work = task->tryAcquireReaderWork(); + if (!next_reader_work.has_value()) + return false; + + current_reader_work = next_reader_work.value(); + reader.emplace(task->getOrCreateReader(next_reader_work.value())); + return true; +} + +void 
RNColumnarInputStream::releaseReader() +{ + if (reader.has_value() && reader->inner.ptr != nullptr) + RustGcHelper::instance().gcRustPtr(reader->inner.ptr, reader->inner.type); + reader.reset(); + current_reader_work.reset(); +} + +RNColumnarInputStream::~RNColumnarInputStream() +{ + SCOPE_EXIT({ + if (reader.has_value() && reader->inner.ptr != nullptr) + RustGcHelper::instance().gcRustPtr(reader->inner.ptr, reader->inner.type); + }); try { + const auto * dag_context = context.getDAGContext(); + const auto keyspace_id = dag_context != nullptr ? dag_context->getKeyspaceID() : NullspaceID; LOG_INFO( log, - "Finished reading remote snapshot through proxy, rows={} bytes={} read_cost={:.3f}s " + "Finished reading remote snapshot through columnar, keyspace_id={} rows={} bytes={} read_cost={:.3f}s " "deserialize_cost={:.3f}s", + keyspace_id, action.totalRows(), total_bytes, duration_read_sec, duration_deserialize_sec); - if (auto * dag_context = context.getDAGContext(); dag_context != nullptr) + if (dag_context != nullptr) { if (auto it = dag_context->scan_context_map.find(executor_id); it != dag_context->scan_context_map.end()) { @@ -821,117 +1385,156 @@ RNProxyInputStream::~RNProxyInputStream() } } -Block RNProxyInputStream::read(FilterPtr & res_filter, bool return_filter) +Block RNColumnarInputStream::read(FilterPtr & res_filter, bool return_filter) { return readImpl(res_filter, return_filter); } -Block RNProxyInputStream::readImpl() +Block RNColumnarInputStream::readImpl() { FilterPtr filter_ignored; return readImpl(filter_ignored, false); } -Block RNProxyInputStream::readImpl([[maybe_unused]] FilterPtr & res_filter, [[maybe_unused]] bool return_filter) +Block RNColumnarInputStream::readImpl([[maybe_unused]] FilterPtr & res_filter, [[maybe_unused]] bool return_filter) { if (done) return {}; const Context & global_ctx = context.getGlobalContext(); const TiFlashRaftProxyHelper * proxy_helper = global_ctx.getSharedContextDisagg()->getColumnarProxyHelper(); - 
RUNTIME_CHECK_MSG(proxy_helper != nullptr, "columnar proxy helper is not initialized"); - Stopwatch w{CLOCK_MONOTONIC_COARSE}; - UInt64 rows = proxy_helper->cloud_storage_engine_interfaces.fn_read_block(reader, batch_size); - duration_read_sec += w.elapsedSecondsFromLastTime(); - LOG_DEBUG(log, "Read {} rows from proxy", rows); - if (rows == std::numeric_limits::max()) - { - LOG_WARNING(log, "Read block from proxy failed"); - throw Exception("read_block failed in tiflash-proxy", ErrorCodes::LOGICAL_ERROR); - } - if (rows == 0) - return {}; + RUNTIME_CHECK_MSG(proxy_helper != nullptr, "columnar helper is not initialized"); - TableID physical_table_id = -1; - Block header = getHeader(); - const ColumnsWithTypeAndName & col_type_and_name = header.getColumnsWithTypeAndName(); - // Construct block from proxy column data. - MutableColumns columns = header.cloneEmptyColumns(); - for (UInt32 i = 0; i < col_type_and_name.size(); ++i) + while (true) { - LOG_DEBUG( - log, - "Read column id={} name={} type={}", - col_type_and_name[i].column_id, - col_type_and_name[i].name, - col_type_and_name[i].type->getName()); - // Read column data from proxy - Int64 col_id = col_type_and_name[i].column_id; - if (col_id == MutSup::extra_handle_id) - { - RustStrWithView col_data = proxy_helper->cloud_storage_engine_interfaces.fn_read_handle(reader); - SCOPE_EXIT({ RustGcHelper::instance().gcRustPtr(col_data.inner.ptr, col_data.inner.type); }); - physical_table_id = proxy_helper->cloud_storage_engine_interfaces.fn_physical_table_id(reader); - ReadBufferFromMemory buf(col_data.buff.data, static_cast(col_data.buff.len)); - auto & col = *columns[i]; - col_type_and_name[i].type->deserializeBinaryBulkWithMultipleStreams( - col, - [&](const IDataType::SubstreamPath &) { return &buf; }, - rows, - -1.0, // avg_value_size_hint set to -1 to indicate Decimal format from proxy - true, - {}); - } - else if (col_id == MutSup::extra_table_id_col_id) + if (!ensureReader()) + { + done = true; + return {}; + } 
+ + Stopwatch w{CLOCK_MONOTONIC_COARSE}; + UInt64 rows = proxy_helper->cloud_storage_engine_interfaces.fn_read_block(reader.value(), batch_size); + duration_read_sec += w.elapsedSecondsFromLastTime(); + LOG_DEBUG(log, "Read {} rows from columnar", rows); + if (rows == std::numeric_limits::max()) { + LOG_WARNING(log, "Read block from columnar failed"); + throw Exception("read_block failed in columnar", ErrorCodes::LOGICAL_ERROR); + } + if (rows == 0) + { + releaseReader(); + if (fixed_reader_work != nullptr) + { + done = true; + return {}; + } continue; } - else + + TableID physical_table_id = -1; + Block header = getHeader(); + const ColumnsWithTypeAndName & col_type_and_name = header.getColumnsWithTypeAndName(); + // Construct block from columnar column data. + MutableColumns columns = header.cloneEmptyColumns(); + for (UInt32 i = 0; i < col_type_and_name.size(); ++i) { - RustStrWithView col_data = proxy_helper->cloud_storage_engine_interfaces.fn_read_column(reader, col_id); - SCOPE_EXIT({ RustGcHelper::instance().gcRustPtr(col_data.inner.ptr, col_data.inner.type); }); - physical_table_id = proxy_helper->cloud_storage_engine_interfaces.fn_physical_table_id(reader); - ReadBufferFromMemory buf(col_data.buff.data, static_cast(col_data.buff.len)); - auto & col = *columns[i]; - col_type_and_name[i].type->deserializeBinaryBulkWithMultipleStreams( - col, - [&](const IDataType::SubstreamPath &) { return &buf; }, - rows, - -1.0, // avg_value_size_hint set to -1 to indicate Decimal format from proxy - true, - {}); - LOG_DEBUG(log, "Read column data done, col size={}", col.size()); + LOG_DEBUG( + log, + "Read column id={} name={} type={}", + col_type_and_name[i].column_id, + col_type_and_name[i].name, + col_type_and_name[i].type->getName()); + // Read column data from columnar + Int64 col_id = col_type_and_name[i].column_id; + if (col_id == MutSup::extra_handle_id) + { + RustStrWithView col_data = proxy_helper->cloud_storage_engine_interfaces.fn_read_handle(reader.value()); 
+ SCOPE_EXIT({ RustGcHelper::instance().gcRustPtr(col_data.inner.ptr, col_data.inner.type); }); + physical_table_id = proxy_helper->cloud_storage_engine_interfaces.fn_physical_table_id(reader.value()); + ReadBufferFromMemory buf(col_data.buff.data, static_cast(col_data.buff.len)); + auto & col = *columns[i]; + col_type_and_name[i].type->deserializeBinaryBulkWithMultipleStreams( + col, + [&](const IDataType::SubstreamPath &) { return &buf; }, + rows, + -1.0, // avg_value_size_hint set to -1 to indicate Decimal format from columnar + true, + {}); + } + else if (col_id == MutSup::extra_table_id_col_id) + { + continue; + } + else + { + RustStrWithView col_data + = proxy_helper->cloud_storage_engine_interfaces.fn_read_column(reader.value(), col_id); + SCOPE_EXIT({ RustGcHelper::instance().gcRustPtr(col_data.inner.ptr, col_data.inner.type); }); + physical_table_id = proxy_helper->cloud_storage_engine_interfaces.fn_physical_table_id(reader.value()); + ReadBufferFromMemory buf(col_data.buff.data, static_cast(col_data.buff.len)); + auto & col = *columns[i]; + col_type_and_name[i].type->deserializeBinaryBulkWithMultipleStreams( + col, + [&](const IDataType::SubstreamPath &) { return &buf; }, + rows, + -1.0, // avg_value_size_hint set to -1 to indicate Decimal format from columnar + true, + {}); + LOG_DEBUG(log, "Read column data done, col size={}", col.size()); + } } - } - duration_deserialize_sec += w.elapsedSecondsFromLastTime(); + duration_deserialize_sec += w.elapsedSecondsFromLastTime(); - Block block = header.cloneWithColumns(std::move(columns)); - LOG_DEBUG(log, "Read block rows={}, structure={}", block.rows(), block.dumpStructure()); - if (physical_table_id == -1) - { - LOG_WARNING(log, "physical_table_id is not set, use table_id {} instead", table_id); - physical_table_id = table_id; - } - // Fill extra table id column. 
- action.fill(block, physical_table_id); - block.checkNumberOfRows(); + Block block = header.cloneWithColumns(std::move(columns)); + LOG_DEBUG(log, "Read block rows={}, structure={}", block.rows(), block.dumpStructure()); + if (physical_table_id == -1) + { + LOG_WARNING(log, "physical_table_id is not set, use table_id {} instead", table_id); + physical_table_id = table_id; + } + // Fill extra table id column. + action.fill(block, physical_table_id); + block.checkNumberOfRows(); - total_bytes += block.bytes(); - return block; + total_bytes += block.bytes(); + return block; + } } -// RNProxySourceOp -void RNProxySourceOp::operateSuffixImpl() +// RNColumnarSourceOp +void RNColumnarSourceOp::operateSuffixImpl() { UNUSED(context); - LOG_INFO(log, "Finished reading proxy snapshots, rows={} cost={:.3f}s", total_rows, duration_read_sec); + const auto keyspace_id = exec_context.getKeyspaceID(); + const double total_cost_sec = total_cost_watch.elapsedSeconds(); + const UInt64 rows_per_sec + = total_cost_sec > 0 ? static_cast(static_cast(total_rows) / total_cost_sec) : 0; + const UInt64 bytes_per_sec + = total_cost_sec > 0 ? 
static_cast(static_cast(total_bytes) / total_cost_sec) : 0; + LOG_INFO( + log, + "Finished reading columnar snapshots, keyspace_id={} task_pool_worker_total_cost={:.3f}s claimed_streams={} " + "rows={} " + "rows_per_sec={} " + "bytes={} bytes_per_sec={} read_cost={:.3f}s", + keyspace_id, + total_cost_sec, + total_streams, + total_rows, + rows_per_sec, + total_bytes, + bytes_per_sec, + duration_read_sec); } -void RNProxySourceOp::operatePrefixImpl() +void RNColumnarSourceOp::operatePrefixImpl() { - LOG_INFO(log, "Begin reading proxy snapshots"); + total_cost_watch.restart(); + LOG_INFO(log, "Begin reading columnar snapshots, keyspace_id={}", exec_context.getKeyspaceID()); } -OperatorStatus RNProxySourceOp::readImpl(Block & block) +OperatorStatus RNColumnarSourceOp::readImpl(Block & block) { if (unlikely(done)) { @@ -946,57 +1549,52 @@ OperatorStatus RNProxySourceOp::readImpl(Block & block) return OperatorStatus::HAS_OUTPUT; } - return current_reader_idx < 0 ? OperatorStatus::IO_IN : awaitImpl(); + return awaitImpl(); } -OperatorStatus RNProxySourceOp::awaitImpl() +OperatorStatus RNColumnarSourceOp::awaitImpl() { if (unlikely(done || t_block.has_value())) { return OperatorStatus::HAS_OUTPUT; } - if (unlikely(current_reader_idx < 0)) - { - current_reader_idx = 0; - } - return OperatorStatus::IO_IN; } -OperatorStatus RNProxySourceOp::executeIOImpl() +OperatorStatus RNColumnarSourceOp::executeIOImpl() { if (unlikely(done || t_block.has_value())) { return OperatorStatus::HAS_OUTPUT; } - if (unlikely(current_reader_idx < 0)) + if (!current_input_stream) { - return awaitImpl(); + auto next_reader_work = task->tryAcquireReaderWork(); + if (!next_reader_work.has_value()) + { + done = true; + return OperatorStatus::HAS_OUTPUT; + } + current_input_stream = task->createInputStream(next_reader_work.value()); + ++total_streams; } FilterPtr filter_ignored = nullptr; Stopwatch w{CLOCK_MONOTONIC_COARSE}; - Block block = 
task->getProxyReaders()[current_reader_idx]->getInputStream()->read(filter_ignored, false); + Block block = current_input_stream->read(filter_ignored, false); duration_read_sec += w.elapsedSeconds(); if likely (block && block.rows() > 0) { total_rows += block.rows(); + total_bytes += block.bytes(); t_block.emplace(std::move(block)); return OperatorStatus::HAS_OUTPUT; } else { - if (current_reader_idx == static_cast(task->getProxyReaders().size() - 1)) - { - done = true; - } - else if (current_reader_idx < static_cast(task->getProxyReaders().size() - 1)) - { - ++current_reader_idx; - } - // Current stream is drained, try to read from next stream. + current_input_stream.reset(); return awaitImpl(); } } diff --git a/dbms/src/Storages/StorageDisaggregatedColumnar.h b/dbms/src/Storages/StorageDisaggregatedColumnar.h index fb1e0094b16..113193510b7 100644 --- a/dbms/src/Storages/StorageDisaggregatedColumnar.h +++ b/dbms/src/Storages/StorageDisaggregatedColumnar.h @@ -18,6 +18,7 @@ #if ENABLE_NEXT_GEN_COLUMNAR #include #include +#include #include #include #include @@ -35,6 +36,12 @@ #include #include +#include +#include +#include +#include +#include +#include #include #pragma GCC diagnostic pop @@ -48,51 +55,53 @@ class RSOperator; using RSOperatorPtr = std::shared_ptr; } // namespace DM -class RNProxyReader; -using RNProxyReaderPtr = std::shared_ptr; -class RNProxyReader : boost::noncopyable +enum class RNColumnarReaderMaterializeState { -public: - static RNProxyReaderPtr createProxyReader( - const LoggerPtr & log, - const Context & context, - RegionID region_id, - RegionVersion region_ver, - UInt64 region_conf_ver, - const std::vector> & physical_table_ranges, - UInt64 start_ts, - const TiDBTableScan & table_scan, - const FilterConditions & filter_conditions, - std::mutex & output_lock); + NotStarted, + Creating, + Ready, + Failed, + Consumed, +}; - BlockInputStreamPtr getInputStream() const - { - RUNTIME_CHECK(input_stream != nullptr); - return input_stream; - } +struct 
RNColumnarReaderSharedContext; + +struct RNColumnarReaderPlan +{ + RegionID region_id; + RegionVersion region_ver; + UInt64 region_conf_ver; + std::vector> physical_table_ranges; +}; - RNProxyReader(BlockInputStreamPtr input_stream) - : input_stream(input_stream) +struct RNColumnarReaderWork +{ + explicit RNColumnarReaderWork(RNColumnarReaderPlan plan_) + : plan(std::move(plan_)) {} -private: - BlockInputStreamPtr input_stream; + ~RNColumnarReaderWork(); + + RNColumnarReaderPlan plan; + std::mutex mutex; + std::condition_variable cv; + RNColumnarReaderMaterializeState state = RNColumnarReaderMaterializeState::NotStarted; + std::optional reader; + std::exception_ptr exception; }; -class RNProxyReadTask; -using RNProxyReadTaskPtr = std::shared_ptr; -class RNProxyReadTask : boost::noncopyable +using RNColumnarReaderWorkPtr = std::shared_ptr; + +class RNColumnarReadTask; +using RNColumnarReadTaskPtr = std::shared_ptr; +class RNColumnarReadTask + : public boost::noncopyable + , public std::enable_shared_from_this { public: using RemoteTableRange = std::pair; - const std::vector proxy_readers; - static RNProxyReadTaskPtr create(const std::vector & proxy_readers) - { - return std::shared_ptr(new RNProxyReadTask(proxy_readers)); - } - - static std::vector buildProxyReadTaskWithBackoff( + static std::vector buildColumnarReadTaskWithBackoff( const LoggerPtr & log, const Context & context, UInt64 start_ts, @@ -101,7 +110,7 @@ class RNProxyReadTask : boost::noncopyable const std::vector & remote_table_ranges, unsigned num_streams); - static std::vector buildProxyReadTask( + static std::vector buildColumnarReadTask( const LoggerPtr & log, const Context & context, UInt64 start_ts, @@ -110,21 +119,67 @@ class RNProxyReadTask : boost::noncopyable const std::vector & remote_table_ranges, unsigned num_streams); - BlockInputStreams getInputStreams() const; + BlockInputStreams getInputStreams(); - std::vector getProxyReaders() { return proxy_readers; } + BlockInputStreamPtr 
createSharedInputStream(); - RNProxyReadTask(const std::vector & proxy_readers) - : proxy_readers(proxy_readers) - {} + BlockInputStreamPtr createInputStream(const RNColumnarReaderWorkPtr & reader_work); + + ColumnarReaderPtr createColumnarReaderWithBackoff(const RNColumnarReaderWorkPtr & reader_work); + + ColumnarReaderPtr getOrCreateReader(const RNColumnarReaderWorkPtr & reader_work); + + std::optional tryAcquireReaderWork(); + +#ifdef DBMS_PUBLIC_GTEST + void replaceReaderWorkForTest( + const RNColumnarReaderWorkPtr & reader_work, + std::vector replanned_reader_plans); +#endif + + size_t getReaderCount() const; + + size_t getSourceNum() const; + + const Context & getContext() const; + + const LoggerPtr & getLog() const; + + const DM::ColumnDefines & getColumnsToRead() const; + + int getExtraTableIDIndex() const; + + TableID getLogicalTableID() const; + + const String & getExecutorID() const; + + RNColumnarReadTask( + std::vector reader_plans, + size_t source_num, + std::shared_ptr shared_reader_context); + +private: + void prefetchPendingWork(); + + void prefetchReaderWork(const RNColumnarReaderWorkPtr & reader_work); + + void replaceReaderWork( + const RNColumnarReaderWorkPtr & reader_work, + std::vector replanned_reader_plans); + + size_t reader_count; + size_t source_num; + std::shared_ptr shared_reader_context; + mutable std::mutex pending_reader_works_mutex; + std::deque pending_reader_works; }; -class RNProxyInputStream : public IProfilingBlockInputStream +class RNColumnarInputStream : public IProfilingBlockInputStream { static constexpr auto NAME = "RNProxy"; public: - ~RNProxyInputStream(); + ~RNColumnarInputStream(); String getName() const { return NAME; } Block getHeader() const { return header; } @@ -139,18 +194,20 @@ class RNProxyInputStream : public IProfilingBlockInputStream struct Options { const Context & context; - std::string_view debug_tag; + LoggerPtr log; + RNColumnarReadTaskPtr task; + RNColumnarReaderWorkPtr reader_work; const 
DM::ColumnDefines & columns_to_read; - ColumnarReaderPtr reader; int extra_table_id_index; TableID table_id; const String & executor_id; }; - explicit RNProxyInputStream(const Options & options) + explicit RNColumnarInputStream(const Options & options) : context(options.context) - , log(Logger::get(options.debug_tag)) - , reader(options.reader) + , log(options.log) + , task(options.task) + , fixed_reader_work(options.reader_work) , action(options.columns_to_read, options.extra_table_id_index) , table_id(options.table_id) , executor_id(options.executor_id) @@ -159,12 +216,21 @@ class RNProxyInputStream : public IProfilingBlockInputStream setHeader(action.getHeader()); } - static BlockInputStreamPtr create(const Options & options) { return std::make_shared(options); } + static BlockInputStreamPtr create(const Options & options) + { + return std::make_shared(options); + } private: + bool ensureReader(); + void releaseReader(); + const Context & context; const LoggerPtr log; - ColumnarReaderPtr reader; + RNColumnarReadTaskPtr task; + const RNColumnarReaderWorkPtr fixed_reader_work; + RNColumnarReaderWorkPtr current_reader_work; + std::optional reader; AddExtraTableIDColumnTransformAction action; TableID table_id; const String executor_id; @@ -178,33 +244,29 @@ class RNProxyInputStream : public IProfilingBlockInputStream UInt64 total_bytes = 0; }; -class RNProxySourceOp : public SourceOp +class RNColumnarSourceOp : public SourceOp { static constexpr auto NAME = "RNProxy"; public: struct Options { - const Context & context; - std::string_view debug_tag; PipelineExecutorContext & exec_context; - const DM::ColumnDefines & columns_to_read; - RNProxyReadTaskPtr task; - int extra_table_id_index; + RNColumnarReadTaskPtr task; }; - explicit RNProxySourceOp(const Options & options) - : SourceOp(options.exec_context, String(options.debug_tag)) - , context(options.context) - , log(Logger::get(options.debug_tag)) + explicit RNColumnarSourceOp(const Options & options) + : 
SourceOp(options.exec_context, options.task->getLog()->identifier()) + , context(options.task->getContext()) + , log(options.task->getLog()) , task(options.task) - , action(options.columns_to_read, options.extra_table_id_index) { - // Keep header aligned with genNamesAndTypesForTableScan when TiDB requests _tidb_tid on partition scans. - setHeader(action.getHeader()); + setHeader(AddExtraTableIDColumnTransformAction::buildHeader( + options.task->getColumnsToRead(), + options.task->getExtraTableIDIndex())); } - static SourceOpPtr create(const Options & options) { return std::make_unique(options); } + static SourceOpPtr create(const Options & options) { return std::make_unique(options); } String getName() const override { return NAME; } @@ -224,11 +286,12 @@ class RNProxySourceOp : public SourceOp private: const Context & context; const LoggerPtr log; - RNProxyReadTaskPtr task; - AddExtraTableIDColumnTransformAction action; + RNColumnarReadTaskPtr task; + UInt64 total_bytes = 0; size_t total_rows = 0; + size_t total_streams = 0; - Int32 current_reader_idx = -1; + BlockInputStreamPtr current_input_stream; // Temporarily store the block read from current_seg_task->stream and pass it to downstream operators in readImpl. std::optional t_block = std::nullopt; @@ -236,7 +299,7 @@ class RNProxySourceOp : public SourceOp bool done = false; // Count the time spent waiting for segment tasks to be ready. //double duration_wait_ready_task_sec = 0; - Stopwatch wait_stop_watch{CLOCK_MONOTONIC_COARSE}; + Stopwatch total_cost_watch{CLOCK_MONOTONIC_COARSE}; // Count the time consumed by reading blocks in the stream of segment tasks. double duration_read_sec = 0;