From ce87a7b35d5f01b614fedceffe4cb37ad5a4e208 Mon Sep 17 00:00:00 2001 From: zhangjunfan Date: Tue, 19 May 2026 18:00:26 +0800 Subject: [PATCH] [flink] support offset lag --- .../apache/fluss/config/ConfigOptions.java | 8 + .../metrics/FlinkSourceReaderMetrics.java | 195 ++++++++++++++++-- .../source/reader/FlinkSourceSplitReader.java | 41 +++- .../source/reader/LogEndOffsetRefresher.java | 175 ++++++++++++++++ .../metrics/FlinkSourceReaderMetricsTest.java | 95 ++++++++- 5 files changed, 495 insertions(+), 19 deletions(-) create mode 100644 fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/LogEndOffsetRefresher.java diff --git a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java index 1818e16ef3..47be573937 100644 --- a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java +++ b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java @@ -1276,6 +1276,14 @@ public class ConfigOptions { "The maximum time to wait for enough bytes to be available for a fetch log " + "request from client to response."); + public static final ConfigOption CLIENT_SCANNER_LOG_END_OFFSET_REFRESH_INTERVAL = + key("client.scanner.log.end-offset-refresh-interval") + .durationType() + .defaultValue(Duration.ofSeconds(10)) + .withDescription( + "The interval for Flink source readers to refresh latest log end offsets " + + "of subscribed buckets for records lag metrics."); + public static final ConfigOption CLIENT_SCANNER_LOG_FETCH_MIN_BYTES = key("client.scanner.log.fetch.min-bytes") .memoryType() diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/metrics/FlinkSourceReaderMetrics.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/metrics/FlinkSourceReaderMetrics.java index dc98ada820..80c6ac8519 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/metrics/FlinkSourceReaderMetrics.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/metrics/FlinkSourceReaderMetrics.java @@ -17,6 +17,7 @@ package org.apache.fluss.flink.source.metrics; +import org.apache.fluss.annotation.VisibleForTesting; import org.apache.fluss.flink.source.reader.FlinkSourceReader; import org.apache.fluss.metadata.TableBucket; @@ -26,8 +27,12 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.HashMap; +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; /** * A collection class for handling metrics in {@link FlinkSourceReader} of Fluss. @@ -52,6 +57,11 @@ public class FlinkSourceReaderMetrics { public static final String PARTITION_GROUP = "partition"; public static final String BUCKET_GROUP = "bucket"; public static final String CURRENT_OFFSET_METRIC_GAUGE = "currentOffset"; + public static final String LOG_END_OFFSET_METRIC_GAUGE = "logEndOffset"; + public static final String RECORDS_LAG_METRIC_GAUGE = "recordsLag"; + public static final String RECORDS_LAG_MAX_METRIC_GAUGE = "recordsLagMax"; + public static final String RECORDS_LAG_SUM_METRIC_GAUGE = "recordsLagSum"; + public static final String LAGGING_BUCKETS_METRIC_GAUGE = "laggingBuckets"; public static final long INITIAL_OFFSET = -1; public static final long UNINITIALIZED = -1; @@ -62,8 +72,8 @@ public class FlinkSourceReaderMetrics { // Metric group for registering Fluss specific reader metrics private final MetricGroup flussSourceReaderMetricGroup; - // Map for tracking current consuming offsets - private final Map offsets = new HashMap<>(); + // Map for tracking current consuming offsets and lag state. + private final Map bucketMetrics = new ConcurrentHashMap<>(); // For currentFetchEventTimeLag metric private volatile long currentFetchEventTimeLag = UNINITIALIZED; @@ -72,6 +82,7 @@ public FlinkSourceReaderMetrics(SourceReaderMetricGroup sourceReaderMetricGroup) this.sourceReaderMetricGroup = sourceReaderMetricGroup; this.flussSourceReaderMetricGroup = sourceReaderMetricGroup.addGroup(FLUSS_METRIC_GROUP).addGroup(READER_METRIC_GROUP); + registerRecordsLagMetrics(); } public void reportRecordEventTime(long lag) { @@ -87,9 +98,29 @@ public void reportRecordEventTime(long lag) { currentFetchEventTimeLag = lag; } - public void registerTableBucket(TableBucket tableBucket) { - offsets.put(tableBucket, INITIAL_OFFSET); - registerOffsetMetricsForTableBucket(tableBucket); + @VisibleForTesting + protected void registerTableBucket(TableBucket tableBucket) { + registerTableBucket(tableBucket, null, INITIAL_OFFSET, null, true); + } + + public void registerTableBucket( + TableBucket tableBucket, + @Nullable String partitionName, + long startingOffset, + @Nullable Long stoppingOffset, + boolean refreshLogEndOffset) { + bucketMetrics.computeIfAbsent( + tableBucket, + key -> { + BucketMetrics newMetrics = + new BucketMetrics( + partitionName, + startingOffset, + stoppingOffset, + refreshLogEndOffset); + registerOffsetMetricsForTableBucket(key, newMetrics); + return newMetrics; + }); } /** @@ -99,12 +130,46 @@ public void registerTableBucket(TableBucket tableBucket) { * @param offset Current consuming offset */ public void recordCurrentOffset(TableBucket tb, long offset) { - checkTableBucketTracked(tb); - offsets.put(tb, offset); + BucketMetrics metrics = bucketMetrics.get(tb); + if (metrics != null) { + metrics.recordCurrentOffset(offset); + } + } + + public void recordLogEndOffset(TableBucket tb, long logEndOffset) { + BucketMetrics metrics = bucketMetrics.get(tb); + if (metrics != null) { + metrics.updateLogEndOffsetIfAdvanced(logEndOffset); + } + } + + public void unregisterTableBucket(TableBucket tb) { + BucketMetrics metrics = bucketMetrics.get(tb); + if (metrics != null) { + metrics.deactivate(); + } + } + + public List subscribedBucketsForLogEndOffsetRefresh() { + List subscribedBuckets = new ArrayList<>(); + for (Map.Entry entry : bucketMetrics.entrySet()) { + BucketMetrics metrics = entry.getValue(); + if (metrics.needsLogEndOffsetRefresh()) { + subscribedBuckets.add(metrics.toSubscribedBucket(entry.getKey())); + } + } + return subscribedBuckets; } // -------- Helper functions -------- - private void registerOffsetMetricsForTableBucket(TableBucket tableBucket) { + private void registerRecordsLagMetrics() { + flussSourceReaderMetricGroup.gauge(RECORDS_LAG_MAX_METRIC_GAUGE, this::recordsLagMax); + flussSourceReaderMetricGroup.gauge(RECORDS_LAG_SUM_METRIC_GAUGE, this::recordsLagSum); + flussSourceReaderMetricGroup.gauge(LAGGING_BUCKETS_METRIC_GAUGE, this::laggingBuckets); + } + + private void registerOffsetMetricsForTableBucket( + TableBucket tableBucket, BucketMetrics bucketMetrics) { final MetricGroup metricGroup = tableBucket.getPartitionId() == null ? this.flussSourceReaderMetricGroup @@ -112,18 +177,118 @@ private void registerOffsetMetricsForTableBucket(TableBucket tableBucket) { PARTITION_GROUP, String.valueOf(tableBucket.getPartitionId())); final MetricGroup bucketGroup = metricGroup.addGroup(BUCKET_GROUP, String.valueOf(tableBucket.getBucket())); - bucketGroup.gauge( - CURRENT_OFFSET_METRIC_GAUGE, - () -> offsets.getOrDefault(tableBucket, INITIAL_OFFSET)); + bucketGroup.gauge(CURRENT_OFFSET_METRIC_GAUGE, () -> bucketMetrics.currentOffset()); + bucketGroup.gauge(LOG_END_OFFSET_METRIC_GAUGE, () -> bucketMetrics.logEndOffset()); + bucketGroup.gauge(RECORDS_LAG_METRIC_GAUGE, () -> bucketMetrics.recordsLag()); + } + + private long recordsLagMax() { + long maxLag = 0L; + for (BucketMetrics metrics : bucketMetrics.values()) { + maxLag = Math.max(maxLag, metrics.recordsLag()); + } + return maxLag; + } + + private long recordsLagSum() { + long lagSum = 0L; + for (BucketMetrics metrics : bucketMetrics.values()) { + lagSum += metrics.recordsLag(); + } + return lagSum; } - private void checkTableBucketTracked(TableBucket tableBucket) { - if (!offsets.containsKey(tableBucket)) { - LOG.warn("Offset metrics of TableBucket {} is not tracked", tableBucket); + private int laggingBuckets() { + int laggingBuckets = 0; + for (BucketMetrics metrics : bucketMetrics.values()) { + if (metrics.recordsLag() > 0) { + laggingBuckets++; + } } + return laggingBuckets; } public SourceReaderMetricGroup getSourceReaderMetricGroup() { return sourceReaderMetricGroup; } + + private static class BucketMetrics { + @Nullable private volatile String partitionName; + private volatile boolean active; + private volatile long currentOffset; + private volatile long nextConsumedOffset; + private volatile long logEndOffset; + private volatile boolean refreshLogEndOffset; + + private BucketMetrics( + @Nullable String partitionName, + long startingOffset, + @Nullable Long stoppingOffset, + boolean refreshLogEndOffset) { + this.partitionName = partitionName; + this.currentOffset = INITIAL_OFFSET; + this.nextConsumedOffset = startingOffset; + this.logEndOffset = stoppingOffset == null ? UNINITIALIZED : stoppingOffset; + this.refreshLogEndOffset = refreshLogEndOffset && stoppingOffset == null; + this.active = true; + } + + private void recordCurrentOffset(long offset) { + currentOffset = offset; + nextConsumedOffset = offset + 1; + } + + private void updateLogEndOffsetIfAdvanced(long logEndOffset) { + if (active && refreshLogEndOffset && logEndOffset > this.logEndOffset) { + this.logEndOffset = logEndOffset; + } + } + + private void deactivate() { + active = false; + } + + private boolean needsLogEndOffsetRefresh() { + return active && refreshLogEndOffset; + } + + private SubscribedBucket toSubscribedBucket(TableBucket tableBucket) { + return new SubscribedBucket(tableBucket, partitionName); + } + + private long currentOffset() { + return currentOffset; + } + + private long logEndOffset() { + return logEndOffset; + } + + private long recordsLag() { + if (!active || logEndOffset < 0 || nextConsumedOffset < 0) { + return 0L; + } + return Math.max(0L, logEndOffset - nextConsumedOffset); + } + } + + /** The subscribed bucket whose latest log end offset needs to be refreshed. */ + public static class SubscribedBucket { + private final TableBucket tableBucket; + @Nullable private final String partitionName; + + private SubscribedBucket(TableBucket tableBucket, @Nullable String partitionName) { + this.tableBucket = tableBucket; + this.partitionName = partitionName; + } + + public TableBucket tableBucket() { + return tableBucket; + } + + @Nullable + public String partitionName() { + return partitionName; + } + } } diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReader.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReader.java index 90f2b6e9a6..9fb723649e 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReader.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReader.java @@ -70,6 +70,7 @@ import java.util.Queue; import java.util.Set; +import static org.apache.fluss.config.ConfigOptions.CLIENT_SCANNER_LOG_END_OFFSET_REFRESH_INTERVAL; import static org.apache.fluss.utils.Preconditions.checkArgument; import static org.apache.fluss.utils.Preconditions.checkNotNull; @@ -99,6 +100,7 @@ public class FlinkSourceSplitReader implements SplitReader lakeSource; + private final Configuration flussConf; + private final TablePath tablePath; private final Long tableId; private final Map stoppingOffsets; @@ -127,6 +131,8 @@ public FlinkSourceSplitReader( FlinkSourceReaderMetrics flinkSourceReaderMetrics) { this.flinkMetricRegistry = new FlinkMetricRegistry(flinkSourceReaderMetrics.getSourceReaderMetricGroup()); + this.flussConf = flussConf; + this.tablePath = tablePath; this.connection = ConnectionFactory.createConnection(flussConf, flinkMetricRegistry); this.table = connection.getTable(tablePath); this.tableId = table.getTableInfo().getTableId(); @@ -313,11 +319,39 @@ private void subscribeLog(SourceSplitBase split, long startingOffset) { split.splitId(), startingOffset); // Track the new bucket in metrics and internal state. - flinkSourceReaderMetrics.registerTableBucket(tableBucket); + Optional stoppingOffset = getStoppingOffsetForMetrics(split); + flinkSourceReaderMetrics.registerTableBucket( + tableBucket, + split.getPartitionName(), + startingOffset, + stoppingOffset.orElse(null), + !stoppingOffset.isPresent()); + if (!stoppingOffset.isPresent()) { + ensureLogEndOffsetRefresherStarted(); + } subscribedBuckets.put(tableBucket, split.splitId()); } } + private void ensureLogEndOffsetRefresherStarted() { + if (logEndOffsetRefresher == null) { + logEndOffsetRefresher = + new LogEndOffsetRefresher( + connection.getAdmin(), + tablePath, + flinkSourceReaderMetrics, + flussConf.get(CLIENT_SCANNER_LOG_END_OFFSET_REFRESH_INTERVAL)); + logEndOffsetRefresher.start(); + } + } + + private Optional getStoppingOffsetForMetrics(SourceSplitBase split) { + if (split instanceof LogSplit) { + return ((LogSplit) split).getStoppingOffset(); + } + return Optional.empty(); + } + public Set removePartitions(Map removedPartitions) { // First, if the current active bounded split belongs to a removed partition and is not // LakeSnapshotSplit, finish it so it will not be restored. @@ -378,6 +412,7 @@ public Set removePartitions(Map removedPartitions) { tableBucket.getBucket()); removedSplits.add(tableBucketAndSplit.getValue()); subscribeTableBucketIterator.remove(); + flinkSourceReaderMetrics.unregisterTableBucket(tableBucket); unsubscribedTableBuckets.add(tableBucket); LOG.info( "Unsubscribe to read log of split {} for non-existed partition {}.", @@ -445,6 +480,7 @@ private FlinkRecordsWithSplitIds forLogRecords(ScanRecords scanRecords) { List bucketScanRecords = scanRecords.records(scanBucket); if (!bucketScanRecords.isEmpty()) { final ScanRecord lastRecord = bucketScanRecords.get(bucketScanRecords.size() - 1); + flinkSourceReaderMetrics.recordLogEndOffset(scanBucket, lastRecord.logOffset() + 1); // We keep the maximum message timestamp in the fetch for calculating lags maxConsumerRecordTimestampInFetch = Math.max(maxConsumerRecordTimestampInFetch, lastRecord.timestamp()); @@ -568,6 +604,9 @@ public void close() throws Exception { if (currentBoundedSplitReader != null) { currentBoundedSplitReader.close(); } + if (logEndOffsetRefresher != null) { + logEndOffsetRefresher.close(); + } if (logScanner != null) { logScanner.close(); } diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/LogEndOffsetRefresher.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/LogEndOffsetRefresher.java new file mode 100644 index 0000000000..c82e509071 --- /dev/null +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/LogEndOffsetRefresher.java @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.source.reader; + +import org.apache.fluss.client.admin.Admin; +import org.apache.fluss.client.admin.ListOffsetsResult; +import org.apache.fluss.client.admin.OffsetSpec; +import org.apache.fluss.flink.source.metrics.FlinkSourceReaderMetrics; +import org.apache.fluss.flink.source.metrics.FlinkSourceReaderMetrics.SubscribedBucket; +import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.utils.ExecutorUtils; +import org.apache.fluss.utils.concurrent.ExecutorThreadFactory; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +/** Periodically refreshes latest log end offsets for subscribed source reader buckets. */ +class LogEndOffsetRefresher implements AutoCloseable { + + private static final Logger LOG = LoggerFactory.getLogger(LogEndOffsetRefresher.class); + + private final Admin admin; + private final TablePath tablePath; + private final FlinkSourceReaderMetrics metrics; + private final ScheduledExecutorService executor; + private final AtomicBoolean refreshInFlight = new AtomicBoolean(false); + private final long refreshIntervalMillis; + + private volatile boolean closed; + + LogEndOffsetRefresher( + Admin admin, + TablePath tablePath, + FlinkSourceReaderMetrics metrics, + Duration refreshInterval) { + this.admin = admin; + this.tablePath = tablePath; + this.metrics = metrics; + this.refreshIntervalMillis = refreshInterval.toMillis(); + this.executor = + Executors.newSingleThreadScheduledExecutor( + new ExecutorThreadFactory("fluss-log-end-offset-refresher")); + } + + void start() { + if (refreshIntervalMillis <= 0) { + LOG.info("Disable log end offset refreshing because refresh interval is non-positive."); + return; + } + executor.scheduleWithFixedDelay( + this::refresh, 0L, refreshIntervalMillis, TimeUnit.MILLISECONDS); + } + + void refresh() { + if (closed || !refreshInFlight.compareAndSet(false, true)) { + return; + } + + List subscribedBuckets = + metrics.subscribedBucketsForLogEndOffsetRefresh(); + if (subscribedBuckets.isEmpty()) { + refreshInFlight.set(false); + return; + } + + Map> bucketsByPartitionName = + groupByPartitionName(subscribedBuckets); + List> futures = new ArrayList<>(bucketsByPartitionName.size()); + for (Map.Entry> entry : bucketsByPartitionName.entrySet()) { + futures.add(refreshPartition(entry.getKey(), entry.getValue())); + } + CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])) + .whenComplete( + (ignored, throwable) -> { + if (throwable != null) { + LOG.warn( + "Failed to refresh log end offsets for subscribed buckets.", + throwable); + } + refreshInFlight.set(false); + }); + } + + private Map> groupByPartitionName( + List subscribedBuckets) { + Map> bucketsByPartitionName = new HashMap<>(); + for (SubscribedBucket subscribedBucket : subscribedBuckets) { + String partitionName = subscribedBucket.partitionName(); + List buckets = bucketsByPartitionName.get(partitionName); + if (buckets == null) { + buckets = new ArrayList<>(); + bucketsByPartitionName.put(partitionName, buckets); + } + buckets.add(subscribedBucket); + } + return bucketsByPartitionName; + } + + private CompletableFuture refreshPartition( + String partitionName, List subscribedBuckets) { + try { + Collection buckets = new ArrayList<>(subscribedBuckets.size()); + for (SubscribedBucket subscribedBucket : subscribedBuckets) { + buckets.add(subscribedBucket.tableBucket().getBucket()); + } + + ListOffsetsResult result = + partitionName == null + ? admin.listOffsets(tablePath, buckets, new OffsetSpec.LatestSpec()) + : admin.listOffsets( + tablePath, partitionName, buckets, new OffsetSpec.LatestSpec()); + return result.all() + .thenAccept( + offsets -> { + for (SubscribedBucket subscribedBucket : subscribedBuckets) { + TableBucket tableBucket = subscribedBucket.tableBucket(); + Long logEndOffset = offsets.get(tableBucket.getBucket()); + if (logEndOffset != null) { + metrics.recordLogEndOffset(tableBucket, logEndOffset); + } + } + }) + .exceptionally( + throwable -> { + LOG.debug( + "Failed to refresh log end offsets for buckets {}.", + buckets, + throwable); + return null; + }); + } catch (Throwable t) { + LOG.debug("Failed to send list offsets request for buckets {}.", subscribedBuckets, t); + return CompletableFuture.completedFuture(null); + } + } + + @Override + public void close() { + try { + closed = true; + ExecutorUtils.gracefulShutdown(5, TimeUnit.SECONDS, executor); + admin.close(); + } catch (Exception e) { + LOG.warn("Failed to close admin for log end offset refresher.", e); + } + } +} diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/metrics/FlinkSourceReaderMetricsTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/metrics/FlinkSourceReaderMetricsTest.java index 255a5b6808..8cc888878c 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/metrics/FlinkSourceReaderMetricsTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/metrics/FlinkSourceReaderMetricsTest.java @@ -64,10 +64,81 @@ void testCurrentOffsetTracking() { assertCurrentOffset(t3, 15513L, metricListener); } + @Test + void testRecordsLagTracking() { + MetricListener metricListener = new MetricListener(); + final TableBucket t0 = new TableBucket(0, 0L, 1); + final TableBucket t1 = new TableBucket(0, null, 2); + + final FlinkSourceReaderMetrics flinkSourceReaderMetrics = + new FlinkSourceReaderMetrics( + InternalSourceReaderMetricGroup.mock(metricListener.getMetricGroup())); + + flinkSourceReaderMetrics.registerTableBucket(t0, "dt=2026-05-19", 100L, null, true); + flinkSourceReaderMetrics.recordLogEndOffset(t0, 130L); + flinkSourceReaderMetrics.recordLogEndOffset(t0, 135L); + flinkSourceReaderMetrics.recordLogEndOffset(t0, 120L); + + flinkSourceReaderMetrics.registerTableBucket(t1, null, 5L, 8L, false); + flinkSourceReaderMetrics.recordLogEndOffset(t1, 99L); + + assertRecordsLag(t0, 35L, metricListener); + assertLogEndOffset(t0, 135L, metricListener); + assertRecordsLag(t1, 3L, metricListener); + assertLogEndOffset(t1, 8L, metricListener); + assertSourceGauge( + FlinkSourceReaderMetrics.RECORDS_LAG_MAX_METRIC_GAUGE, 35L, metricListener); + assertSourceGauge( + FlinkSourceReaderMetrics.RECORDS_LAG_SUM_METRIC_GAUGE, 38L, metricListener); + assertLaggingBuckets(2, metricListener); + + flinkSourceReaderMetrics.recordCurrentOffset(t0, 109L); + flinkSourceReaderMetrics.recordCurrentOffset(t1, 7L); + + assertRecordsLag(t0, 25L, metricListener); + assertRecordsLag(t1, 0L, metricListener); + assertSourceGauge( + FlinkSourceReaderMetrics.RECORDS_LAG_MAX_METRIC_GAUGE, 25L, metricListener); + assertSourceGauge( + FlinkSourceReaderMetrics.RECORDS_LAG_SUM_METRIC_GAUGE, 25L, metricListener); + assertLaggingBuckets(1, metricListener); + + flinkSourceReaderMetrics.unregisterTableBucket(t0); + + assertSourceGauge( + FlinkSourceReaderMetrics.RECORDS_LAG_MAX_METRIC_GAUGE, 0L, metricListener); + assertSourceGauge( + FlinkSourceReaderMetrics.RECORDS_LAG_SUM_METRIC_GAUGE, 0L, metricListener); + assertLaggingBuckets(0, metricListener); + } + // ----------- Assertions -------------- private void assertCurrentOffset( TableBucket tb, long expectedOffset, MetricListener metricListener) { + assertBucketGauge( + tb, + FlinkSourceReaderMetrics.CURRENT_OFFSET_METRIC_GAUGE, + expectedOffset, + metricListener); + } + + private void assertLogEndOffset( + TableBucket tb, long expectedOffset, MetricListener metricListener) { + assertBucketGauge( + tb, + FlinkSourceReaderMetrics.LOG_END_OFFSET_METRIC_GAUGE, + expectedOffset, + metricListener); + } + + private void assertRecordsLag(TableBucket tb, long expectedLag, MetricListener metricListener) { + assertBucketGauge( + tb, FlinkSourceReaderMetrics.RECORDS_LAG_METRIC_GAUGE, expectedLag, metricListener); + } + + private void assertBucketGauge( + TableBucket tb, String metricName, long expectedValue, MetricListener metricListener) { final Optional> currentOffsetGauge; if (tb.getPartitionId() == null) { currentOffsetGauge = @@ -76,7 +147,7 @@ private void assertCurrentOffset( READER_METRIC_GROUP, BUCKET_GROUP, String.valueOf(tb.getBucket()), - FlinkSourceReaderMetrics.CURRENT_OFFSET_METRIC_GAUGE); + metricName); } else { currentOffsetGauge = metricListener.getGauge( @@ -86,10 +157,28 @@ private void assertCurrentOffset( String.valueOf(tb.getPartitionId()), BUCKET_GROUP, String.valueOf(tb.getBucket()), - FlinkSourceReaderMetrics.CURRENT_OFFSET_METRIC_GAUGE); + metricName); } assertThat(currentOffsetGauge).isPresent(); - assertThat((long) currentOffsetGauge.get().getValue()).isEqualTo(expectedOffset); + assertThat((long) currentOffsetGauge.get().getValue()).isEqualTo(expectedValue); + } + + private void assertSourceGauge( + String metricName, long expectedValue, MetricListener metricListener) { + Optional> gauge = + metricListener.getGauge(FLUSS_METRIC_GROUP, READER_METRIC_GROUP, metricName); + assertThat(gauge).isPresent(); + assertThat((long) gauge.get().getValue()).isEqualTo(expectedValue); + } + + private void assertLaggingBuckets(int expectedValue, MetricListener metricListener) { + Optional> gauge = + metricListener.getGauge( + FLUSS_METRIC_GROUP, + READER_METRIC_GROUP, + FlinkSourceReaderMetrics.LAGGING_BUCKETS_METRIC_GAUGE); + assertThat(gauge).isPresent(); + assertThat((int) gauge.get().getValue()).isEqualTo(expectedValue); } }