Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions crates/fluss/src/client/table/remote_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ use crate::client::credentials::CredentialsReceiver;
use crate::error::{Error, Result};
use crate::io::{FileIO, Storage};
use crate::metadata::TableBucket;
use crate::metrics::{
SCANNER_REMOTE_FETCH_BYTES_TOTAL, SCANNER_REMOTE_FETCH_ERRORS_TOTAL,
SCANNER_REMOTE_FETCH_REQUESTS_TOTAL,
};
use crate::proto::{PbRemoteLogFetchInfo, PbRemoteLogSegment};
use futures::TryStreamExt;
use parking_lot::Mutex;
Expand Down Expand Up @@ -494,12 +498,19 @@ async fn spawn_download_task(
return DownloadResult::Cancelled;
}

// Java reference: RemoteLogDownloader.java increments `remoteFetchRequestCount`
// immediately before initiating the download. Each retry of the same segment
// counts as a separate request (matches Java behavior).
metrics::counter!(SCANNER_REMOTE_FETCH_REQUESTS_TOTAL).increment(1);

// Try download ONCE
let download_result = fetcher.fetch(&request).await;

match download_result {
Ok(fetch_result) => {
// Success - permit will be released on drop (FileSource handles file deletion)
metrics::counter!(SCANNER_REMOTE_FETCH_BYTES_TOTAL)
.increment(fetch_result.file_size as u64);
DownloadResult::Success {
result: RemoteLogFile {
file_path: fetch_result.file_path,
Expand All @@ -516,6 +527,8 @@ async fn spawn_download_task(
}
Err(e) => {
// Download failed - check if we should retry or give up
// Counted per attempt, so retries each contribute one error.
metrics::counter!(SCANNER_REMOTE_FETCH_ERRORS_TOTAL).increment(1);
let retry_count = request.retry_count + 1;

if retry_count > MAX_RETRY_COUNT {
Expand Down
149 changes: 141 additions & 8 deletions crates/fluss/src/client/table/scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ use crate::config::Config;
use crate::error::Error::UnsupportedOperation;
use crate::error::{ApiError, Error, FlussError, Result};
use crate::metadata::{LogFormat, PhysicalTablePath, RowType, TableBucket, TableInfo, TablePath};
use crate::metrics::{
SCANNER_BYTES_PER_REQUEST, SCANNER_FETCH_LATENCY_MS, SCANNER_FETCH_REQUESTS_TOTAL,
};
use crate::proto::{
ErrorResponse, FetchLogRequest, FetchLogResponse, PbFetchLogReqForBucket, PbFetchLogReqForTable,
};
Expand All @@ -39,6 +42,7 @@ use crate::{PartitionId, TableId};
use arrow_schema::SchemaRef;
use log::{debug, warn};
use parking_lot::{Mutex, RwLock};
use prost::Message;
use std::{
collections::{HashMap, HashSet},
slice::from_ref,
Expand Down Expand Up @@ -779,6 +783,9 @@ struct FetchResponseContext {
read_context: ReadContext,
remote_read_context: ReadContext,
remote_log_downloader: Arc<RemoteLogDownloader>,
/// `Instant` captured immediately before the FetchLog RPC; used to compute
/// `scanner.fetch_latency_ms` on a successful response.
request_start_time: Instant,
}

impl LogFetcher {
Expand Down Expand Up @@ -1021,14 +1028,6 @@ impl LogFetcher {
let remote_log_downloader = Arc::clone(&self.remote_log_downloader);
let nodes_with_pending = self.nodes_with_pending_fetch_requests.clone();
let metadata = self.metadata.clone();
let response_context = FetchResponseContext {
metadata: metadata.clone(),
log_fetch_buffer,
log_scanner_status,
read_context,
remote_read_context,
remote_log_downloader,
};
// Spawn async task to handle the fetch request
// Note: These tasks are not explicitly tracked or cancelled when LogFetcher is dropped.
// This is acceptable because:
Expand Down Expand Up @@ -1060,6 +1059,11 @@ impl LogFetcher {
}
};

// Java increment the fetch counter and capture `requestStartTime` immediately
// before the RPC. Failed connection acquisition above is not counted.
let request_start_time = Instant::now();
metrics::counter!(SCANNER_FETCH_REQUESTS_TOTAL).increment(1);

let fetch_response = match con
.request(message::FetchLogRequest::new(fetch_request.clone()))
.await
Expand All @@ -1074,6 +1078,18 @@ impl LogFetcher {
}
};

// Build the context after the RPC so `request_start_time` measures only RPC wall-clock
// — not tablet-server lookup or connection acquisition, which is matching Java's bebaviour
// Building it here also skips the allocation on the early-return error paths above.
let response_context = FetchResponseContext {
metadata: metadata.clone(),
log_fetch_buffer,
log_scanner_status,
read_context,
remote_read_context,
remote_log_downloader,
request_start_time,
};
Self::handle_fetch_response(fetch_response, response_context).await;
});
}
Expand Down Expand Up @@ -1102,8 +1118,17 @@ impl LogFetcher {
read_context,
remote_read_context,
remote_log_downloader,
request_start_time,
} = context;

// `encoded_len()` mirrors Java's `fetchLogResponse.totalSize()`:
// both report the serialized API message body size, excluding protocol
// headers and framing. Recorded unconditionally (including zero-record
// responses) to match Java's histogram semantics.
metrics::histogram!(SCANNER_FETCH_LATENCY_MS)
.record(request_start_time.elapsed().as_secs_f64() * 1000.0);
metrics::histogram!(SCANNER_BYTES_PER_REQUEST).record(fetch_response.encoded_len() as f64);

for pb_fetch_log_resp in fetch_response.tables_resp {
let table_id = pb_fetch_log_resp.table_id;
let fetch_log_for_buckets = pb_fetch_log_resp.buckets_resp;
Expand Down Expand Up @@ -2029,6 +2054,7 @@ mod tests {
read_context: fetcher.read_context.clone(),
remote_read_context: fetcher.remote_read_context.clone(),
remote_log_downloader: fetcher.remote_log_downloader.clone(),
request_start_time: Instant::now(),
};

LogFetcher::handle_fetch_response(response, response_context).await;
Expand Down Expand Up @@ -2082,6 +2108,7 @@ mod tests {
read_context: fetcher.read_context.clone(),
remote_read_context: fetcher.remote_read_context.clone(),
remote_log_downloader: fetcher.remote_log_downloader.clone(),
request_start_time: Instant::now(),
};

LogFetcher::handle_fetch_response(response, response_context).await;
Expand Down Expand Up @@ -2204,4 +2231,110 @@ mod tests {
}
Ok(())
}

/// Drives `handle_fetch_response` against a local metrics recorder and
/// asserts that latency + bytes-per-request histograms are emitted with
/// values that mirror what Java would record. This complements the unit
/// tests in `metrics.rs` (which only verify the facade) by exercising
/// the actual instrumented call path.
///
/// Note: uses a `current_thread` runtime inside `with_local_recorder`
/// (rather than `#[tokio::test]`) because the metrics facade installs a
/// thread-local recorder; running the async work on the same thread is
/// the only way to observe the emitted metrics in the snapshot. Both
/// the fetcher construction and the `handle_fetch_response` call run
/// inside the runtime (the security-token manager and remote-log
/// downloader require a Tokio reactor).
#[test]
fn handle_fetch_response_emits_latency_and_bytes_metrics() {
use crate::metrics::{SCANNER_BYTES_PER_REQUEST, SCANNER_FETCH_LATENCY_MS};
use metrics_util::debugging::{DebugValue, DebuggingRecorder};

let recorder = DebuggingRecorder::new();
let snapshotter = recorder.snapshotter();

let expected_bytes = metrics::with_local_recorder(&recorder, || {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("build current_thread runtime");

rt.block_on(async {
let table_path = TablePath::new("db".to_string(), "tbl".to_string());
let table_info = build_table_info(table_path.clone(), 1, 1);
let cluster = build_cluster_arc(&table_path, 1, 1);
let metadata = Arc::new(Metadata::new_for_test(cluster));
let status = Arc::new(LogScannerStatus::new());
status.assign_scan_bucket(TableBucket::new(1, 0), 5);
let fetcher = LogFetcher::new(
table_info,
Arc::new(RpcClient::new()),
metadata.clone(),
status,
&Config::default(),
None,
)
.expect("build LogFetcher");

let response = FetchLogResponse {
tables_resp: vec![PbFetchLogRespForTable {
table_id: 1,
buckets_resp: vec![PbFetchLogRespForBucket {
partition_id: None,
bucket_id: 0,
error_code: Some(FlussError::None.code()),
error_message: None,
high_watermark: Some(7),
log_start_offset: Some(0),
remote_log_fetch_info: None,
records: None,
}],
}],
};
let expected_bytes = response.encoded_len() as f64;
let response_context = FetchResponseContext {
metadata: metadata.clone(),
log_fetch_buffer: fetcher.log_fetch_buffer.clone(),
log_scanner_status: fetcher.log_scanner_status.clone(),
read_context: fetcher.read_context.clone(),
remote_read_context: fetcher.remote_read_context.clone(),
remote_log_downloader: fetcher.remote_log_downloader.clone(),
request_start_time: Instant::now(),
};

LogFetcher::handle_fetch_response(response, response_context).await;
expected_bytes
})
});

let entries: Vec<_> = snapshotter.snapshot().into_vec();
let find_histogram = |name: &str| -> Vec<f64> {
entries
.iter()
.find_map(|(key, _, _, val)| {
if key.key().name() == name {
if let DebugValue::Histogram(v) = val {
return Some(v.iter().map(|f| f.into_inner()).collect());
}
}
None
})
.unwrap_or_default()
};

let latency_samples = find_histogram(SCANNER_FETCH_LATENCY_MS);
assert_eq!(latency_samples.len(), 1, "expected one latency sample");
assert!(
latency_samples[0] >= 0.0,
"latency must be non-negative, got {}",
latency_samples[0]
);

let bytes_samples = find_histogram(SCANNER_BYTES_PER_REQUEST);
assert_eq!(
bytes_samples,
vec![expected_bytes],
"bytes histogram must record encoded_len() for parity with Java fetchLogResponse.totalSize()",
);
}
}
93 changes: 93 additions & 0 deletions crates/fluss/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,43 @@ pub const CLIENT_BYTES_RECEIVED_TOTAL: &str = "fluss.client.bytes_received.total
pub const CLIENT_REQUEST_LATENCY_MS: &str = "fluss.client.request_latency_ms";
pub const CLIENT_REQUESTS_IN_FLIGHT: &str = "fluss.client.requests_in_flight";

// ---------------------------------------------------------------------------
// Scanner fetch + remote download metrics
//
// Fetch metrics are recorded in the LogFetcher fetch loop on response
// completion. Remote metrics are recorded inside RemoteLogDownloader's
// download task.
//
// Java uses a volatile-long gauge for fetch latency and Counter+MeterView
// for rates. Rust uses a histogram for latency (richer percentile data)
// and counters for throughput; the recorder/exporter handles rate
// computation (e.g. Prometheus `rate()`).
//
// Java emits one `ScannerMetricGroup` per (database, table). Rust currently
// emits without per-table labels — adding `database`/`table` labels is
// tracked separately and intentionally deferred to keep this PR minimal.
// ---------------------------------------------------------------------------

/// Histogram: elapsed ms for each successful FetchLog RPC.
pub const SCANNER_FETCH_LATENCY_MS: &str = "fluss.client.scanner.fetch_latency_ms";

/// Counter: total FetchLog RPC requests attempted after connection acquisition.
pub const SCANNER_FETCH_REQUESTS_TOTAL: &str = "fluss.client.scanner.fetch_requests.total";

/// Histogram: serialized bytes per successful FetchLog response.
pub const SCANNER_BYTES_PER_REQUEST: &str = "fluss.client.scanner.bytes_per_request";

/// Counter: total remote log download attempts (includes per-segment retries).
pub const SCANNER_REMOTE_FETCH_REQUESTS_TOTAL: &str =
"fluss.client.scanner.remote_fetch_requests.total";

/// Counter: total bytes downloaded from remote log storage.
pub const SCANNER_REMOTE_FETCH_BYTES_TOTAL: &str = "fluss.client.scanner.remote_fetch_bytes.total";

/// Counter: total remote log download failures (each retry attempt counts).
pub const SCANNER_REMOTE_FETCH_ERRORS_TOTAL: &str =
"fluss.client.scanner.remote_fetch_errors.total";

/// Returns a label value for reportable API keys, matching Java's
/// `ConnectionMetrics.REPORT_API_KEYS` filter (`ProduceLog`, `FetchLog`,
/// `PutKv`, `Lookup`). Returns `None` for admin/metadata/auth calls to
Expand Down Expand Up @@ -267,4 +304,60 @@ mod tests {
assert_eq!(counter_by_api_key.get("produce_log"), Some(&5));
assert_eq!(counter_by_api_key.get("fetch_log"), Some(&3));
}

#[test]
fn scanner_fetch_metrics_emit_correctly() {
let recorder = DebuggingRecorder::new();
let snapshotter = recorder.snapshotter();

metrics::with_local_recorder(&recorder, || {
metrics::counter!(SCANNER_FETCH_REQUESTS_TOTAL).increment(1);
metrics::histogram!(SCANNER_FETCH_LATENCY_MS).record(15.5);
metrics::histogram!(SCANNER_BYTES_PER_REQUEST).record(4096.0);
});

let snapshot = snapshotter.snapshot();
let entries: Vec<_> = snapshot.into_vec();

assert_eq!(
find_counter!(entries, SCANNER_FETCH_REQUESTS_TOTAL),
Some(1)
);
assert_eq!(
find_histogram!(entries, SCANNER_FETCH_LATENCY_MS),
Some(vec![15.5])
);
assert_eq!(
find_histogram!(entries, SCANNER_BYTES_PER_REQUEST),
Some(vec![4096.0])
);
}

#[test]
fn scanner_remote_fetch_metrics_emit_correctly() {
let recorder = DebuggingRecorder::new();
let snapshotter = recorder.snapshotter();

metrics::with_local_recorder(&recorder, || {
metrics::counter!(SCANNER_REMOTE_FETCH_REQUESTS_TOTAL).increment(3);
metrics::counter!(SCANNER_REMOTE_FETCH_BYTES_TOTAL).increment(1024);
metrics::counter!(SCANNER_REMOTE_FETCH_ERRORS_TOTAL).increment(1);
});

let snapshot = snapshotter.snapshot();
let entries: Vec<_> = snapshot.into_vec();

assert_eq!(
find_counter!(entries, SCANNER_REMOTE_FETCH_REQUESTS_TOTAL),
Some(3)
);
assert_eq!(
find_counter!(entries, SCANNER_REMOTE_FETCH_BYTES_TOTAL),
Some(1024)
);
assert_eq!(
find_counter!(entries, SCANNER_REMOTE_FETCH_ERRORS_TOTAL),
Some(1)
);
}
}
Loading