Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -96,13 +97,21 @@ private LakeTable(

@Nullable
public LakeSnapshotMetadata getLatestLakeSnapshotMetadata() {
if (lakeSnapshotMetadatas != null && !lakeSnapshotMetadatas.isEmpty()) {
// the last one snapshot may be a compacted snapshot which is
// not latest snapshot. todo: fix to return the real latest snapshot in
// #2625
return lakeSnapshotMetadatas.get(lakeSnapshotMetadatas.size() - 1);
if (lakeSnapshotMetadatas == null || lakeSnapshotMetadatas.isEmpty()) {
return null;
}
return null;
// Pick the entry with the largest commitTimestamp to handle out-of-order
// commits. When readable/tiered snapshots arrive in wrong order, use
// commitTimestamp to identify the real latest snapshot. Legacy entries
// without timestamp fall back to list order for compatibility.
LakeSnapshotMetadata best = lakeSnapshotMetadatas.get(0);
for (int i = 1; i < lakeSnapshotMetadatas.size(); i++) {
LakeSnapshotMetadata cur = lakeSnapshotMetadatas.get(i);
if (cur.getCommitTimestamp() >= best.getCommitTimestamp()) {
best = cur;
}
}
return best;
}

@Nullable
Expand Down Expand Up @@ -182,8 +191,27 @@ public LakeTableSnapshot getOrReadLatestReadableTableSnapshot() throws IOExcepti
// flink connector upgrade, and call getOrReadLatestReadableTableSnapshot
// will always get null.
// todo: do we need to consider such case?
for (int i = checkNotNull(lakeSnapshotMetadatas).size() - 1; i >= 0; i--) {
LakeSnapshotMetadata snapshotMetadata = lakeSnapshotMetadatas.get(i);
//
// Sort by commit_timestamp to get the latest readable snapshot regardless
// of physical list order. Handles out-of-order RPCs and maintains
// compatibility with legacy entries that lack timestamps.
List<LakeSnapshotMetadata> snapshots = checkNotNull(lakeSnapshotMetadatas);
Integer[] order = new Integer[snapshots.size()];
for (int i = 0; i < order.length; i++) {
order[i] = i;
}
Arrays.sort(
order,
(i, j) -> {
int cmp =
Long.compare(
snapshots.get(j).getCommitTimestamp(),
snapshots.get(i).getCommitTimestamp());
// tie on timestamp: later original index first
return cmp != 0 ? cmp : Integer.compare(j, i);
});
for (int idx : order) {
LakeSnapshotMetadata snapshotMetadata = snapshots.get(idx);
if (snapshotMetadata.readableOffsetsFilePath != null) {
return toLakeTableSnapshot(
snapshotMetadata.snapshotId, snapshotMetadata.readableOffsetsFilePath);
Expand Down Expand Up @@ -211,6 +239,9 @@ private LakeTableSnapshot toLakeTableSnapshot(long snapshotId, FsPath offsetFile

/** The lake snapshot metadata entry stored in zk lake table. */
public static class LakeSnapshotMetadata {
// Sentinel value for unknown commit timestamps in legacy entries.
public static final long UNKNOWN_COMMIT_TIMESTAMP = 0L;

private final long snapshotId;

// the file path to file storing the tiered offsets,
Expand All @@ -221,13 +252,31 @@ public static class LakeSnapshotMetadata {
// will be null if we don't know the readable offsets for this snapshot
@Nullable private final FsPath readableOffsetsFilePath;

// Server-side timestamp to determine the real latest snapshot regardless of list order.
// Legacy entries without timestamp fall back to list order.
private final long commitTimestamp;

/**
 * Legacy three-argument constructor, kept for backward compatibility.
 *
 * <p>Delegates to the four-argument constructor, passing {@link #UNKNOWN_COMMIT_TIMESTAMP} as
 * the commit timestamp.
 *
 * @param snapshotId the id of the lake snapshot
 * @param tieredOffsetsFilePath the path of the file storing the tiered offsets
 * @param readableOffsetsFilePath the path of the file storing the readable offsets, or {@code
 *     null} if the readable offsets are not known for this snapshot
 */
public LakeSnapshotMetadata(
        long snapshotId, FsPath tieredOffsetsFilePath, @Nullable FsPath readableOffsetsFilePath) {
    this(snapshotId, tieredOffsetsFilePath, readableOffsetsFilePath, UNKNOWN_COMMIT_TIMESTAMP);
}

/**
 * Creates a snapshot metadata entry carrying an explicit commit timestamp.
 *
 * @param snapshotId the id of the lake snapshot
 * @param tieredOffsetsFilePath the path of the file storing the tiered offsets
 * @param readableOffsetsFilePath the path of the file storing the readable offsets, or {@code
 *     null} if the readable offsets are not known for this snapshot
 * @param commitTimestamp the commit timestamp used to order snapshot entries
 */
public LakeSnapshotMetadata(
        long snapshotId,
        FsPath tieredOffsetsFilePath,
        @Nullable FsPath readableOffsetsFilePath,
        long commitTimestamp) {
    // Plain field initialisation; the assignments are independent of each other.
    this.commitTimestamp = commitTimestamp;
    this.readableOffsetsFilePath = readableOffsetsFilePath;
    this.tieredOffsetsFilePath = tieredOffsetsFilePath;
    this.snapshotId = snapshotId;
}

public long getSnapshotId() {
Expand All @@ -242,6 +291,10 @@ public FsPath getReadableOffsetsFilePath() {
return readableOffsetsFilePath;
}

/**
 * Returns the commit timestamp stored in this entry.
 *
 * @return the commit timestamp passed at construction time
 */
public long getCommitTimestamp() {
    return this.commitTimestamp;
}

public void discard() {
if (tieredOffsetsFilePath != null) {
delete(tieredOffsetsFilePath);
Expand Down Expand Up @@ -273,13 +326,15 @@ public boolean equals(Object o) {
}
LakeSnapshotMetadata that = (LakeSnapshotMetadata) o;
return snapshotId == that.snapshotId
&& commitTimestamp == that.commitTimestamp
&& Objects.equals(tieredOffsetsFilePath, that.tieredOffsetsFilePath)
&& Objects.equals(readableOffsetsFilePath, that.readableOffsetsFilePath);
}

/**
 * Computes the hash from the snapshot id, both offsets file paths and the commit timestamp,
 * i.e. the same four fields that {@code equals} compares.
 */
@Override
public int hashCode() {
    return Objects.hash(
            snapshotId, tieredOffsetsFilePath, readableOffsetsFilePath, commitTimestamp);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,16 @@ public void registerLakeTableSnapshotV2(
.map(LakeTable::getLakeSnapshotMetadatas)
.orElse(Collections.emptyList());

// Stamp with strictly monotonic commit timestamp to handle out-of-order RPCs.
// Uses max(now, prevMax + 1) to ensure correct ordering even with clock jumps.
LakeTable.LakeSnapshotMetadata stampedMetadata =
stampCommitTimestamp(lakeSnapshotMetadata, previousMetadatas);

// Determine which snapshots to keep and which to discard (but don't discard yet)

Tuple2<List<LakeTable.LakeSnapshotMetadata>, List<LakeTable.LakeSnapshotMetadata>> result =
determineSnapshotsToKeepAndDiscard(
previousMetadatas, lakeSnapshotMetadata, earliestSnapshotIDToKeep);
previousMetadatas, stampedMetadata, earliestSnapshotIDToKeep);

List<LakeTable.LakeSnapshotMetadata> keptSnapshots = result.f0;
List<LakeTable.LakeSnapshotMetadata> snapshotsToDiscard = result.f1;
Expand All @@ -124,6 +129,24 @@ public void registerLakeTableSnapshotV2(
}
}

/**
 * Returns a copy of the given snapshot metadata stamped with a commit timestamp that is
 * strictly greater than every commit timestamp found in {@code previousMetadatas}.
 *
 * <p>The stamp is the current wall-clock time in milliseconds, unless that would not exceed the
 * largest existing timestamp; in that case it is the largest existing timestamp plus one.
 *
 * @param lakeSnapshotMetadata the entry to stamp; its snapshot id and offsets file paths are
 *     copied unchanged
 * @param previousMetadatas the entries already registered for the table
 * @return a new entry carrying the computed commit timestamp
 */
private LakeTable.LakeSnapshotMetadata stampCommitTimestamp(
        LakeTable.LakeSnapshotMetadata lakeSnapshotMetadata,
        List<LakeTable.LakeSnapshotMetadata> previousMetadatas) {
    // Start from the "unknown" sentinel so that it acts as the floor of the running maximum.
    long newestExisting = LakeTable.LakeSnapshotMetadata.UNKNOWN_COMMIT_TIMESTAMP;
    for (LakeTable.LakeSnapshotMetadata previous : previousMetadatas) {
        newestExisting = Math.max(newestExisting, previous.getCommitTimestamp());
    }

    // Never go at or below the newest existing stamp, even if the clock moved backwards.
    long lowestAllowed = newestExisting + 1;
    long stamp = Math.max(System.currentTimeMillis(), lowestAllowed);

    long snapshotId = lakeSnapshotMetadata.getSnapshotId();
    return new LakeTable.LakeSnapshotMetadata(
            snapshotId,
            lakeSnapshotMetadata.getTieredOffsetsFilePath(),
            lakeSnapshotMetadata.getReadableOffsetsFilePath(),
            stamp);
}

/**
* Determines which snapshots should be retained or discarded based on the timeline according to
* {@code earliestSnapshotIDToKeep}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ public class LakeTableJsonSerde implements JsonSerializer<LakeTable>, JsonDeseri
private static final String SNAPSHOT_ID_KEY = "snapshot_id";
private static final String TIERED_OFFSETS_KEY = "tiered_offsets";
private static final String READABLE_OFFSETS_KEY = "readable_offsets";
// Optional timestamp for legacy compatibility.
private static final String COMMIT_TIMESTAMP_KEY = "commit_timestamp";

private static final int VERSION_1 = 1;
private static final int VERSION_2 = 2;
Expand Down Expand Up @@ -92,6 +94,13 @@ private void serializeV2(LakeTable lakeTable, JsonGenerator generator) throws IO
READABLE_OFFSETS_KEY,
lakeSnapshotMetadata.getReadableOffsetsFilePath().toString());
}
// commit_timestamp is omitted when unknown to keep JSON compact.
// Readers fall back to UNKNOWN_COMMIT_TIMESTAMP if absent.
if (lakeSnapshotMetadata.getCommitTimestamp()
!= LakeTable.LakeSnapshotMetadata.UNKNOWN_COMMIT_TIMESTAMP) {
generator.writeNumberField(
COMMIT_TIMESTAMP_KEY, lakeSnapshotMetadata.getCommitTimestamp());
}
generator.writeEndObject();
}
generator.writeEndArray();
Expand Down Expand Up @@ -133,10 +142,20 @@ private LakeTable deserializeV2(JsonNode node) {
JsonNode readableOffsetsNode = snapshotNode.get(READABLE_OFFSETS_KEY);
FsPath readableOffsetsPath =
readableOffsetsNode != null ? new FsPath(readableOffsetsNode.asText()) : null;
// Optional field. Legacy znodes don't have it; fall back to
// UNKNOWN_COMMIT_TIMESTAMP for list-order semantics.
JsonNode commitTsNode = snapshotNode.get(COMMIT_TIMESTAMP_KEY);
long commitTimestamp =
commitTsNode != null
? commitTsNode.asLong()
: LakeTable.LakeSnapshotMetadata.UNKNOWN_COMMIT_TIMESTAMP;

LakeTable.LakeSnapshotMetadata metadata =
new LakeTable.LakeSnapshotMetadata(
snapshotId, new FsPath(tieredOffsetsPath), readableOffsetsPath);
snapshotId,
new FsPath(tieredOffsetsPath),
readableOffsetsPath,
commitTimestamp);
lakeSnapshotMetadatas.add(metadata);
}
return new LakeTable(lakeSnapshotMetadatas);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -274,4 +274,47 @@ private TableRegistration createTableReg(long tableId) {
System.currentTimeMillis(),
System.currentTimeMillis());
}

/**
 * Registers three snapshots through {@link LakeTableHelper#registerLakeTableSnapshotV2} and
 * checks that every persisted entry carries a {@code commit_timestamp} that is set and strictly
 * larger than the one before it, and that the latest-snapshot lookup returns the last
 * registered entry (see #2625).
 */
@Test
void testRegisterLakeTableSnapshotV2StampsMonotonicCommitTimestamp(@TempDir Path tempDir)
        throws Exception {
    LakeTableHelper lakeTableHelper = new LakeTableHelper(zookeeperClient, tempDir.toString());
    TablePath tablePath = TablePath.of("test_db", "ts_mono_test");
    long tableId = 42L;
    zookeeperClient.registerTable(tablePath, createTableReg(tableId));

    // Three consecutive registrations; all previous entries are kept so that every stamp
    // can be inspected afterwards.
    for (long snapshotId = 1L; snapshotId <= 3L; snapshotId++) {
        FsPath offsetsFile =
                storeOffsetFile(lakeTableHelper, tablePath, tableId, snapshotId * 100L);
        LakeTable.LakeSnapshotMetadata unstamped =
                new LakeTable.LakeSnapshotMetadata(snapshotId, offsetsFile, offsetsFile);
        lakeTableHelper.registerLakeTableSnapshotV2(
                tableId, unstamped, LakeCommitResult.KEEP_ALL_PREVIOUS);
    }

    List<LakeTable.LakeSnapshotMetadata> persisted =
            zookeeperClient.getLakeTable(tableId).get().getLakeSnapshotMetadatas();
    assertThat(persisted).hasSize(3);

    // Each stamp must differ from the "unknown" sentinel and exceed its predecessor.
    long previousStamp = Long.MIN_VALUE;
    for (int i = 0; i < persisted.size(); i++) {
        LakeTable.LakeSnapshotMetadata entry = persisted.get(i);
        long stamp = entry.getCommitTimestamp();
        assertThat(stamp)
                .as("entry " + entry.getSnapshotId())
                .isGreaterThan(previousStamp)
                .isNotEqualTo(LakeTable.LakeSnapshotMetadata.UNKNOWN_COMMIT_TIMESTAMP);
        previousStamp = stamp;
    }

    // The latest-snapshot lookup must resolve to the entry registered last.
    LakeTable.LakeSnapshotMetadata latest =
            zookeeperClient.getLakeTable(tableId).get().getLatestLakeSnapshotMetadata();
    assertThat(latest).isNotNull();
    assertThat(latest.getSnapshotId()).isEqualTo(3L);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -146,4 +146,68 @@ void testVersion1Compatibility() throws IOException {
LakeTableSnapshot expectedSnapshot2 = new LakeTableSnapshot(11L, expectedBuckets2);
assertThat(actual2.getOrReadLatestTableSnapshot()).isEqualTo(expectedSnapshot2);
}

/**
 * Exercises the optional {@code commit_timestamp} JSON field introduced in #2625 in three
 * steps:
 *
 * <ol>
 *   <li>V2 JSON that lacks {@code commit_timestamp} deserializes to an entry reporting {@link
 *       LakeTable.LakeSnapshotMetadata#UNKNOWN_COMMIT_TIMESTAMP}.
 *   <li>V2 JSON that contains {@code commit_timestamp} deserializes with the value preserved.
 *   <li>Serializing a table holding one stamped and one unstamped entry writes the field for
 *       the stamped entry only, and the output deserializes back to an equal table.
 * </ol>
 */
@Test
void testCommitTimestampJsonCompatibility() throws IOException {
    // Step 1: no commit_timestamp in the JSON -> sentinel value in memory.
    String legacyJson =
            "{\"version\":2,\"lake_snapshots\":["
                    + "{\"snapshot_id\":7,\"tiered_offsets\":\"/p/t7\",\"readable_offsets\":\"/p/r7\"}"
                    + "]}";
    byte[] legacyBytes = legacyJson.getBytes(StandardCharsets.UTF_8);
    LakeTable legacyTable = JsonSerdeUtils.readValue(legacyBytes, LakeTableJsonSerde.INSTANCE);
    assertThat(legacyTable.getLakeSnapshotMetadatas()).hasSize(1);
    assertThat(legacyTable.getLakeSnapshotMetadatas().get(0).getCommitTimestamp())
            .isEqualTo(LakeTable.LakeSnapshotMetadata.UNKNOWN_COMMIT_TIMESTAMP);

    // Step 2: commit_timestamp present in the JSON -> value is kept.
    String stampedJson =
            "{\"version\":2,\"lake_snapshots\":["
                    + "{\"snapshot_id\":8,\"tiered_offsets\":\"/p/t8\","
                    + "\"readable_offsets\":\"/p/r8\",\"commit_timestamp\":12345678}"
                    + "]}";
    byte[] stampedBytes = stampedJson.getBytes(StandardCharsets.UTF_8);
    LakeTable stampedTable = JsonSerdeUtils.readValue(stampedBytes, LakeTableJsonSerde.INSTANCE);
    assertThat(stampedTable.getLakeSnapshotMetadatas()).hasSize(1);
    assertThat(stampedTable.getLakeSnapshotMetadatas().get(0).getCommitTimestamp())
            .isEqualTo(12345678L);

    // Step 3: one entry with a timestamp followed by one without, then a round trip.
    List<LakeTable.LakeSnapshotMetadata> entries = new ArrayList<>();
    entries.add(
            new LakeTable.LakeSnapshotMetadata(
                    9L, new FsPath("/p/t9"), new FsPath("/p/r9"), 99999L));
    entries.add(new LakeTable.LakeSnapshotMetadata(10L, new FsPath("/p/t10"), null));
    LakeTable original = new LakeTable(entries);

    byte[] jsonBytes = JsonSerdeUtils.writeValueAsBytes(original, LakeTableJsonSerde.INSTANCE);
    String json = new String(jsonBytes, StandardCharsets.UTF_8);
    // The stamped entry writes its timestamp ...
    assertThat(json).contains("\"commit_timestamp\":99999");
    // ... while the unstamped entry must not emit the sentinel value.
    assertThat(json).doesNotContain("\"commit_timestamp\":0");

    LakeTable restored = JsonSerdeUtils.readValue(jsonBytes, LakeTableJsonSerde.INSTANCE);
    assertThat(restored).isEqualTo(original);
}
}
Loading