From e6bc792fec9e7c08ee2ee05c15fb53eaff53b0ba Mon Sep 17 00:00:00 2001 From: charlesdong1991 Date: Sat, 16 May 2026 18:06:35 +0200 Subject: [PATCH 1/2] add scanner fetch and remote download metrics --- crates/fluss/src/client/table/remote_log.rs | 13 ++ crates/fluss/src/client/table/scanner.rs | 149 ++++++++++++++++++-- crates/fluss/src/metrics.rs | 93 ++++++++++++ 3 files changed, 247 insertions(+), 8 deletions(-) diff --git a/crates/fluss/src/client/table/remote_log.rs b/crates/fluss/src/client/table/remote_log.rs index 4d96ce96..8957aced 100644 --- a/crates/fluss/src/client/table/remote_log.rs +++ b/crates/fluss/src/client/table/remote_log.rs @@ -18,6 +18,10 @@ use crate::client::credentials::CredentialsReceiver; use crate::error::{Error, Result}; use crate::io::{FileIO, Storage}; use crate::metadata::TableBucket; +use crate::metrics::{ + SCANNER_REMOTE_FETCH_BYTES_TOTAL, SCANNER_REMOTE_FETCH_ERRORS_TOTAL, + SCANNER_REMOTE_FETCH_REQUESTS_TOTAL, +}; use crate::proto::{PbRemoteLogFetchInfo, PbRemoteLogSegment}; use futures::TryStreamExt; use parking_lot::Mutex; @@ -494,12 +498,19 @@ async fn spawn_download_task( return DownloadResult::Cancelled; } + // Java reference: RemoteLogDownloader.java increments `remoteFetchRequestCount` + // immediately before initiating the download. Each retry of the same segment + // counts as a separate request (matches Java behavior). + metrics::counter!(SCANNER_REMOTE_FETCH_REQUESTS_TOTAL).increment(1); + // Try download ONCE let download_result = fetcher.fetch(&request).await; match download_result { Ok(fetch_result) => { // Success - permit will be released on drop (FileSource handles file deletion) + metrics::counter!(SCANNER_REMOTE_FETCH_BYTES_TOTAL) + .increment(fetch_result.file_size as u64); DownloadResult::Success { result: RemoteLogFile { file_path: fetch_result.file_path, @@ -516,6 +527,8 @@ async fn spawn_download_task( } Err(e) => { // Download failed - check if we should retry or give up + // Counted per attempt, so retries each contribute one error. + metrics::counter!(SCANNER_REMOTE_FETCH_ERRORS_TOTAL).increment(1); let retry_count = request.retry_count + 1; if retry_count > MAX_RETRY_COUNT { diff --git a/crates/fluss/src/client/table/scanner.rs b/crates/fluss/src/client/table/scanner.rs index a4164b99..7013ba50 100644 --- a/crates/fluss/src/client/table/scanner.rs +++ b/crates/fluss/src/client/table/scanner.rs @@ -27,6 +27,9 @@ use crate::config::Config; use crate::error::Error::UnsupportedOperation; use crate::error::{ApiError, Error, FlussError, Result}; use crate::metadata::{LogFormat, PhysicalTablePath, RowType, TableBucket, TableInfo, TablePath}; +use crate::metrics::{ + SCANNER_BYTES_PER_REQUEST, SCANNER_FETCH_LATENCY_MS, SCANNER_FETCH_REQUESTS_TOTAL, +}; use crate::proto::{ ErrorResponse, FetchLogRequest, FetchLogResponse, PbFetchLogReqForBucket, PbFetchLogReqForTable, }; @@ -39,6 +42,7 @@ use crate::{PartitionId, TableId}; use arrow_schema::SchemaRef; use log::{debug, warn}; use parking_lot::{Mutex, RwLock}; +use prost::Message; use std::{ collections::{HashMap, HashSet}, slice::from_ref, @@ -779,6 +783,9 @@ struct FetchResponseContext { read_context: ReadContext, remote_read_context: ReadContext, remote_log_downloader: Arc, + /// `Instant` captured immediately before the FetchLog RPC; used to compute + /// `scanner.fetch_latency_ms` on a successful response. + request_start_time: Instant, } impl LogFetcher { @@ -1021,14 +1028,6 @@ impl LogFetcher { let remote_log_downloader = Arc::clone(&self.remote_log_downloader); let nodes_with_pending = self.nodes_with_pending_fetch_requests.clone(); let metadata = self.metadata.clone(); - let response_context = FetchResponseContext { - metadata: metadata.clone(), - log_fetch_buffer, - log_scanner_status, - read_context, - remote_read_context, - remote_log_downloader, - }; // Spawn async task to handle the fetch request // Note: These tasks are not explicitly tracked or cancelled when LogFetcher is dropped. // This is acceptable because: @@ -1060,6 +1059,11 @@ impl LogFetcher { } }; + // Java increment the fetch counter and capture `requestStartTime` immediately + // before the RPC. Failed connection acquisition above is not counted. + let request_start_time = Instant::now(); + metrics::counter!(SCANNER_FETCH_REQUESTS_TOTAL).increment(1); + let fetch_response = match con .request(message::FetchLogRequest::new(fetch_request.clone())) .await @@ -1074,6 +1078,18 @@ impl LogFetcher { } }; + // Build the context after the RPC so `request_start_time` measures only RPC wall-clock + // — not tablet-server lookup or connection acquisition, which is matching Java's bebaviour + // Building it here also skips the allocation on the early-return error paths above. + let response_context = FetchResponseContext { + metadata: metadata.clone(), + log_fetch_buffer, + log_scanner_status, + read_context, + remote_read_context, + remote_log_downloader, + request_start_time, + }; Self::handle_fetch_response(fetch_response, response_context).await; }); } @@ -1102,8 +1118,17 @@ impl LogFetcher { read_context, remote_read_context, remote_log_downloader, + request_start_time, } = context; + // `encoded_len()` mirrors Java's `fetchLogResponse.totalSize()`: + // both report the serialized API message body size, excluding protocol + // headers and framing. Recorded unconditionally (including zero-record + // responses) to match Java's histogram semantics. + metrics::histogram!(SCANNER_FETCH_LATENCY_MS) + .record(request_start_time.elapsed().as_secs_f64() * 1000.0); + metrics::histogram!(SCANNER_BYTES_PER_REQUEST).record(fetch_response.encoded_len() as f64); + for pb_fetch_log_resp in fetch_response.tables_resp { let table_id = pb_fetch_log_resp.table_id; let fetch_log_for_buckets = pb_fetch_log_resp.buckets_resp; @@ -2029,6 +2054,7 @@ mod tests { read_context: fetcher.read_context.clone(), remote_read_context: fetcher.remote_read_context.clone(), remote_log_downloader: fetcher.remote_log_downloader.clone(), + request_start_time: Instant::now(), }; LogFetcher::handle_fetch_response(response, response_context).await; @@ -2082,6 +2108,7 @@ mod tests { read_context: fetcher.read_context.clone(), remote_read_context: fetcher.remote_read_context.clone(), remote_log_downloader: fetcher.remote_log_downloader.clone(), + request_start_time: Instant::now(), }; LogFetcher::handle_fetch_response(response, response_context).await; @@ -2204,4 +2231,110 @@ mod tests { } Ok(()) } + + /// Drives `handle_fetch_response` against a local metrics recorder and + /// asserts that latency + bytes-per-request histograms are emitted with + /// values that mirror what Java would record. This complements the unit + /// tests in `metrics.rs` (which only verify the facade) by exercising + /// the actual instrumented call path. + /// + /// Note: uses a `current_thread` runtime inside `with_local_recorder` + /// (rather than `#[tokio::test]`) because the metrics facade installs a + /// thread-local recorder; running the async work on the same thread is + /// the only way to observe the emitted metrics in the snapshot. Both + /// the fetcher construction and the `handle_fetch_response` call run + /// inside the runtime (the security-token manager and remote-log + /// downloader require a Tokio reactor). + #[test] + fn handle_fetch_response_emits_latency_and_bytes_metrics() { + use crate::metrics::{SCANNER_BYTES_PER_REQUEST, SCANNER_FETCH_LATENCY_MS}; + use metrics_util::debugging::{DebugValue, DebuggingRecorder}; + + let recorder = DebuggingRecorder::new(); + let snapshotter = recorder.snapshotter(); + + let expected_bytes = metrics::with_local_recorder(&recorder, || { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .expect("build current_thread runtime"); + + rt.block_on(async { + let table_path = TablePath::new("db".to_string(), "tbl".to_string()); + let table_info = build_table_info(table_path.clone(), 1, 1); + let cluster = build_cluster_arc(&table_path, 1, 1); + let metadata = Arc::new(Metadata::new_for_test(cluster)); + let status = Arc::new(LogScannerStatus::new()); + status.assign_scan_bucket(TableBucket::new(1, 0), 5); + let fetcher = LogFetcher::new( + table_info, + Arc::new(RpcClient::new()), + metadata.clone(), + status, + &Config::default(), + None, + ) + .expect("build LogFetcher"); + + let response = FetchLogResponse { + tables_resp: vec![PbFetchLogRespForTable { + table_id: 1, + buckets_resp: vec![PbFetchLogRespForBucket { + partition_id: None, + bucket_id: 0, + error_code: Some(FlussError::None.code()), + error_message: None, + high_watermark: Some(7), + log_start_offset: Some(0), + remote_log_fetch_info: None, + records: None, + }], + }], + }; + let expected_bytes = response.encoded_len() as f64; + let response_context = FetchResponseContext { + metadata: metadata.clone(), + log_fetch_buffer: fetcher.log_fetch_buffer.clone(), + log_scanner_status: fetcher.log_scanner_status.clone(), + read_context: fetcher.read_context.clone(), + remote_read_context: fetcher.remote_read_context.clone(), + remote_log_downloader: fetcher.remote_log_downloader.clone(), + request_start_time: Instant::now(), + }; + + LogFetcher::handle_fetch_response(response, response_context).await; + expected_bytes + }) + }); + + let entries: Vec<_> = snapshotter.snapshot().into_vec(); + let find_histogram = |name: &str| -> Vec { + entries + .iter() + .find_map(|(key, _, _, val)| { + if key.key().name() == name { + if let DebugValue::Histogram(v) = val { + return Some(v.iter().map(|f| f.into_inner()).collect()); + } + } + None + }) + .unwrap_or_default() + }; + + let latency_samples = find_histogram(SCANNER_FETCH_LATENCY_MS); + assert_eq!(latency_samples.len(), 1, "expected one latency sample"); + assert!( + latency_samples[0] >= 0.0, + "latency must be non-negative, got {}", + latency_samples[0] + ); + + let bytes_samples = find_histogram(SCANNER_BYTES_PER_REQUEST); + assert_eq!( + bytes_samples, + vec![expected_bytes], + "bytes histogram must record encoded_len() for parity with Java fetchLogResponse.totalSize()", + ); + } } diff --git a/crates/fluss/src/metrics.rs b/crates/fluss/src/metrics.rs index 756e2db5..a533abd5 100644 --- a/crates/fluss/src/metrics.rs +++ b/crates/fluss/src/metrics.rs @@ -49,6 +49,43 @@ pub const CLIENT_BYTES_RECEIVED_TOTAL: &str = "fluss.client.bytes_received.total pub const CLIENT_REQUEST_LATENCY_MS: &str = "fluss.client.request_latency_ms"; pub const CLIENT_REQUESTS_IN_FLIGHT: &str = "fluss.client.requests_in_flight"; +// --------------------------------------------------------------------------- +// Scanner fetch + remote download metrics +// +// Fetch metrics are recorded in the LogFetcher fetch loop on response +// completion. Remote metrics are recorded inside RemoteLogDownloader's +// download task. +// +// Java uses a volatile-long gauge for fetch latency and Counter+MeterView +// for rates. Rust uses a histogram for latency (richer percentile data) +// and counters for throughput; the recorder/exporter handles rate +// computation (e.g. Prometheus `rate()`). +// +// Java emits one `ScannerMetricGroup` per (database, table). Rust currently +// emits without per-table labels — adding `database`/`table` labels is +// tracked separately and intentionally deferred to keep this PR minimal. +// --------------------------------------------------------------------------- + +/// Histogram: elapsed ms for each successful FetchLog RPC. +pub const SCANNER_FETCH_LATENCY_MS: &str = "fluss.client.scanner.fetch_latency_ms"; + +/// Counter: total FetchLog RPC requests attempted after connection acquisition. +pub const SCANNER_FETCH_REQUESTS_TOTAL: &str = "fluss.client.scanner.fetch_requests.total"; + +/// Histogram: serialized bytes per successful FetchLog response. +pub const SCANNER_BYTES_PER_REQUEST: &str = "fluss.client.scanner.bytes_per_request"; + +/// Counter: total remote log download attempts (includes per-segment retries). +pub const SCANNER_REMOTE_FETCH_REQUESTS_TOTAL: &str = + "fluss.client.scanner.remote_fetch_requests.total"; + +/// Counter: total bytes downloaded from remote log storage. +pub const SCANNER_REMOTE_FETCH_BYTES_TOTAL: &str = "fluss.client.scanner.remote_fetch_bytes.total"; + +/// Counter: total remote log download failures (each retry attempt counts). +pub const SCANNER_REMOTE_FETCH_ERRORS_TOTAL: &str = + "fluss.client.scanner.remote_fetch_errors.total"; + /// Returns a label value for reportable API keys, matching Java's /// `ConnectionMetrics.REPORT_API_KEYS` filter (`ProduceLog`, `FetchLog`, /// `PutKv`, `Lookup`). Returns `None` for admin/metadata/auth calls to @@ -267,4 +304,60 @@ mod tests { assert_eq!(counter_by_api_key.get("produce_log"), Some(&5)); assert_eq!(counter_by_api_key.get("fetch_log"), Some(&3)); } + + #[test] + fn scanner_fetch_metrics_emit_correctly() { + let recorder = DebuggingRecorder::new(); + let snapshotter = recorder.snapshotter(); + + metrics::with_local_recorder(&recorder, || { + metrics::counter!(SCANNER_FETCH_REQUESTS_TOTAL).increment(1); + metrics::histogram!(SCANNER_FETCH_LATENCY_MS).record(15.5); + metrics::histogram!(SCANNER_BYTES_PER_REQUEST).record(4096.0); + }); + + let snapshot = snapshotter.snapshot(); + let entries: Vec<_> = snapshot.into_vec(); + + assert_eq!( + find_counter!(entries, SCANNER_FETCH_REQUESTS_TOTAL), + Some(1) + ); + assert_eq!( + find_histogram!(entries, SCANNER_FETCH_LATENCY_MS), + Some(vec![15.5]) + ); + assert_eq!( + find_histogram!(entries, SCANNER_BYTES_PER_REQUEST), + Some(vec![4096.0]) + ); + } + + #[test] + fn scanner_remote_fetch_metrics_emit_correctly() { + let recorder = DebuggingRecorder::new(); + let snapshotter = recorder.snapshotter(); + + metrics::with_local_recorder(&recorder, || { + metrics::counter!(SCANNER_REMOTE_FETCH_REQUESTS_TOTAL).increment(3); + metrics::counter!(SCANNER_REMOTE_FETCH_BYTES_TOTAL).increment(1024); + metrics::counter!(SCANNER_REMOTE_FETCH_ERRORS_TOTAL).increment(1); + }); + + let snapshot = snapshotter.snapshot(); + let entries: Vec<_> = snapshot.into_vec(); + + assert_eq!( + find_counter!(entries, SCANNER_REMOTE_FETCH_REQUESTS_TOTAL), + Some(3) + ); + assert_eq!( + find_counter!(entries, SCANNER_REMOTE_FETCH_BYTES_TOTAL), + Some(1024) + ); + assert_eq!( + find_counter!(entries, SCANNER_REMOTE_FETCH_ERRORS_TOTAL), + Some(1) + ); + } } From 4f66d59f2a3f3992b6c3b9fba3929da3fdb0e46b Mon Sep 17 00:00:00 2001 From: charlesdong1991 Date: Sat, 16 May 2026 18:07:07 +0200 Subject: [PATCH 2/2] add scanner fetch and remote download metrics --- crates/fluss/src/client/table/scanner.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/fluss/src/client/table/scanner.rs b/crates/fluss/src/client/table/scanner.rs index 7013ba50..0167af02 100644 --- a/crates/fluss/src/client/table/scanner.rs +++ b/crates/fluss/src/client/table/scanner.rs @@ -1078,7 +1078,7 @@ impl LogFetcher { } }; - // Build the context after the RPC so `request_start_time` measures only RPC wall-clock + // Build the context after the RPC so `request_start_time` measures only RPC wall-clock // — not tablet-server lookup or connection acquisition, which is matching Java's bebaviour // Building it here also skips the allocation on the early-return error paths above. let response_context = FetchResponseContext {