diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerData.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerData.java index b7804a7e8b07..34c7b23be8a9 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerData.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerData.java @@ -48,6 +48,7 @@ import java.util.Objects; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import org.apache.hadoop.hdds.client.StorageTypeUtils; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto; @@ -275,7 +276,7 @@ public long getDeleteTransactionId() { } ContainerReplicaProto buildContainerReplicaProto() throws StorageContainerException { - return getStatistics().setContainerReplicaProto(ContainerReplicaProto.newBuilder()) + ContainerReplicaProto.Builder builder = getStatistics().setContainerReplicaProto(ContainerReplicaProto.newBuilder()) .setContainerID(getContainerID()) .setState(getContainerReplicaProtoState(getState())) .setIsEmpty(isEmpty()) @@ -283,8 +284,11 @@ ContainerReplicaProto buildContainerReplicaProto() throws StorageContainerExcept .setReplicaIndex(getReplicaIndex()) .setBlockCommitSequenceId(getBlockCommitSequenceId()) .setDeleteTransactionId(getDeleteTransactionId()) - .setDataChecksum(getDataChecksum()) - .build(); + .setDataChecksum(getDataChecksum()); + if (getStorageType() != null) { + builder.setStorageType(StorageTypeUtils.getStorageTypeProto(getStorageType())); + } + return builder.build(); } // TODO remove one of the State from proto diff --git a/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto b/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto index 3cc92e6b9961..c7440e6d0df4 100644 --- a/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto +++ b/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto @@ -235,6 +235,7 @@ message ContainerReplicaProto { optional int32 replicaIndex = 14; optional bool isEmpty = 15 [default = false]; optional int64 dataChecksum = 16; + optional StorageTypeProto storageType = 17; } message CommandStatusReportsProto { diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/AbstractContainerReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/AbstractContainerReportHandler.java index 57234889dcb4..1b331f63cd4d 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/AbstractContainerReportHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/AbstractContainerReportHandler.java @@ -25,6 +25,7 @@ import java.util.Set; import java.util.function.Supplier; import org.apache.hadoop.hdds.client.ECReplicationConfig; +import org.apache.hadoop.hdds.client.StorageTypeUtils; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.DatanodeID; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; @@ -338,14 +339,14 @@ private boolean updateContainerState(final DatanodeDetails datanode, targetState = LifeCycleState.CLOSED; getLogger().info("Resurrecting container {} from {} to CLOSED due to non-empty CLOSED replica " + "(keyCount={}, BCSID={}) from {}", - containerId, container.getState(), replica.getKeyCount(), replica.getBlockCommitSequenceId(), + containerId, container.getState(), replica.getKeyCount(), replica.getBlockCommitSequenceId(), detailsForLogging); } else { // For OPEN, CLOSING, UNHEALTHY, QUASI_CLOSED replicas, transition to QUASI_CLOSED state targetState = LifeCycleState.QUASI_CLOSED; getLogger().info("Resurrecting container {} from {} to QUASI_CLOSED due to non-empty {} replica " + "(keyCount={}, BCSID={}) from {}", - containerId, container.getState(), replica.getState(), replica.getKeyCount(), + containerId, container.getState(), replica.getState(), replica.getKeyCount(), replica.getBlockCommitSequenceId(), detailsForLogging); } containerManager.transitionDeletingOrDeletedToTargetState(containerId, targetState); @@ -379,7 +380,7 @@ private void updateContainerReplica(final DatanodeDetails datanodeDetails, final ContainerReplicaProto replicaProto) throws ContainerNotFoundException, ContainerReplicaNotFoundException { - final ContainerReplica replica = ContainerReplica.newBuilder() + final ContainerReplica.ContainerReplicaBuilder replicaBuilder = ContainerReplica.newBuilder() .setContainerID(containerId) .setContainerState(replicaProto.getState()) .setDatanodeDetails(datanodeDetails) @@ -388,8 +389,11 @@ private void updateContainerReplica(final DatanodeDetails datanodeDetails, .setKeyCount(replicaProto.getKeyCount()) .setReplicaIndex(replicaProto.getReplicaIndex()) .setBytesUsed(replicaProto.getUsed()) - .setEmpty(replicaProto.getIsEmpty()) - .setChecksums(ContainerChecksums.of(replicaProto.getDataChecksum())) + .setEmpty(replicaProto.getIsEmpty()); + if (replicaProto.hasStorageType()) { + replicaBuilder.setStorageType(StorageTypeUtils.getFromProtobuf(replicaProto.getStorageType())); + } + ContainerReplica replica = replicaBuilder.setChecksums(ContainerChecksums.of(replicaProto.getDataChecksum())) .build(); if (replica.getState().equals(State.DELETED)) { diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplica.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplica.java index 5c9bd57cd881..a08d627ff819 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplica.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplica.java @@ -17,10 +17,12 @@ package org.apache.hadoop.hdds.scm.container; +import jakarta.annotation.Nullable; import java.util.Objects; import org.apache.commons.lang3.builder.CompareToBuilder; import org.apache.commons.lang3.builder.EqualsBuilder; import org.apache.commons.lang3.builder.HashCodeBuilder; +import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.DatanodeID; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto; @@ -46,6 +48,7 @@ public final class ContainerReplica implements Comparable { private final long bytesUsed; private final boolean isEmpty; private final ContainerChecksums checksums; + private final StorageType storageType; private ContainerReplica(ContainerReplicaBuilder b) { this.containerID = Objects.requireNonNull(b.containerID, "containerID == null"); @@ -58,6 +61,7 @@ private ContainerReplica(ContainerReplicaBuilder b) { this.isEmpty = b.isEmpty; this.sequenceId = b.sequenceId; this.checksums = Objects.requireNonNull(b.checksums, "checksums == null"); + this.storageType = b.storageType; } public ContainerID getContainerID() { @@ -130,6 +134,11 @@ public long getDataChecksum() { return checksums.getDataChecksum(); } + @Nullable + public StorageType getStorageType() { + return storageType; + } + @Override public int hashCode() { return new HashCodeBuilder(61, 71) @@ -185,6 +194,7 @@ public ContainerReplicaBuilder toBuilder() { .setReplicaIndex(replicaIndex) .setSequenceId(sequenceId) .setEmpty(isEmpty) + .setStorageType(storageType) .setChecksums(checksums); } @@ -200,6 +210,7 @@ public String toString() { + ", bytesUsed=" + bytesUsed + ", " + (isEmpty ? "empty" : "non-empty") + ", checksums=" + checksums + + ", storageType=" + storageType + '}'; } @@ -218,6 +229,7 @@ public static class ContainerReplicaBuilder { private int replicaIndex; private boolean isEmpty; private ContainerChecksums checksums; + private StorageType storageType; /** * Set Container Id. @@ -297,6 +309,11 @@ public ContainerReplicaBuilder setChecksums(ContainerChecksums checksums) { return this; } + public ContainerReplicaBuilder setStorageType(StorageType storageType) { + this.storageType = storageType; + return this; + } + /** * Constructs new ContainerReplicaBuilder. * diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java index c13deefff2a6..92398e8dfcbd 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java @@ -23,6 +23,7 @@ import static org.apache.hadoop.hdds.scm.HddsTestUtils.getECContainer; import static org.apache.hadoop.hdds.scm.HddsTestUtils.getReplicas; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; import static org.mockito.ArgumentMatchers.eq; @@ -48,8 +49,10 @@ import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; import java.util.stream.Stream; +import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdds.client.ECReplicationConfig; import org.apache.hadoop.hdds.client.RatisReplicationConfig; +import org.apache.hadoop.hdds.client.StorageTypeUtils; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; @@ -230,7 +233,7 @@ private void testReplicaIndexUpdate(ContainerInfo container, Map expectedReplicaMap) { final ContainerReportsProto containerReport = getContainerReportsProto( container.containerID(), ContainerReplicaProto.State.CLOSED, - dn.getUuidString(), 2000000000L, 100000000L, 10000L, replicaIndex); + dn.getUuidString(), 2000000000L, 100000000L, 10000L, replicaIndex, null); final ContainerReportFromDatanode containerReportFromDatanode = new ContainerReportFromDatanode(dn, containerReport); final ContainerReportHandler reportHandler = new ContainerReportHandler( @@ -323,7 +326,7 @@ public void testUnderReplicatedContainer() // containerOne becomes under replicated. final ContainerReportsProto containerReport = getContainerReportsProto( containerTwo.containerID(), ContainerReplicaProto.State.CLOSED, - datanodeOne.getUuidString()); + datanodeOne.getUuidString(), null); final ContainerReportFromDatanode containerReportFromDatanode = new ContainerReportFromDatanode(datanodeOne, containerReport); reportHandler.onMessage(containerReportFromDatanode, publisher); @@ -379,7 +382,7 @@ public void testOverReplicatedContainer() throws NodeNotFoundException, final ContainerReportsProto containerReport = getContainerReportsProto( containerOne.containerID(), ContainerReplicaProto.State.CLOSED, - datanodeFour.getUuidString()); + datanodeFour.getUuidString(), null); final ContainerReportFromDatanode containerReportFromDatanode = new ContainerReportFromDatanode(datanodeFour, containerReport); reportHandler.onMessage(containerReportFromDatanode, publisher); @@ -440,10 +443,9 @@ public void testClosingToClosed() throws NodeNotFoundException, IOException, containerOneReplicas.forEach(containerStateManager::updateContainerReplica); containerTwoReplicas.forEach(containerStateManager::updateContainerReplica); - final ContainerReportsProto containerReport = getContainerReportsProto( containerOne.containerID(), ContainerReplicaProto.State.CLOSED, - datanodeOne.getUuidString()); + datanodeOne.getUuidString(), null); final ContainerReportFromDatanode containerReportFromDatanode = new ContainerReportFromDatanode(datanodeOne, containerReport); reportHandler.onMessage(containerReportFromDatanode, publisher); @@ -672,7 +674,7 @@ private void createAndHandleContainerReport(ContainerID containerID, int replicaIndex) { final ContainerReportFromDatanode containerReportFromDatanode = getContainerReportFromDatanode(containerID, state, - datanodeDetails, 2000000000L, 100000L, replicaIndex); + datanodeDetails, 2000000000L, 100000L, replicaIndex, null); final ContainerReportHandler reportHandler = new ContainerReportHandler( nodeManager, containerManager); reportHandler.onMessage(containerReportFromDatanode, publisher); @@ -731,7 +733,7 @@ public void testClosingToQuasiClosed() final ContainerReportsProto containerReport = getContainerReportsProto( containerOne.containerID(), ContainerReplicaProto.State.QUASI_CLOSED, - datanodeOne.getUuidString()); + datanodeOne.getUuidString(), null); final ContainerReportFromDatanode containerReportFromDatanode = new ContainerReportFromDatanode(datanodeOne, containerReport); reportHandler.onMessage(containerReportFromDatanode, publisher); @@ -795,7 +797,7 @@ public void testQuasiClosedToClosed() final ContainerReportsProto containerReport = getContainerReportsProto( containerOne.containerID(), ContainerReplicaProto.State.CLOSED, - datanodeOne.getUuidString()); + datanodeOne.getUuidString(), null); final ContainerReportFromDatanode containerReportFromDatanode = new ContainerReportFromDatanode(datanodeOne, containerReport); @@ -835,7 +837,7 @@ public void testContainerStateTransitionToClosedWithMismatchingBCSID(LifeCycleSt final ContainerReportsProto containerReport = getContainerReportsProto( container1.containerID(), ContainerReplicaProto.State.CLOSED, dn1.getUuidString(), - 2000L); + 2000L, StorageType.DISK); final ContainerReportFromDatanode containerReportFromDatanode = new ContainerReportFromDatanode(dn1, containerReport); @@ -880,7 +882,7 @@ public void openContainerKeyAndBytesUsedUpdatedToMinimumOfAllReplicas() reportHandler.onMessage(getContainerReportFromDatanode( containerOne.containerID(), replicaState, - datanodeOne, 50L, 60L), publisher); + datanodeOne, 50L, 60L, null), publisher); // Single replica reported - ensure values are updated assertEquals(50L, containerManager.getContainer(containerOne.containerID()) @@ -890,10 +892,10 @@ public void openContainerKeyAndBytesUsedUpdatedToMinimumOfAllReplicas() reportHandler.onMessage(getContainerReportFromDatanode( containerOne.containerID(), replicaState, - datanodeTwo, 50L, 60L), publisher); + datanodeTwo, 50L, 60L, null), publisher); reportHandler.onMessage(getContainerReportFromDatanode( containerOne.containerID(), replicaState, - datanodeThree, 50L, 60L), publisher); + datanodeThree, 50L, 60L, null), publisher); // All 3 DNs are reporting the same values. Counts should be as expected. assertEquals(50L, containerManager.getContainer(containerOne.containerID()) @@ -905,13 +907,13 @@ public void openContainerKeyAndBytesUsedUpdatedToMinimumOfAllReplicas() // reported. reportHandler.onMessage(getContainerReportFromDatanode( containerOne.containerID(), replicaState, - datanodeOne, 1L, 10L), publisher); + datanodeOne, 1L, 10L, null), publisher); reportHandler.onMessage(getContainerReportFromDatanode( containerOne.containerID(), replicaState, - datanodeTwo, 2L, 11L), publisher); + datanodeTwo, 2L, 11L, null), publisher); reportHandler.onMessage(getContainerReportFromDatanode( containerOne.containerID(), replicaState, - datanodeThree, 3L, 12L), publisher); + datanodeThree, 3L, 12L, null), publisher); // All 3 DNs are reporting different values. The actual value should be the // minimum. @@ -924,7 +926,7 @@ public void openContainerKeyAndBytesUsedUpdatedToMinimumOfAllReplicas() // is the minimum reportHandler.onMessage(getContainerReportFromDatanode( containerOne.containerID(), replicaState, - datanodeOne, 3L, 12L), publisher); + datanodeOne, 3L, 12L, null), publisher); assertEquals(2L, containerManager.getContainer(containerOne.containerID()) .getUsedBytes()); @@ -956,7 +958,7 @@ public void notOpenContainerKeyAndBytesUsedUpdatedToMaximumOfAllReplicas() reportHandler.onMessage(getContainerReportFromDatanode( containerOne.containerID(), replicaState, - datanodeOne, 50L, 60L), publisher); + datanodeOne, 50L, 60L, null), publisher); // Single replica reported - ensure values are updated assertEquals(50L, containerManager.getContainer(containerOne.containerID()) @@ -966,10 +968,10 @@ public void notOpenContainerKeyAndBytesUsedUpdatedToMaximumOfAllReplicas() reportHandler.onMessage(getContainerReportFromDatanode( containerOne.containerID(), replicaState, - datanodeTwo, 50L, 60L), publisher); + datanodeTwo, 50L, 60L, null), publisher); reportHandler.onMessage(getContainerReportFromDatanode( containerOne.containerID(), replicaState, - datanodeThree, 50L, 60L), publisher); + datanodeThree, 50L, 60L, null), publisher); // All 3 DNs are reporting the same values. Counts should be as expected. assertEquals(50L, containerManager.getContainer(containerOne.containerID()) @@ -981,13 +983,13 @@ public void notOpenContainerKeyAndBytesUsedUpdatedToMaximumOfAllReplicas() // reported. reportHandler.onMessage(getContainerReportFromDatanode( containerOne.containerID(), replicaState, - datanodeOne, 1L, 10L), publisher); + datanodeOne, 1L, 10L, null), publisher); reportHandler.onMessage(getContainerReportFromDatanode( containerOne.containerID(), replicaState, - datanodeTwo, 2L, 11L), publisher); + datanodeTwo, 2L, 11L, null), publisher); reportHandler.onMessage(getContainerReportFromDatanode( containerOne.containerID(), replicaState, - datanodeThree, 3L, 12L), publisher); + datanodeThree, 3L, 12L, null), publisher); // All 3 DNs are reporting different values. The actual value should be the // maximum. @@ -1000,7 +1002,7 @@ public void notOpenContainerKeyAndBytesUsedUpdatedToMaximumOfAllReplicas() // is the new maximumu reportHandler.onMessage(getContainerReportFromDatanode( containerOne.containerID(), replicaState, - datanodeThree, 1L, 10L), publisher); + datanodeThree, 1L, 10L, null), publisher); assertEquals(2L, containerManager.getContainer(containerOne.containerID()) .getUsedBytes()); @@ -1034,11 +1036,10 @@ public void openECContainerKeyAndBytesUsedUpdatedToMinimumOfAllReplicas() .getUsedBytes()); assertEquals(0L, containerManager.getContainer(containerOne.containerID()) .getNumberOfKeys()); - // Report from data index 2 - should not update stats reportHandler.onMessage(getContainerReportFromDatanode( containerOne.containerID(), replicaState, - dns.get(2), 50L, 60L, 2), publisher); + dns.get(2), 50L, 60L, 2, null), publisher); assertEquals(0L, containerManager.getContainer(containerOne.containerID()) .getUsedBytes()); assertEquals(0L, containerManager.getContainer(containerOne.containerID()) @@ -1047,7 +1048,7 @@ public void openECContainerKeyAndBytesUsedUpdatedToMinimumOfAllReplicas() // Report from replica 1, it should update reportHandler.onMessage(getContainerReportFromDatanode( containerOne.containerID(), replicaState, - dns.get(1), 50L, 60L, 1), publisher); + dns.get(1), 50L, 60L, 1, null), publisher); assertEquals(50L, containerManager.getContainer(containerOne.containerID()) .getUsedBytes()); assertEquals(60L, containerManager.getContainer(containerOne.containerID()) @@ -1058,7 +1059,7 @@ public void openECContainerKeyAndBytesUsedUpdatedToMinimumOfAllReplicas() // Report from replica 1, it should update reportHandler.onMessage(getContainerReportFromDatanode( containerOne.containerID(), replicaState, - dns.get(4), 80L, 90L, 4), publisher); + dns.get(4), 80L, 90L, 4, null), publisher); assertEquals(50L, containerManager.getContainer(containerOne.containerID()) .getUsedBytes()); assertEquals(60L, containerManager.getContainer(containerOne.containerID()) @@ -1067,7 +1068,7 @@ public void openECContainerKeyAndBytesUsedUpdatedToMinimumOfAllReplicas() // Parity 2 reports a lesser value, so the stored values should update reportHandler.onMessage(getContainerReportFromDatanode( containerOne.containerID(), replicaState, - dns.get(5), 40, 30, 5), publisher); + dns.get(5), 40, 30, 5, null), publisher); assertEquals(40L, containerManager.getContainer(containerOne.containerID()) .getUsedBytes()); assertEquals(30L, containerManager.getContainer(containerOne.containerID()) @@ -1077,7 +1078,7 @@ public void openECContainerKeyAndBytesUsedUpdatedToMinimumOfAllReplicas() // lesser values reportHandler.onMessage(getContainerReportFromDatanode( containerOne.containerID(), replicaState, - dns.get(2), 10L, 10L, 2), publisher); + dns.get(2), 10L, 10L, 2, null), publisher); assertEquals(40L, containerManager.getContainer(containerOne.containerID()) .getUsedBytes()); assertEquals(30L, containerManager.getContainer(containerOne.containerID()) @@ -1114,7 +1115,7 @@ public void closedECContainerKeyAndBytesUsedUpdatedToMinimumOfAllReplicas() // Report from data index 2 - should not update stats reportHandler.onMessage(getContainerReportFromDatanode( containerOne.containerID(), replicaState, - dns.get(2), 50L, 60L, 2), publisher); + dns.get(2), 50L, 60L, 2, null), publisher); assertEquals(0L, containerManager.getContainer(containerOne.containerID()) .getUsedBytes()); assertEquals(0L, containerManager.getContainer(containerOne.containerID()) @@ -1123,7 +1124,7 @@ public void closedECContainerKeyAndBytesUsedUpdatedToMinimumOfAllReplicas() // Report from replica 1, it should update reportHandler.onMessage(getContainerReportFromDatanode( containerOne.containerID(), replicaState, - dns.get(1), 50L, 60L, 1), publisher); + dns.get(1), 50L, 60L, 1, null), publisher); assertEquals(50L, containerManager.getContainer(containerOne.containerID()) .getUsedBytes()); assertEquals(60L, containerManager.getContainer(containerOne.containerID()) @@ -1134,7 +1135,7 @@ public void closedECContainerKeyAndBytesUsedUpdatedToMinimumOfAllReplicas() // Report from replica 1, it should update reportHandler.onMessage(getContainerReportFromDatanode( containerOne.containerID(), replicaState, - dns.get(4), 80L, 90L, 4), publisher); + dns.get(4), 80L, 90L, 4, null), publisher); assertEquals(80L, containerManager.getContainer(containerOne.containerID()) .getUsedBytes()); assertEquals(90L, containerManager.getContainer(containerOne.containerID()) @@ -1143,7 +1144,7 @@ public void closedECContainerKeyAndBytesUsedUpdatedToMinimumOfAllReplicas() // Parity 2 reports a lesser value, so the stored values should not update reportHandler.onMessage(getContainerReportFromDatanode( containerOne.containerID(), replicaState, - dns.get(5), 40, 30, 5), publisher); + dns.get(5), 40, 30, 5, null), publisher); assertEquals(80L, containerManager.getContainer(containerOne.containerID()) .getUsedBytes()); assertEquals(90L, containerManager.getContainer(containerOne.containerID()) @@ -1153,7 +1154,7 @@ public void closedECContainerKeyAndBytesUsedUpdatedToMinimumOfAllReplicas() // greater values reportHandler.onMessage(getContainerReportFromDatanode( containerOne.containerID(), replicaState, - dns.get(2), 110L, 120L, 2), publisher); + dns.get(2), 110L, 120L, 2, null), publisher); assertEquals(80L, containerManager.getContainer(containerOne.containerID()) .getUsedBytes()); assertEquals(90L, containerManager.getContainer(containerOne.containerID()) @@ -1178,7 +1179,7 @@ public void testStaleReplicaOfDeletedContainer() throws NodeNotFoundException, I final ContainerReportsProto containerReport = getContainerReportsProto( containerOne.containerID(), ContainerReplicaProto.State.CLOSED, - datanodeOne.getUuidString(), 0, true); + datanodeOne.getUuidString(), 0, true, null); final ContainerReportFromDatanode containerReportFromDatanode = new ContainerReportFromDatanode(datanodeOne, containerReport); reportHandler.onMessage(containerReportFromDatanode, publisher); @@ -1207,14 +1208,15 @@ public void testDeletedContainerWithStaleQuasiClosedReplicaDoesNotResurrect() // Report non-empty QUASI_CLOSED replica with matching bcsId final ContainerReportsProto containerReport = getContainerReportsProto( - containerOne.containerID(), + containerOne.containerID(), ContainerReplicaProto.State.QUASI_CLOSED, - datanodeOne.getUuidString(), + datanodeOne.getUuidString(), 200L, // usedBytes 10L, // keyCount (non-empty) 10000L, // bcsId (matches container) 0, // replicaIndex - false); // isEmpty=false + false, // isEmpty=false + StorageType.DISK); final ContainerReportFromDatanode containerReportFromDatanode = new ContainerReportFromDatanode(datanodeOne, containerReport); @@ -1224,7 +1226,7 @@ public void testDeletedContainerWithStaleQuasiClosedReplicaDoesNotResurrect() final ContainerInfo container = containerManager.getContainer(containerOne.containerID()); assertEquals(LifeCycleState.DELETED, container.getState(), "Container should not resurrect when a stale QUASI_CLOSED replica is reported"); - + // A delete command should be sent for the stale replica verify(publisher, times(1)).fireEvent(eq(SCMEvents.DATANODE_COMMAND), any(CommandForDatanode.class)); } @@ -1234,7 +1236,7 @@ public void testDeletedContainerWithStaleQuasiClosedReplicaDoesNotResurrect() * OPEN replicas should trigger resurrection to QUASI_CLOSED state. */ @Test - public void testDeletingContainerResurrectionToQuasiClosedWithOpenReplica() + public void testDeletingContainerResurrectionToQuasiClosedWithOpenReplica() throws NodeNotFoundException, IOException { final ContainerReportHandler reportHandler = new ContainerReportHandler(nodeManager, containerManager); final DatanodeDetails datanodeOne = nodeManager.getNodes( @@ -1248,14 +1250,15 @@ public void testDeletingContainerResurrectionToQuasiClosedWithOpenReplica() // Report non-empty OPEN replica (e.g., stale DN that came back online) final ContainerReportsProto containerReport = getContainerReportsProto( - containerOne.containerID(), + containerOne.containerID(), ContainerReplicaProto.State.OPEN, datanodeOne.getUuidString(), 200L, // usedBytes 10L, // keyCount (non-empty) 10000L, // bcsId 0, // replicaIndex - false); // isEmpty=false + false, // isEmpty=false + StorageType.DISK); final ContainerReportFromDatanode containerReportFromDatanode = new ContainerReportFromDatanode(datanodeOne, containerReport); @@ -1265,7 +1268,7 @@ public void testDeletingContainerResurrectionToQuasiClosedWithOpenReplica() final ContainerInfo resurrectedContainer = containerManager.getContainer(containerOne.containerID()); assertEquals(LifeCycleState.QUASI_CLOSED, resurrectedContainer.getState(), "Container should resurrect to QUASI_CLOSED when OPEN replica is reported"); - + // Replica should be updated in SCM assertEquals(1, containerManager.getContainerReplicas(containerOne.containerID()).size()); } @@ -1305,7 +1308,7 @@ public void testWithNoContainerDataChecksum() throws Exception { int numReportsSent = 0; for (DatanodeDetails dn: datanodes) { final ContainerReportsProto dnReportProto = getContainerReportsProto( - contID, ContainerReplicaProto.State.CLOSED, dn.getUuidString()); + contID, ContainerReplicaProto.State.CLOSED, dn.getUuidString(), StorageType.DISK); final ContainerReportFromDatanode dnReport = new ContainerReportFromDatanode(dn, dnReportProto); reportHandler.onMessage(dnReport, publisher); numReportsSent++; @@ -1353,7 +1356,7 @@ public void testWithContainerDataChecksum() throws Exception { // For each datanode, send a container report with a mismatched checksum. for (DatanodeDetails dn: datanodes) { ContainerReportsProto dnReportProto = getContainerReportsProto( - contID, ContainerReplicaProto.State.CLOSED, dn.getUuidString()); + contID, ContainerReplicaProto.State.CLOSED, dn.getUuidString(), StorageType.DISK); ContainerReplicaProto replicaWithChecksum = dnReportProto.getReports(0).toBuilder() .setDataChecksum(createUniqueDataChecksumForReplica(contID, dn.getUuidString())) .build(); @@ -1380,7 +1383,7 @@ public void testWithContainerDataChecksum() throws Exception { // This simulates reconciliation running. for (DatanodeDetails dn: datanodes) { ContainerReportsProto dnReportProto = getContainerReportsProto( - contID, ContainerReplicaProto.State.CLOSED, dn.getUuidString()); + contID, ContainerReplicaProto.State.CLOSED, dn.getUuidString(), StorageType.DISK); ContainerReplicaProto replicaWithChecksum = dnReportProto.getReports(0).toBuilder() .setDataChecksum(createMatchingDataChecksumForReplica(contID)) .build(); @@ -1417,74 +1420,138 @@ protected static long createMatchingDataChecksumForReplica(ContainerID container return Objects.hashCode(containerID); } + @Test + public void testReplicaStorageTypeValidation() throws IOException { + // Prepare env + final ContainerReportHandler reportHandler = new ContainerReportHandler( + nodeManager, containerManager); + final Iterator nodeIterator = nodeManager.getNodes( + NodeStatus.inServiceHealthy()).iterator(); + Map storageTypeToDn = new HashMap<>(); + final DatanodeDetails datanodeOne = nodeIterator.next(); + final DatanodeDetails datanodeTwo = nodeIterator.next(); + final DatanodeDetails datanodeThree = nodeIterator.next(); + final ContainerReplicaProto.State replicaState + = ContainerReplicaProto.State.CLOSED; + final ContainerInfo containerOne = getContainer(LifeCycleState.CLOSED); + + // Add Container Report + containerStateManager.addContainer(containerOne.getProtobuf()); + reportHandler.onMessage(getContainerReportFromDatanode( + containerOne.containerID(), replicaState, + datanodeOne, 50L, 60L, StorageType.DISK), publisher); + storageTypeToDn.put(StorageType.DISK, datanodeOne); + reportHandler.onMessage(getContainerReportFromDatanode( + containerOne.containerID(), replicaState, + datanodeTwo, 50L, 60L, StorageType.SSD), publisher); + storageTypeToDn.put(StorageType.SSD, datanodeTwo); + reportHandler.onMessage(getContainerReportFromDatanode( + containerOne.containerID(), replicaState, + datanodeThree, 50L, 60L, StorageType.ARCHIVE), publisher); + storageTypeToDn.put(StorageType.ARCHIVE, datanodeThree); + + // Assert the StorageType is valid + assertEquals(3, containerManager.getContainerReplicas( + containerOne.containerID()).size()); + for (ContainerReplica containerReplica : containerManager.getContainerReplicas( + containerOne.containerID())) { + assertEquals(storageTypeToDn.get(containerReplica.getStorageType()), + containerReplica.getDatanodeDetails()); + } + + storageTypeToDn.clear(); + reportHandler.onMessage(getContainerReportFromDatanode( + containerOne.containerID(), replicaState, + datanodeOne, 50L, 60L, StorageType.SSD), publisher); + storageTypeToDn.put(StorageType.SSD, datanodeOne); + reportHandler.onMessage(getContainerReportFromDatanode( + containerOne.containerID(), replicaState, + datanodeTwo, 50L, 60L, StorageType.ARCHIVE), publisher); + storageTypeToDn.put(StorageType.ARCHIVE, datanodeTwo); + reportHandler.onMessage(getContainerReportFromDatanode( + containerOne.containerID(), replicaState, + datanodeThree, 50L, 60L, null), publisher); + for (ContainerReplica containerReplica : containerManager.getContainerReplicas( + containerOne.containerID())) { + if (containerReplica.getDatanodeDetails().equals(datanodeThree)) { + assertNull(containerReplica.getStorageType()); + } else { + assertEquals(storageTypeToDn.get(containerReplica.getStorageType()), + containerReplica.getDatanodeDetails()); + } + } + } + private ContainerReportFromDatanode getContainerReportFromDatanode( ContainerID containerId, ContainerReplicaProto.State state, - DatanodeDetails dn, long bytesUsed, long keyCount) { + DatanodeDetails dn, long bytesUsed, long keyCount, StorageType storageType) { return getContainerReportFromDatanode(containerId, state, dn, bytesUsed, - keyCount, 0); + keyCount, 0, storageType); } private ContainerReportFromDatanode getContainerReportFromDatanode( ContainerID containerId, ContainerReplicaProto.State state, - DatanodeDetails dn, long bytesUsed, long keyCount, int replicaIndex) { + DatanodeDetails dn, long bytesUsed, long keyCount, int replicaIndex, StorageType storageType) { ContainerReportsProto containerReport = getContainerReportsProto( containerId, state, dn.getUuidString(), bytesUsed, keyCount, - 10000L, replicaIndex); + 10000L, replicaIndex, storageType); return new ContainerReportFromDatanode(dn, containerReport); } protected static ContainerReportsProto getContainerReportsProto( final ContainerID containerId, final ContainerReplicaProto.State state, - final String originNodeId) { + final String originNodeId, StorageType storageType) { return getContainerReportsProto(containerId, state, originNodeId, - 2000000000L, 100000000L, 10000L, 0); + 2000000000L, 100000000L, 10000L, 0, storageType); } protected static ContainerReportsProto getContainerReportsProto( final ContainerID containerId, final ContainerReplicaProto.State state, - final String originNodeId, final long bcsId) { + final String originNodeId, final long bcsId, StorageType storageType) { return getContainerReportsProto(containerId, state, originNodeId, - 2000000000L, 100000000L, bcsId, 0); + 2000000000L, 100000000L, bcsId, 0, storageType); } protected static ContainerReportsProto getContainerReportsProto( final ContainerID containerId, final ContainerReplicaProto.State state, - final String originNodeId, int replicaIndex) { + final String originNodeId, int replicaIndex, StorageType storageType) { return getContainerReportsProto(containerId, state, originNodeId, - 2000000000L, 100000000L, 10000L, replicaIndex, false); + 2000000000L, 100000000L, 10000L, replicaIndex, false, storageType); } protected static ContainerReportsProto getContainerReportsProto( final ContainerID containerId, final ContainerReplicaProto.State state, - final String originNodeId, int replicaIndex, boolean isEmpty) { + final String originNodeId, int replicaIndex, boolean isEmpty, StorageType storageType) { return getContainerReportsProto(containerId, state, originNodeId, - 2000000000L, 100000000L, 10000L, replicaIndex, isEmpty); + 2000000000L, 100000000L, 10000L, replicaIndex, isEmpty, storageType); } protected static ContainerReportsProto getContainerReportsProto( final ContainerID containerId, final ContainerReplicaProto.State state, - final String originNodeId, final long bcsId, int replicaIndex) { + final String originNodeId, final long bcsId, int replicaIndex, StorageType storageType) { return getContainerReportsProto(containerId, state, originNodeId, - 2000000000L, 100000000L, bcsId, replicaIndex); + 2000000000L, 100000000L, bcsId, replicaIndex, storageType); } + @SuppressWarnings("checkstyle:ParameterNumber") protected static ContainerReportsProto getContainerReportsProto( final ContainerID containerId, final ContainerReplicaProto.State state, final String originNodeId, final long usedBytes, final long keyCount, - final long bcsId, final int replicaIndex) { + final long bcsId, final int replicaIndex, StorageType storageType) { return getContainerReportsProto(containerId, state, originNodeId, usedBytes, - keyCount, bcsId, replicaIndex, false); + keyCount, bcsId, replicaIndex, false, storageType); } @SuppressWarnings("checkstyle:ParameterNumber") protected static ContainerReportsProto getContainerReportsProto( final ContainerID containerId, final ContainerReplicaProto.State state, final String originNodeId, final long usedBytes, final long keyCount, - final long bcsId, final int replicaIndex, final boolean isEmpty) { + final long bcsId, final int replicaIndex, final boolean isEmpty, + final StorageType storageType) { final ContainerReportsProto.Builder crBuilder = ContainerReportsProto.newBuilder(); - final ContainerReplicaProto replicaProto = + final ContainerReplicaProto.Builder replicaProto = ContainerReplicaProto.newBuilder() .setContainerID(containerId.getId()) .setState(state) @@ -1499,8 +1566,10 @@ protected static ContainerReportsProto getContainerReportsProto( .setBlockCommitSequenceId(bcsId) .setDeleteTransactionId(0) .setReplicaIndex(replicaIndex) - .setIsEmpty(isEmpty) - .build(); + .setIsEmpty(isEmpty); + if (storageType != null) { + replicaProto.setStorageType(StorageTypeUtils.getStorageTypeProto(storageType)); + } return crBuilder.addReports(replicaProto).build(); } } diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestIncrementalContainerReportHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestIncrementalContainerReportHandler.java index 8e87cc5d88d9..9bb1491c090f 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestIncrementalContainerReportHandler.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestIncrementalContainerReportHandler.java @@ -43,6 +43,7 @@ import java.time.Clock; import java.time.ZoneId; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -53,9 +54,11 @@ import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; import java.util.stream.IntStream; +import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdds.HddsConfigKeys; import org.apache.hadoop.hdds.client.ECReplicationConfig; import org.apache.hadoop.hdds.client.RatisReplicationConfig; +import org.apache.hadoop.hdds.client.StorageTypeUtils; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; @@ -284,7 +287,7 @@ private void createAndHandleICR(ContainerID containerID, int replicaIndex) { final IncrementalContainerReportProto containerReport = getIncrementalContainerReportProto(containerID, state, - datanodeDetails.getUuidString(), true, replicaIndex); + datanodeDetails.getUuidString(), true, replicaIndex, null); final IncrementalContainerReportFromDatanode icrFromDatanode = new IncrementalContainerReportFromDatanode(datanodeDetails, containerReport); @@ -439,7 +442,7 @@ public void testContainerStateTransitionToClosedWithMismatchingBCSID(LifeCycleSt final IncrementalContainerReportProto containerReport = getIncrementalContainerReportProto(container.containerID(), CLOSED, datanodeThree.getUuidString(), false, 0, - 2000L); + 2000L, StorageType.DISK); final IncrementalContainerReportFromDatanode icr = new IncrementalContainerReportFromDatanode( datanodeOne, containerReport); @@ -571,7 +574,7 @@ public void testICRFCRRace() throws IOException, NodeNotFoundException, datanode, containerReport); final ContainerReportsProto fullReport = getContainerReportsProto( - containerTwo.containerID(), CLOSED, datanode.getUuidString()); + containerTwo.containerID(), CLOSED, datanode.getUuidString(), null); final ContainerReportFromDatanode fcr = new ContainerReportFromDatanode( datanode, fullReport); @@ -752,6 +755,59 @@ public void testWithContainerDataChecksum() throws Exception { assertEquals(numNodes, numReplicasChecked); } + @Test + public void testReplicaStorageTypeValidation() throws IOException { + // Prepare env + Map storageTypeToDn = new HashMap<>(); + final IncrementalContainerReportHandler reportHandler = + new IncrementalContainerReportHandler( + nodeManager, containerManager, scmContext); + final ContainerInfo container = getContainer(LifeCycleState.CLOSING); + final DatanodeDetails datanodeOne = randomDatanodeDetails(); + final DatanodeDetails datanodeTwo = randomDatanodeDetails(); + final DatanodeDetails datanodeThree = randomDatanodeDetails(); + nodeManager.register(datanodeOne, null, null); + nodeManager.register(datanodeTwo, null, null); + nodeManager.register(datanodeThree, null, null); + final Set containerReplicas = getReplicas( + container.containerID(), + ContainerReplicaProto.State.CLOSING, + datanodeOne, datanodeTwo, datanodeThree); + containerStateManager.addContainer(container.getProtobuf()); + containerReplicas.forEach(replica -> { + containerStateManager.updateContainerReplica(replica); + }); + + // Add Container Report + addIncrContainerReport(container, datanodeOne, reportHandler, StorageType.DISK); + storageTypeToDn.put(StorageType.DISK, datanodeOne); + addIncrContainerReport(container, datanodeTwo, reportHandler, StorageType.SSD); + storageTypeToDn.put(StorageType.SSD, datanodeTwo); + addIncrContainerReport(container, datanodeThree, reportHandler, StorageType.RAM_DISK); + storageTypeToDn.put(StorageType.RAM_DISK, datanodeThree); + + // Assert the StorageType is valid + assertEquals(3, containerStateManager + .getContainerReplicas(container.containerID()).size()); + for (ContainerReplica containerReplica : containerStateManager + .getContainerReplicas(container.containerID())) { + assertEquals(storageTypeToDn.get(containerReplica.getStorageType()), + containerReplica.getDatanodeDetails()); + } + } + + private void addIncrContainerReport(ContainerInfo container, DatanodeDetails datanode, + IncrementalContainerReportHandler reportHandler, StorageType storageType) { + final IncrementalContainerReportProto containerReport1 = + getIncrementalContainerReportProto(container.containerID(), + ContainerReplicaProto.State.CLOSED, + datanode.getUuidString(), true, 0, storageType); + final IncrementalContainerReportFromDatanode icrFromDatanode1 = + new IncrementalContainerReportFromDatanode( + datanode, containerReport1); + reportHandler.onMessage(icrFromDatanode1, publisher); + } + private static IncrementalContainerReportProto getIncrementalContainerReportProto(ContainerReplicaProto replicaProto) { final IncrementalContainerReportProto.Builder crBuilder = @@ -765,9 +821,10 @@ public void testWithContainerDataChecksum() throws Exception { final ContainerReplicaProto.State state, final String originNodeId, final boolean hasReplicaIndex, - final int replicaIndex) { + final int replicaIndex, + final StorageType storageType) { return getIncrementalContainerReportProto(containerId, state, originNodeId, - hasReplicaIndex, replicaIndex, 10000L); + hasReplicaIndex, replicaIndex, 10000L, storageType); } private static IncrementalContainerReportProto @@ -777,7 +834,8 @@ public void testWithContainerDataChecksum() throws Exception { final String originNodeId, final boolean hasReplicaIndex, final int replicaIndex, - final long bcsId) { + final long bcsId, + final StorageType storageType) { final ContainerReplicaProto.Builder replicaProto = ContainerReplicaProto.newBuilder() .setContainerID(containerId.getId()) @@ -795,6 +853,9 @@ public void testWithContainerDataChecksum() throws Exception { if (hasReplicaIndex) { replicaProto.setReplicaIndex(replicaIndex); } + if (storageType != null) { + replicaProto.setStorageType(StorageTypeUtils.getStorageTypeProto(storageType)); + } return getIncrementalContainerReportProto(replicaProto.build()); } @@ -804,16 +865,16 @@ public void testWithContainerDataChecksum() throws Exception { final ContainerReplicaProto.State state, final String originNodeId) { return getIncrementalContainerReportProto(containerId, state, originNodeId, - false, 0); + false, 0, null); } private void testReplicaIndexUpdate(ContainerInfo container, DatanodeDetails dn, int replicaIndex, - Map expectedReplicaMap) { + Map expectedReplicaMap, StorageType storageType) { final IncrementalContainerReportProto containerReport = getIncrementalContainerReportProto(container.containerID(), ContainerReplicaProto.State.CLOSED, dn.getUuidString(), - true, replicaIndex); + true, replicaIndex, storageType); final IncrementalContainerReportFromDatanode containerReportFromDatanode = new IncrementalContainerReportFromDatanode(dn, containerReport); final IncrementalContainerReportHandler reportHandler = @@ -852,9 +913,9 @@ public void testECReplicaIndexValidation() throws NodeNotFoundException, .collect(Collectors.toMap(ContainerReplica::getDatanodeDetails, ContainerReplica::getReplicaIndex)); replicas.forEach(containerStateManager::updateContainerReplica); - testReplicaIndexUpdate(container, dns.get(0), 0, replicaMap); - testReplicaIndexUpdate(container, dns.get(0), 6, replicaMap); + testReplicaIndexUpdate(container, dns.get(0), 0, replicaMap, null); + testReplicaIndexUpdate(container, dns.get(0), 6, replicaMap, null); replicaMap.put(dns.get(0), 2); - testReplicaIndexUpdate(container, dns.get(0), 2, replicaMap); + testReplicaIndexUpdate(container, dns.get(0), 2, replicaMap, null); } }