From db5061cfaa818181fbcfaeeefc597cf12ab87b10 Mon Sep 17 00:00:00 2001 From: honestmanxin Date: Fri, 29 May 2026 15:12:37 +0800 Subject: [PATCH] [fix](catalog) Refresh remote OLAP partitions on replica relocation --- .../org/apache/doris/catalog/Partition.java | 86 +++++++++ .../org/apache/doris/common/util/Util.java | 15 ++ .../datasource/doris/FeServiceClient.java | 57 ++++-- .../doris/service/FrontendServiceImpl.java | 167 +++++++++++------- .../doris/catalog/MaterializedIndexTest.java | 134 ++++++++++++++ gensrc/thrift/FrontendService.thrift | 3 + 6 files changed, 376 insertions(+), 86 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java index 4edc4d5888ec26..702c60340f6506 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java @@ -23,15 +23,20 @@ import org.apache.doris.cloud.catalog.CloudPartition; import org.apache.doris.common.Config; import org.apache.doris.common.FeConstants; +import org.apache.doris.common.util.Util; import org.apache.doris.rpc.RpcException; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.gson.annotations.SerializedName; +import org.apache.commons.codec.binary.Hex; +import org.apache.commons.codec.digest.DigestUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.security.MessageDigest; +import java.util.Comparator; import java.util.List; import java.util.Map; import java.util.Objects; @@ -92,6 +97,8 @@ public enum PartitionState { @SerializedName(value = "di", alternate = {"distributionInfo"}) private DistributionInfo distributionInfo; + private transient volatile String remoteMetaChecksum; + protected Partition() { } @@ -258,6 +265,85 @@ public List getMaterializedIndices(IndexExtState extState) { return indices; } + public String getMetaChecksum() { + MessageDigest digest = DigestUtils.getSha256Digest(); + // Include partition-level fields whose changes should invalidate the cached + // remote partition payload, even when visibleVersion / visibleVersionTime + // remain unchanged (e.g. ALTER TABLE ... RENAME PARTITION only mutates name). + updateMetaChecksum(digest, (byte) 11, id); + updateMetaChecksumString(digest, (byte) 12, name); + updateMetaChecksum(digest, (byte) 13, state == null ? -1L : state.ordinal()); + updateMetaChecksum(digest, (byte) 14, visibleVersion); + updateMetaChecksum(digest, (byte) 15, visibleVersionTime); + updateMetaChecksum(digest, (byte) 16, nextVersion); + if (distributionInfo != null) { + DistributionInfoType distType = distributionInfo.getType(); + updateMetaChecksum(digest, (byte) 17, distType == null ? -1L : distType.ordinal()); + updateMetaChecksum(digest, (byte) 18, distributionInfo.getBucketNum()); + updateMetaChecksum(digest, (byte) 19, distributionInfo.getAutoBucket() ? 1L : 0L); + } else { + updateMetaChecksum(digest, (byte) 17, -1L); + } + List indexes = getMaterializedIndices(IndexExtState.VISIBLE); + indexes.sort(Comparator.comparingLong(MaterializedIndex::getId)); + for (MaterializedIndex index : indexes) { + updateMetaChecksum(digest, (byte) 1, index.getId()); + List tablets = Lists.newArrayList(index.getTablets()); + tablets.sort(Comparator.comparingLong(Tablet::getId)); + for (Tablet tablet : tablets) { + updateMetaChecksum(digest, (byte) 2, tablet.getId()); + List replicas = Lists.newArrayList(tablet.getReplicas()); + replicas.sort(Comparator.comparingLong(Replica::getId) + .thenComparingLong(Replica::getBackendIdWithoutException)); + for (Replica replica : replicas) { + updateMetaChecksum(digest, (byte) 3, replica.getId()); + updateMetaChecksum(digest, (byte) 4, replica.getBackendIdWithoutException()); + // Include all replica fields that affect getQueryableReplicas() filtering, + // so a stale remote cache is invalidated whenever any of them changes + // (e.g. replica becomes bad, lastFailedVersion is set, version/state changes). + updateMetaChecksum(digest, (byte) 5, replica.getVersion()); + updateMetaChecksum(digest, (byte) 6, replica.getLastFailedVersion()); + updateMetaChecksum(digest, (byte) 7, replica.getPathHash()); + Replica.ReplicaState state = replica.getState(); + updateMetaChecksum(digest, (byte) 8, state == null ? -1L : state.ordinal()); + updateMetaChecksum(digest, (byte) 9, replica.isBad() ? 1L : 0L); + updateMetaChecksum(digest, (byte) 10, replica.isUserDrop() ? 1L : 0L); + } + } + } + return Hex.encodeHexString(digest.digest()); + } + + public String getRemoteMetaChecksum() { + return remoteMetaChecksum; + } + + public void setRemoteMetaChecksum(String checksum) { + if (checksum != null) { + this.remoteMetaChecksum = checksum; + } + } + + private void updateMetaChecksum(MessageDigest digest, byte tag, long value) { + Util.updateMessageDigest(digest, tag); + Util.updateMessageDigest(digest, value); + } + + private void updateMetaChecksumString(MessageDigest digest, byte tag, String value) { + Util.updateMessageDigest(digest, tag); + if (value == null) { + Util.updateMessageDigest(digest, -1L); + return; + } + int len = value.length(); + Util.updateMessageDigest(digest, (long) len); + for (int i = 0; i < len; i++) { + char c = value.charAt(i); + digest.update((byte) (c >>> 8)); + digest.update((byte) c); + } + } + public long getAllDataSize(boolean singleReplica) { return getDataSize(singleReplica) + getRemoteDataSize(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/Util.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/Util.java index e260418227aae6..25c0d81295ba21 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/Util.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/Util.java @@ -113,6 +113,21 @@ public static LongUnaryOperator overflowSafeIncrement() { }; } + public static void updateMessageDigest(MessageDigest digest, byte value) { + digest.update(value); + } + + public static void updateMessageDigest(MessageDigest digest, long value) { + digest.update((byte) (value >>> 56)); + digest.update((byte) (value >>> 48)); + digest.update((byte) (value >>> 40)); + digest.update((byte) (value >>> 32)); + digest.update((byte) (value >>> 24)); + digest.update((byte) (value >>> 16)); + digest.update((byte) (value >>> 8)); + digest.update((byte) value); + } + // Get a string represent the schema signature, contains: // list of columns and bloom filter column info. diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/doris/FeServiceClient.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/doris/FeServiceClient.java index 10ef6435ff16aa..8fa391fb77629c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/doris/FeServiceClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/doris/FeServiceClient.java @@ -67,6 +67,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Random; import java.util.concurrent.TimeUnit; @@ -266,18 +267,10 @@ public RemoteOlapTable getOlapTable(String dbName, String table, long tableId, L request.setPasswd(password); request.setVersion(FeConstants.meta_version); for (Partition partition : partitions) { - TPartitionMeta meta = new TPartitionMeta(); - meta.setId(partition.getId()); - meta.setVisibleVersion(partition.getVisibleVersion()); - meta.setVisibleVersionTime(partition.getVisibleVersionTime()); - request.addToPartitions(meta); + request.addToPartitions(buildPartitionMeta(partition)); } for (Partition partition : tempPartitions) { - TPartitionMeta meta = new TPartitionMeta(); - meta.setId(partition.getId()); - meta.setVisibleVersion(partition.getVisibleVersion()); - meta.setVisibleVersionTime(partition.getVisibleVersionTime()); - request.addToTempPartitions(meta); + request.addToTempPartitions(buildPartitionMeta(partition)); } String msg = String.format("failed to get table meta from remote doris:%s", name); return randomCallWithRetry(client -> { @@ -291,13 +284,17 @@ public RemoteOlapTable getOlapTable(String dbName, String table, long tableId, L remoteOlapTable = RemoteOlapTable.fromOlapTable(olapTable); } List updatedPartitions = new ArrayList<>(result.getUpdatedPartitionsSize()); + List updatedPartitionChecksums = result.isSetUpdatedPartitionChecksums() + ? result.getUpdatedPartitionChecksums() : Collections.emptyList(); if (result.getUpdatedPartitionsSize() > 0) { - for (ByteBuffer buffer : result.getUpdatedPartitions()) { + for (int i = 0; i < result.getUpdatedPartitionsSize(); i++) { + ByteBuffer buffer = result.getUpdatedPartitions().get(i); try (ByteArrayInputStream in = new ByteArrayInputStream(buffer.array(), buffer.position(), buffer.remaining()); DataInputStream dataInputStream = new DataInputStream(in)) { String partitionStr = Text.readString(dataInputStream); Partition partition = GsonUtils.GSON.fromJson(partitionStr, Partition.class); + setRemoteMetaChecksum(partition, updatedPartitionChecksums, i); updatedPartitions.add(partition); } } @@ -308,27 +305,51 @@ public RemoteOlapTable getOlapTable(String dbName, String table, long tableId, L } remoteOlapTable.rebuildPartitions(partitions, updatedPartitions, removedPartitions); // rebuild temp partitions + List updatedTempPartitions = new ArrayList<>(); if (result.isSetUpdatedTempPartitions() && result.getUpdatedTempPartitionsSize() > 0) { - updatedPartitions = new ArrayList<>(result.getUpdatedTempPartitionsSize()); - for (ByteBuffer buffer : result.getUpdatedTempPartitions()) { + List updatedTempPartitionChecksums = result.isSetUpdatedTempPartitionChecksums() + ? result.getUpdatedTempPartitionChecksums() : Collections.emptyList(); + for (int i = 0; i < result.getUpdatedTempPartitionsSize(); i++) { + ByteBuffer buffer = result.getUpdatedTempPartitions().get(i); try (ByteArrayInputStream in = new ByteArrayInputStream(buffer.array(), buffer.position(), buffer.remaining()); DataInputStream dataInputStream = new DataInputStream(in)) { String partitionStr = Text.readString(dataInputStream); Partition partition = GsonUtils.GSON.fromJson(partitionStr, Partition.class); - updatedPartitions.add(partition); + setRemoteMetaChecksum(partition, updatedTempPartitionChecksums, i); + updatedTempPartitions.add(partition); } } } - removedPartitions = result.getRemovedTempPartitions(); - if (removedPartitions == null) { - removedPartitions = new ArrayList<>(); + List removedTempPartitions = result.getRemovedTempPartitions(); + if (removedTempPartitions == null) { + removedTempPartitions = new ArrayList<>(); } - remoteOlapTable.rebuildTempPartitions(tempPartitions, updatedPartitions, removedPartitions); + remoteOlapTable.rebuildTempPartitions(tempPartitions, updatedTempPartitions, removedTempPartitions); return remoteOlapTable; }, msg, timeoutMs); } + private TPartitionMeta buildPartitionMeta(Partition partition) { + TPartitionMeta meta = new TPartitionMeta(); + meta.setId(partition.getId()); + meta.setVisibleVersion(partition.getVisibleVersion()); + meta.setVisibleVersionTime(partition.getVisibleVersionTime()); + String remoteMetaChecksum = partition.getRemoteMetaChecksum(); + if (remoteMetaChecksum == null) { + remoteMetaChecksum = partition.getMetaChecksum(); + partition.setRemoteMetaChecksum(remoteMetaChecksum); + } + meta.setMetaChecksum(remoteMetaChecksum); + return meta; + } + + private void setRemoteMetaChecksum(Partition partition, List checksums, int index) { + if (index < checksums.size()) { + partition.setRemoteMetaChecksum(checksums.get(index)); + } + } + public TBeginRemoteTxnResult beginRemoteTxn(TBeginRemoteTxnRequest request) throws Exception { request.setUser(user); request.setPasswd(password); diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index 411b7d4c20bf8b..dcc7536850073b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -5451,82 +5451,40 @@ public TGetOlapTableMetaResult getOlapTableMeta(TGetOlapTableMetaRequest request MetaContext metaContext = new MetaContext(); metaContext.setMetaVersion(FeConstants.meta_version); metaContext.setThreadLocalInfo(); - table.readLock(); try (ByteArrayOutputStream bOutputStream = new ByteArrayOutputStream(8192)) { - OlapTable copyTable = table.copyTableMeta(); - try (DataOutputStream out = new DataOutputStream(bOutputStream)) { - copyTable.write(out); - out.flush(); - result.setTableMeta(bOutputStream.toByteArray()); - } - Set updatedPartitionIds = Sets.newHashSet(table.getPartitionIds()); - List partitionMetas = request.getPartitionsSize() == 0 ? Lists.newArrayList() - : request.getPartitions(); - for (TPartitionMeta partitionMeta : partitionMetas) { - if (request.getTableId() != table.getId()) { - result.addToRemovedPartitions(partitionMeta.getId()); - continue; - } - Partition partition = table.getPartition(partitionMeta.getId()); - if (partition == null) { - result.addToRemovedPartitions(partitionMeta.getId()); - continue; - } - if (partition.getVisibleVersion() == partitionMeta.getVisibleVersion() - && partition.getVisibleVersionTime() == partitionMeta.getVisibleVersionTime()) { - updatedPartitionIds.remove(partitionMeta.getId()); - } - } - - if (LOG.isDebugEnabled()) { - LOG.debug("receive getOlapTableMeta db: {} table:{} update partitions: {} removed partition:{}", - request.getDb(), request.getTable(), updatedPartitionIds.size(), - result.getRemovedPartitionsSize()); - } - for (Long partitionId : updatedPartitionIds) { - bOutputStream.reset(); - Partition partition = table.getPartition(partitionId); + Set updatedPartitionIds; + Set updatedTempPartitionIds; + Map partitionChecksums = Maps.newHashMap(); + Map tempPartitionChecksums = Maps.newHashMap(); + table.readLock(); + try { + OlapTable copyTable = table.copyTableMeta(); try (DataOutputStream out = new DataOutputStream(bOutputStream)) { - Text.writeString(out, GsonUtils.GSON.toJson(partition)); + copyTable.write(out); out.flush(); - result.addToUpdatedPartitions(ByteBuffer.wrap(bOutputStream.toByteArray())); + result.setTableMeta(bOutputStream.toByteArray()); } - } - // temp partitions - updatedPartitionIds = Sets.newHashSet(table.getTempPartitions().getPartitionIds()); - partitionMetas = request.getTempPartitionsSize() == 0 ? Lists.newArrayList() - : request.getTempPartitions(); - for (TPartitionMeta partitionMeta : partitionMetas) { - if (request.getTableId() != table.getId()) { - result.addToRemovedTempPartitions(partitionMeta.getId()); - continue; - } - Partition tempPartition = table.getTempPartitions().getPartition(partitionMeta.getId()); - if (tempPartition == null) { - result.addToRemovedTempPartitions(partitionMeta.getId()); - continue; - } - if (tempPartition.getVisibleVersion() == partitionMeta.getVisibleVersion() - && tempPartition.getVisibleVersionTime() == partitionMeta.getVisibleVersionTime()) { - updatedPartitionIds.remove(partitionMeta.getId()); - } - } - if (LOG.isDebugEnabled()) { - LOG.debug("update temp partitions: {}, removed temp partition:{}", - updatedPartitionIds.size(), result.getRemovedPartitionsSize()); - } - for (Long partitionId : updatedPartitionIds) { - bOutputStream.reset(); - Partition partition = table.getTempPartitions().getPartition(partitionId); - try (DataOutputStream out = new DataOutputStream(bOutputStream)) { - Text.writeString(out, GsonUtils.GSON.toJson(partition)); - out.flush(); - result.addToUpdatedTempPartitions(ByteBuffer.wrap(bOutputStream.toByteArray())); + updatedPartitionIds = Sets.newHashSet(table.getPartitionIds()); + collectPartitionChanges(table, request.getTableId(), request.getPartitions(), false, + updatedPartitionIds, partitionChecksums, result); + updatedTempPartitionIds = Sets.newHashSet(table.getTempPartitions().getPartitionIds()); + collectPartitionChanges(table, request.getTableId(), request.getTempPartitions(), true, + updatedTempPartitionIds, tempPartitionChecksums, result); + if (LOG.isDebugEnabled()) { + LOG.debug("receive getOlapTableMeta db: {} table:{} update partitions: {} " + + "removed partition:{} update temp partitions: {} removed temp partition:{}", + request.getDb(), request.getTable(), updatedPartitionIds.size(), + result.getRemovedPartitionsSize(), updatedTempPartitionIds.size(), + result.getRemovedTempPartitionsSize()); } + addUpdatedPartitions(table, updatedPartitionIds, false, partitionChecksums, bOutputStream, result); + addUpdatedPartitions(table, updatedTempPartitionIds, true, tempPartitionChecksums, + bOutputStream, result); + } finally { + table.readUnlock(); } return result; } finally { - table.readUnlock(); MetaContext.remove(); } } catch (AuthenticationException e) { @@ -5549,6 +5507,79 @@ public TGetOlapTableMetaResult getOlapTableMeta(TGetOlapTableMetaRequest request } } + private void collectPartitionChanges(OlapTable table, long requestTableId, + List partitionMetas, boolean tempPartition, Set updatedPartitionIds, + Map partitionChecksums, TGetOlapTableMetaResult result) { + if (partitionMetas == null) { + return; + } + for (TPartitionMeta partitionMeta : partitionMetas) { + long partitionId = partitionMeta.getId(); + if (requestTableId != table.getId()) { + addRemovedPartition(result, partitionId, tempPartition); + continue; + } + Partition partition = getPartition(table, partitionId, tempPartition); + if (partition == null) { + addRemovedPartition(result, partitionId, tempPartition); + continue; + } + if (isPartitionVersionMatched(partition, partitionMeta)) { + if (!partitionMeta.isSetMetaChecksum()) { + updatedPartitionIds.remove(partitionId); + continue; + } + String metaChecksum = getPartitionMetaChecksum(partition, partitionChecksums); + if (metaChecksum.equals(partitionMeta.getMetaChecksum())) { + updatedPartitionIds.remove(partitionId); + } + } + } + } + + private void addUpdatedPartitions(OlapTable table, Set updatedPartitionIds, boolean tempPartition, + Map partitionChecksums, ByteArrayOutputStream bOutputStream, + TGetOlapTableMetaResult result) throws IOException { + for (Long partitionId : updatedPartitionIds) { + Partition partition = getPartition(table, partitionId, tempPartition); + Preconditions.checkState(partition != null); + String metaChecksum = getPartitionMetaChecksum(partition, partitionChecksums); + bOutputStream.reset(); + try (DataOutputStream out = new DataOutputStream(bOutputStream)) { + Text.writeString(out, GsonUtils.GSON.toJson(partition)); + out.flush(); + if (tempPartition) { + result.addToUpdatedTempPartitions(ByteBuffer.wrap(bOutputStream.toByteArray())); + result.addToUpdatedTempPartitionChecksums(metaChecksum); + } else { + result.addToUpdatedPartitions(ByteBuffer.wrap(bOutputStream.toByteArray())); + result.addToUpdatedPartitionChecksums(metaChecksum); + } + } + } + } + + private String getPartitionMetaChecksum(Partition partition, Map partitionChecksums) { + return partitionChecksums.computeIfAbsent(partition.getId(), key -> partition.getMetaChecksum()); + } + + private Partition getPartition(OlapTable table, long partitionId, boolean tempPartition) { + return tempPartition ? table.getTempPartitions().getPartition(partitionId) : table.getPartition(partitionId); + } + + private void addRemovedPartition(TGetOlapTableMetaResult result, long partitionId, boolean tempPartition) { + if (tempPartition) { + result.addToRemovedTempPartitions(partitionId); + } else { + result.addToRemovedPartitions(partitionId); + } + } + + private boolean isPartitionVersionMatched(Partition partition, TPartitionMeta partitionMeta) { + return partition.getVisibleVersion() == partitionMeta.getVisibleVersion() + && partition.getVisibleVersionTime() == partitionMeta.getVisibleVersionTime(); + } + @Override public TStatus syncCloudTabletStats(TSyncCloudTabletStatsRequest request) throws TException { diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/MaterializedIndexTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/MaterializedIndexTest.java index 3ff3f2519a3d44..6044e35b5ecf64 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/catalog/MaterializedIndexTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/MaterializedIndexTest.java @@ -93,6 +93,140 @@ public void testGetTabletsReturnsImmutableSnapshot() { Assert.assertThrows(UnsupportedOperationException.class, () -> snapshot.add(new LocalTablet(3L))); } + @Test + public void testPartitionMetaChecksum() { + MaterializedIndex firstIndex = new MaterializedIndex(1L, IndexState.NORMAL); + LocalTablet firstTablet = new LocalTablet(10L); + firstTablet.addReplica(new LocalReplica(100L, 1000L, Replica.ReplicaState.NORMAL, 1L, 1), true); + firstTablet.addReplica(new LocalReplica(101L, 1001L, Replica.ReplicaState.NORMAL, 1L, 1), true); + firstIndex.addTablet(firstTablet, null, true); + LocalTablet secondTablet = new LocalTablet(11L); + secondTablet.addReplica(new LocalReplica(110L, 1010L, Replica.ReplicaState.NORMAL, 1L, 1), true); + firstIndex.addTablet(secondTablet, null, true); + Partition firstPartition = new Partition(1L, "p1", firstIndex, null); + firstPartition.createRollupIndex(createIndex(2L, 20L, 200L, 2000L)); + firstPartition.createRollupIndex(createIndex(3L, 30L, 300L, 3000L)); + // Pin visibleVersionTime so partitions constructed in different millis stay comparable. + long pinnedVisibleVersionTime = firstPartition.getVisibleVersionTime(); + Partition deserializedPartition = GsonUtils.GSON.fromJson(GsonUtils.GSON.toJson(firstPartition), + Partition.class); + Assert.assertEquals(firstPartition.getMetaChecksum(), deserializedPartition.getMetaChecksum()); + + MaterializedIndex reorderedIndex = new MaterializedIndex(1L, IndexState.NORMAL); + LocalTablet reorderedSecondTablet = new LocalTablet(11L); + reorderedSecondTablet.addReplica(new LocalReplica(110L, 1010L, Replica.ReplicaState.NORMAL, 1L, 1), true); + reorderedIndex.addTablet(reorderedSecondTablet, null, true); + LocalTablet reorderedTablet = new LocalTablet(10L); + reorderedTablet.addReplica(new LocalReplica(101L, 1001L, Replica.ReplicaState.NORMAL, 1L, 1), true); + reorderedTablet.addReplica(new LocalReplica(100L, 1000L, Replica.ReplicaState.NORMAL, 1L, 1), true); + reorderedIndex.addTablet(reorderedTablet, null, true); + Partition reorderedPartition = new Partition(1L, "p1", reorderedIndex, null); + reorderedPartition.setVisibleVersionAndTime(reorderedPartition.getVisibleVersion(), pinnedVisibleVersionTime); + reorderedPartition.createRollupIndex(createIndex(3L, 30L, 300L, 3000L)); + reorderedPartition.createRollupIndex(createIndex(2L, 20L, 200L, 2000L)); + Assert.assertEquals(firstPartition.getMetaChecksum(), reorderedPartition.getMetaChecksum()); + + MaterializedIndex movedIndex = new MaterializedIndex(1L, IndexState.NORMAL); + LocalTablet movedTablet = new LocalTablet(10L); + movedTablet.addReplica(new LocalReplica(100L, 1000L, Replica.ReplicaState.NORMAL, 1L, 1), true); + movedTablet.addReplica(new LocalReplica(102L, 1002L, Replica.ReplicaState.NORMAL, 1L, 1), true); + movedIndex.addTablet(movedTablet, null, true); + movedIndex.addTablet(secondTablet, null, true); + Partition movedPartition = new Partition(1L, "p1", movedIndex, null); + movedPartition.setVisibleVersionAndTime(movedPartition.getVisibleVersion(), pinnedVisibleVersionTime); + movedPartition.createRollupIndex(createIndex(2L, 20L, 200L, 2000L)); + movedPartition.createRollupIndex(createIndex(3L, 30L, 300L, 3000L)); + Assert.assertNotEquals(firstPartition.getMetaChecksum(), movedPartition.getMetaChecksum()); + + firstPartition.setRemoteMetaChecksum(firstPartition.getMetaChecksum()); + Assert.assertEquals(firstPartition.getMetaChecksum(), firstPartition.getRemoteMetaChecksum()); + } + + @Test + public void testPartitionMetaChecksumChangesOnReplicaQueryFields() { + // Build a partition with one tablet/replica. + MaterializedIndex baseIndex = new MaterializedIndex(1L, IndexState.NORMAL); + LocalTablet tablet = new LocalTablet(10L); + LocalReplica replica = new LocalReplica(100L, 1000L, Replica.ReplicaState.NORMAL, 1L, 1); + tablet.addReplica(replica, true); + baseIndex.addTablet(tablet, null, true); + Partition partition = new Partition(1L, "p1", baseIndex, null); + String original = partition.getMetaChecksum(); + + // 1) lastFailedVersion change must invalidate the checksum. + replica.updateLastFailedVersion(5L); + Assert.assertNotEquals(original, partition.getMetaChecksum()); + replica.updateLastFailedVersion(-1L); + Assert.assertEquals(original, partition.getMetaChecksum()); + + // 2) state change must invalidate the checksum. + replica.setState(Replica.ReplicaState.DECOMMISSION); + Assert.assertNotEquals(original, partition.getMetaChecksum()); + replica.setState(Replica.ReplicaState.NORMAL); + Assert.assertEquals(original, partition.getMetaChecksum()); + + // 3) bad flag change must invalidate the checksum. + Assert.assertTrue(replica.setBad(true)); + Assert.assertNotEquals(original, partition.getMetaChecksum()); + Assert.assertTrue(replica.setBad(false)); + Assert.assertEquals(original, partition.getMetaChecksum()); + + // 4) pathHash change must invalidate the checksum. + replica.setPathHash(99L); + Assert.assertNotEquals(original, partition.getMetaChecksum()); + replica.setPathHash(-1L); + Assert.assertEquals(original, partition.getMetaChecksum()); + + // 5) version change must invalidate the checksum. Replica.updateVersion() + // refuses to roll back, so this is asserted last with a one-way change. + replica.updateVersion(7L); + Assert.assertNotEquals(original, partition.getMetaChecksum()); + } + + @Test + public void testPartitionMetaChecksumChangesOnPartitionTopLevelFields() { + MaterializedIndex baseIndex = new MaterializedIndex(1L, IndexState.NORMAL); + LocalTablet tablet = new LocalTablet(10L); + tablet.addReplica(new LocalReplica(100L, 1000L, Replica.ReplicaState.NORMAL, 1L, 1), true); + baseIndex.addTablet(tablet, null, true); + RandomDistributionInfo distributionInfo = new RandomDistributionInfo(3); + Partition partition = new Partition(1L, "p1", baseIndex, distributionInfo); + String original = partition.getMetaChecksum(); + + // RENAME PARTITION only mutates the partition name, with no visible version change; + // the checksum must still change so the remote cache can detect the rename. + partition.setName("p1_renamed"); + Assert.assertNotEquals(original, partition.getMetaChecksum()); + partition.setName("p1"); + Assert.assertEquals(original, partition.getMetaChecksum()); + + // PartitionState changes (e.g. RESTORE) must invalidate the checksum. + partition.setState(Partition.PartitionState.RESTORE); + Assert.assertNotEquals(original, partition.getMetaChecksum()); + partition.setState(Partition.PartitionState.NORMAL); + Assert.assertEquals(original, partition.getMetaChecksum()); + + // DistributionInfo bucket-num change must invalidate the checksum. + int oldBucketNum = distributionInfo.getBucketNum(); + distributionInfo.setBucketNum(oldBucketNum + 2); + Assert.assertNotEquals(original, partition.getMetaChecksum()); + distributionInfo.setBucketNum(oldBucketNum); + Assert.assertEquals(original, partition.getMetaChecksum()); + + // nextVersion changes must invalidate the checksum (asserted last; + // setNextVersion() can't be reverted to its original value safely). + partition.setNextVersion(partition.getNextVersion() + 1); + Assert.assertNotEquals(original, partition.getMetaChecksum()); + } + + private MaterializedIndex createIndex(long indexId, long tabletId, long replicaId, long backendId) { + MaterializedIndex index = new MaterializedIndex(indexId, IndexState.NORMAL); + LocalTablet tablet = new LocalTablet(tabletId); + tablet.addReplica(new LocalReplica(replicaId, backendId, Replica.ReplicaState.NORMAL, 1L, 1), true); + index.addTablet(tablet, null, true); + return index; + } + @Test public void testConcurrentGetTabletsNeverThrows() throws InterruptedException { // A reader repeatedly snapshots and iterates getTablets() while a writer keeps diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index c47749de06bc3e..cc74ffbe06558a 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -1734,6 +1734,7 @@ struct TPartitionMeta { 1: optional i64 id 2: optional i64 visible_version 3: optional i64 visible_version_time + 4: optional string meta_checksum } struct TGetOlapTableMetaRequest { @@ -1754,6 +1755,8 @@ struct TGetOlapTableMetaResult { 4: optional list removed_partitions 5: optional list updated_temp_partitions 6: optional list removed_temp_partitions + 7: optional list updated_partition_checksums + 8: optional list updated_temp_partition_checksums } // Remote transaction request and Result definitions for cross-cluster export.