Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1276,6 +1276,14 @@ public class ConfigOptions {
"The maximum time to wait for enough bytes to be available for a fetch log "
+ "request from client to response.");

public static final ConfigOption<Duration> CLIENT_SCANNER_LOG_END_OFFSET_REFRESH_INTERVAL =
key("client.scanner.log.end-offset-refresh-interval")
.durationType()
.defaultValue(Duration.ofSeconds(10))
.withDescription(
"The interval for Flink source readers to refresh latest log end offsets "
+ "of subscribed buckets for records lag metrics.");

public static final ConfigOption<MemorySize> CLIENT_SCANNER_LOG_FETCH_MIN_BYTES =
key("client.scanner.log.fetch.min-bytes")
.memoryType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.fluss.flink.source.metrics;

import org.apache.fluss.annotation.VisibleForTesting;
import org.apache.fluss.flink.source.reader.FlinkSourceReader;
import org.apache.fluss.metadata.TableBucket;

Expand All @@ -26,8 +27,12 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
* A collection class for handling metrics in {@link FlinkSourceReader} of Fluss.
Expand All @@ -52,6 +57,11 @@ public class FlinkSourceReaderMetrics {
public static final String PARTITION_GROUP = "partition";
public static final String BUCKET_GROUP = "bucket";
public static final String CURRENT_OFFSET_METRIC_GAUGE = "currentOffset";
public static final String LOG_END_OFFSET_METRIC_GAUGE = "logEndOffset";
public static final String RECORDS_LAG_METRIC_GAUGE = "recordsLag";
public static final String RECORDS_LAG_MAX_METRIC_GAUGE = "recordsLagMax";
public static final String RECORDS_LAG_SUM_METRIC_GAUGE = "recordsLagSum";
public static final String LAGGING_BUCKETS_METRIC_GAUGE = "laggingBuckets";

public static final long INITIAL_OFFSET = -1;
public static final long UNINITIALIZED = -1;
Expand All @@ -62,8 +72,8 @@ public class FlinkSourceReaderMetrics {
// Metric group for registering Fluss specific reader metrics
private final MetricGroup flussSourceReaderMetricGroup;

// Map for tracking current consuming offsets
private final Map<TableBucket, Long> offsets = new HashMap<>();
// Map for tracking current consuming offsets and lag state.
private final Map<TableBucket, BucketMetrics> bucketMetrics = new ConcurrentHashMap<>();

// For currentFetchEventTimeLag metric
private volatile long currentFetchEventTimeLag = UNINITIALIZED;
Expand All @@ -72,6 +82,7 @@ public FlinkSourceReaderMetrics(SourceReaderMetricGroup sourceReaderMetricGroup)
this.sourceReaderMetricGroup = sourceReaderMetricGroup;
this.flussSourceReaderMetricGroup =
sourceReaderMetricGroup.addGroup(FLUSS_METRIC_GROUP).addGroup(READER_METRIC_GROUP);
registerRecordsLagMetrics();
}

public void reportRecordEventTime(long lag) {
Expand All @@ -87,9 +98,29 @@ public void reportRecordEventTime(long lag) {
currentFetchEventTimeLag = lag;
}

public void registerTableBucket(TableBucket tableBucket) {
offsets.put(tableBucket, INITIAL_OFFSET);
registerOffsetMetricsForTableBucket(tableBucket);
@VisibleForTesting
protected void registerTableBucket(TableBucket tableBucket) {
registerTableBucket(tableBucket, null, INITIAL_OFFSET, null, true);
}

public void registerTableBucket(
TableBucket tableBucket,
@Nullable String partitionName,
long startingOffset,
@Nullable Long stoppingOffset,
boolean refreshLogEndOffset) {
bucketMetrics.computeIfAbsent(
tableBucket,
key -> {
BucketMetrics newMetrics =
new BucketMetrics(
partitionName,
startingOffset,
stoppingOffset,
refreshLogEndOffset);
registerOffsetMetricsForTableBucket(key, newMetrics);
return newMetrics;
});
}

/**
Expand All @@ -99,31 +130,165 @@ public void registerTableBucket(TableBucket tableBucket) {
* @param offset Current consuming offset
*/
public void recordCurrentOffset(TableBucket tb, long offset) {
checkTableBucketTracked(tb);
offsets.put(tb, offset);
BucketMetrics metrics = bucketMetrics.get(tb);
if (metrics != null) {
metrics.recordCurrentOffset(offset);
}
}

public void recordLogEndOffset(TableBucket tb, long logEndOffset) {
BucketMetrics metrics = bucketMetrics.get(tb);
if (metrics != null) {
metrics.updateLogEndOffsetIfAdvanced(logEndOffset);
}
}

public void unregisterTableBucket(TableBucket tb) {
BucketMetrics metrics = bucketMetrics.get(tb);
if (metrics != null) {
metrics.deactivate();
}
}

public List<SubscribedBucket> subscribedBucketsForLogEndOffsetRefresh() {
List<SubscribedBucket> subscribedBuckets = new ArrayList<>();
for (Map.Entry<TableBucket, BucketMetrics> entry : bucketMetrics.entrySet()) {
BucketMetrics metrics = entry.getValue();
if (metrics.needsLogEndOffsetRefresh()) {
subscribedBuckets.add(metrics.toSubscribedBucket(entry.getKey()));
}
}
return subscribedBuckets;
}

// -------- Helper functions --------
private void registerOffsetMetricsForTableBucket(TableBucket tableBucket) {
private void registerRecordsLagMetrics() {
flussSourceReaderMetricGroup.gauge(RECORDS_LAG_MAX_METRIC_GAUGE, this::recordsLagMax);
flussSourceReaderMetricGroup.gauge(RECORDS_LAG_SUM_METRIC_GAUGE, this::recordsLagSum);
flussSourceReaderMetricGroup.gauge(LAGGING_BUCKETS_METRIC_GAUGE, this::laggingBuckets);
}

private void registerOffsetMetricsForTableBucket(
TableBucket tableBucket, BucketMetrics bucketMetrics) {
final MetricGroup metricGroup =
tableBucket.getPartitionId() == null
? this.flussSourceReaderMetricGroup
: this.flussSourceReaderMetricGroup.addGroup(
PARTITION_GROUP, String.valueOf(tableBucket.getPartitionId()));
final MetricGroup bucketGroup =
metricGroup.addGroup(BUCKET_GROUP, String.valueOf(tableBucket.getBucket()));
bucketGroup.gauge(
CURRENT_OFFSET_METRIC_GAUGE,
() -> offsets.getOrDefault(tableBucket, INITIAL_OFFSET));
bucketGroup.gauge(CURRENT_OFFSET_METRIC_GAUGE, () -> bucketMetrics.currentOffset());
bucketGroup.gauge(LOG_END_OFFSET_METRIC_GAUGE, () -> bucketMetrics.logEndOffset());
bucketGroup.gauge(RECORDS_LAG_METRIC_GAUGE, () -> bucketMetrics.recordsLag());
}

private long recordsLagMax() {
long maxLag = 0L;
for (BucketMetrics metrics : bucketMetrics.values()) {
maxLag = Math.max(maxLag, metrics.recordsLag());
}
return maxLag;
}

private long recordsLagSum() {
long lagSum = 0L;
for (BucketMetrics metrics : bucketMetrics.values()) {
lagSum += metrics.recordsLag();
}
return lagSum;
}

private void checkTableBucketTracked(TableBucket tableBucket) {
if (!offsets.containsKey(tableBucket)) {
LOG.warn("Offset metrics of TableBucket {} is not tracked", tableBucket);
private int laggingBuckets() {
int laggingBuckets = 0;
for (BucketMetrics metrics : bucketMetrics.values()) {
if (metrics.recordsLag() > 0) {
laggingBuckets++;
}
}
return laggingBuckets;
}

public SourceReaderMetricGroup getSourceReaderMetricGroup() {
return sourceReaderMetricGroup;
}

private static class BucketMetrics {
@Nullable private volatile String partitionName;
private volatile boolean active;
private volatile long currentOffset;
private volatile long nextConsumedOffset;
private volatile long logEndOffset;
private volatile boolean refreshLogEndOffset;

private BucketMetrics(
@Nullable String partitionName,
long startingOffset,
@Nullable Long stoppingOffset,
boolean refreshLogEndOffset) {
this.partitionName = partitionName;
this.currentOffset = INITIAL_OFFSET;
this.nextConsumedOffset = startingOffset;
this.logEndOffset = stoppingOffset == null ? UNINITIALIZED : stoppingOffset;
this.refreshLogEndOffset = refreshLogEndOffset && stoppingOffset == null;
this.active = true;
}

private void recordCurrentOffset(long offset) {
currentOffset = offset;
nextConsumedOffset = offset + 1;
}

private void updateLogEndOffsetIfAdvanced(long logEndOffset) {
if (active && refreshLogEndOffset && logEndOffset > this.logEndOffset) {
this.logEndOffset = logEndOffset;
}
}

private void deactivate() {
active = false;
}

private boolean needsLogEndOffsetRefresh() {
return active && refreshLogEndOffset;
}

private SubscribedBucket toSubscribedBucket(TableBucket tableBucket) {
return new SubscribedBucket(tableBucket, partitionName);
}

private long currentOffset() {
return currentOffset;
}

private long logEndOffset() {
return logEndOffset;
}

private long recordsLag() {
if (!active || logEndOffset < 0 || nextConsumedOffset < 0) {
return 0L;
}
return Math.max(0L, logEndOffset - nextConsumedOffset);
}
}

/** The subscribed bucket whose latest log end offset needs to be refreshed. */
public static class SubscribedBucket {
private final TableBucket tableBucket;
@Nullable private final String partitionName;

private SubscribedBucket(TableBucket tableBucket, @Nullable String partitionName) {
this.tableBucket = tableBucket;
this.partitionName = partitionName;
}

public TableBucket tableBucket() {
return tableBucket;
}

@Nullable
public String partitionName() {
return partitionName;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
import java.util.Queue;
import java.util.Set;

import static org.apache.fluss.config.ConfigOptions.CLIENT_SCANNER_LOG_END_OFFSET_REFRESH_INTERVAL;
import static org.apache.fluss.utils.Preconditions.checkArgument;
import static org.apache.fluss.utils.Preconditions.checkNotNull;

Expand Down Expand Up @@ -99,13 +100,16 @@ public class FlinkSourceSplitReader implements SplitReader<RecordAndPos, SourceS
@Nullable private SourceSplitBase currentBoundedSplit;

private final LogScanner logScanner;
@Nullable private LogEndOffsetRefresher logEndOffsetRefresher;

private final Connection connection;
private final Table table;
private final FlinkMetricRegistry flinkMetricRegistry;

@Nullable private final LakeSource<LakeSplit> lakeSource;

private final Configuration flussConf;
private final TablePath tablePath;
private final Long tableId;

private final Map<TableBucket, Long> stoppingOffsets;
Expand All @@ -127,6 +131,8 @@ public FlinkSourceSplitReader(
FlinkSourceReaderMetrics flinkSourceReaderMetrics) {
this.flinkMetricRegistry =
new FlinkMetricRegistry(flinkSourceReaderMetrics.getSourceReaderMetricGroup());
this.flussConf = flussConf;
this.tablePath = tablePath;
this.connection = ConnectionFactory.createConnection(flussConf, flinkMetricRegistry);
this.table = connection.getTable(tablePath);
this.tableId = table.getTableInfo().getTableId();
Expand Down Expand Up @@ -313,11 +319,39 @@ private void subscribeLog(SourceSplitBase split, long startingOffset) {
split.splitId(),
startingOffset);
// Track the new bucket in metrics and internal state.
flinkSourceReaderMetrics.registerTableBucket(tableBucket);
Optional<Long> stoppingOffset = getStoppingOffsetForMetrics(split);
flinkSourceReaderMetrics.registerTableBucket(
tableBucket,
split.getPartitionName(),
startingOffset,
stoppingOffset.orElse(null),
!stoppingOffset.isPresent());
if (!stoppingOffset.isPresent()) {
ensureLogEndOffsetRefresherStarted();
}
subscribedBuckets.put(tableBucket, split.splitId());
}
}

private void ensureLogEndOffsetRefresherStarted() {
if (logEndOffsetRefresher == null) {
logEndOffsetRefresher =
new LogEndOffsetRefresher(
connection.getAdmin(),
tablePath,
flinkSourceReaderMetrics,
flussConf.get(CLIENT_SCANNER_LOG_END_OFFSET_REFRESH_INTERVAL));
logEndOffsetRefresher.start();
}
}

private Optional<Long> getStoppingOffsetForMetrics(SourceSplitBase split) {
if (split instanceof LogSplit) {
return ((LogSplit) split).getStoppingOffset();
}
return Optional.empty();
}

public Set<TableBucket> removePartitions(Map<Long, String> removedPartitions) {
// First, if the current active bounded split belongs to a removed partition and is not
// LakeSnapshotSplit, finish it so it will not be restored.
Expand Down Expand Up @@ -378,6 +412,7 @@ public Set<TableBucket> removePartitions(Map<Long, String> removedPartitions) {
tableBucket.getBucket());
removedSplits.add(tableBucketAndSplit.getValue());
subscribeTableBucketIterator.remove();
flinkSourceReaderMetrics.unregisterTableBucket(tableBucket);
unsubscribedTableBuckets.add(tableBucket);
LOG.info(
"Unsubscribe to read log of split {} for non-existed partition {}.",
Expand Down Expand Up @@ -445,6 +480,7 @@ private FlinkRecordsWithSplitIds forLogRecords(ScanRecords scanRecords) {
List<ScanRecord> bucketScanRecords = scanRecords.records(scanBucket);
if (!bucketScanRecords.isEmpty()) {
final ScanRecord lastRecord = bucketScanRecords.get(bucketScanRecords.size() - 1);
flinkSourceReaderMetrics.recordLogEndOffset(scanBucket, lastRecord.logOffset() + 1);
// We keep the maximum message timestamp in the fetch for calculating lags
maxConsumerRecordTimestampInFetch =
Math.max(maxConsumerRecordTimestampInFetch, lastRecord.timestamp());
Expand Down Expand Up @@ -568,6 +604,9 @@ public void close() throws Exception {
if (currentBoundedSplitReader != null) {
currentBoundedSplitReader.close();
}
if (logEndOffsetRefresher != null) {
logEndOffsetRefresher.close();
}
if (logScanner != null) {
logScanner.close();
}
Expand Down
Loading