From 28a2e6190c61c2c5686f081c2a2d315d80852bf4 Mon Sep 17 00:00:00 2001 From: Siyao Meng <50227127+smengcl@users.noreply.github.com> Date: Sun, 24 May 2026 01:18:15 -0700 Subject: [PATCH 1/7] HDDS-10283. Apply zero copy to Ozone client reads. Install a ZeroCopyMessageMarshaller (from ratis-thirdparty) as the response marshaller for the XceiverClient.send bidi-stream RPC. The marshaller wraps the inbound Netty buffer as proto3 ByteString via UnsafeByteOperations.unsafeWrap, so the chunk-data field of a ReadChunk response no longer requires a chunk-sized memcpy on the client. Lifecycle: - ReadChunk responses keep the Netty buffer alive: ContainerProtocolCalls returns the outer ContainerCommandResponseProto so that ChunkInputStream can hold the reference until releaseBuffers() runs, at which point it calls XceiverClientSpi.releaseReceivedResponse to return the buffer to the pool. - All other response types are deep-copied inside the wrapping marshaller and the original buffer-aliased response is released immediately, so callers never observe an aliased non-ReadChunk response and the marshaller's tracking map does not grow without bound. - If a sendCommand validator throws, the response is released in a finally block before the exception propagates. Mirrors the existing server-side request-side zero-copy pattern in GrpcXceiverService.bindServiceWithZeroCopy. --- .../hadoop/hdds/scm/XceiverClientGrpc.java | 98 ++++++++++++++++++- .../hdds/scm/storage/ChunkInputStream.java | 37 ++++++- .../hadoop/hdds/scm/XceiverClientSpi.java | 30 +++++- .../scm/storage/ContainerProtocolCalls.java | 19 +++- ...tainerReconciliationWithMockDatanodes.java | 9 +- .../scm/storage/TestContainerCommandsEC.java | 22 +++-- 6 files changed, 195 insertions(+), 20 deletions(-) diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java index 8b1a99a6b668..fd8e8821bd02 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java @@ -21,6 +21,7 @@ import com.google.common.annotations.VisibleForTesting; import java.io.IOException; +import java.io.InputStream; import java.io.InterruptedIOException; import java.io.UncheckedIOException; import java.util.ArrayList; @@ -63,8 +64,15 @@ import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.util.Time; +import org.apache.ratis.grpc.util.ZeroCopyMessageMarshaller; +import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException; import org.apache.ratis.thirdparty.com.google.protobuf.TextFormat; +import org.apache.ratis.thirdparty.io.grpc.CallOptions; +import org.apache.ratis.thirdparty.io.grpc.Channel; +import org.apache.ratis.thirdparty.io.grpc.ClientCall; +import org.apache.ratis.thirdparty.io.grpc.ClientInterceptor; import org.apache.ratis.thirdparty.io.grpc.ManagedChannel; +import org.apache.ratis.thirdparty.io.grpc.MethodDescriptor; import org.apache.ratis.thirdparty.io.grpc.Status; import org.apache.ratis.thirdparty.io.grpc.netty.GrpcSslContexts; import org.apache.ratis.thirdparty.io.grpc.netty.NettyChannelBuilder; @@ -91,6 +99,82 @@ public class XceiverClientGrpc extends XceiverClientSpi { private static final Logger LOG = LoggerFactory.getLogger(XceiverClientGrpc.class); private static final int SHUTDOWN_WAIT_INTERVAL_MILLIS = 100; private static final int SHUTDOWN_WAIT_MAX_SECONDS = 5; + + /** + * Zero-copy marshaller for inbound {@link ContainerCommandResponseProto}. + * For {@code ReadChunk} responses, the parsed proto's bytes fields alias + * the inbound Netty buffer (avoiding a chunk-sized memcpy); the buffer is + * held until {@link ChunkInputStream} releases it via + * {@link #releaseReceivedResponse}. Other response types are deep-copied + * inside the wrapping marshaller and released immediately, so callers + * never observe a buffer-aliased non-{@code ReadChunk} response. + */ + private static final ZeroCopyMessageMarshaller + ZERO_COPY_RESPONSE_MARSHALLER = new ZeroCopyMessageMarshaller<>( + ContainerCommandResponseProto.getDefaultInstance()); + + /** + * Marshaller used as the response marshaller of the {@code send} method. + * Delegates outbound {@code stream()} to the default, and inbound + * {@code parse()} to the zero-copy marshaller. After a successful + * zero-copy parse, non-{@code ReadChunk} responses are deep-copied and + * the original buffer-aliased response is released, so only + * {@code ReadChunk} responses retain the Netty buffer until the caller + * is done with the chunk bytes. + */ + private static final MethodDescriptor.Marshaller + RESPONSE_MARSHALLER = + new MethodDescriptor.Marshaller() { + @Override + public InputStream stream(ContainerCommandResponseProto value) { + return ZERO_COPY_RESPONSE_MARSHALLER.stream(value); + } + + @Override + public ContainerCommandResponseProto parse(InputStream stream) { + ContainerCommandResponseProto resp = + ZERO_COPY_RESPONSE_MARSHALLER.parse(stream); + if (resp == null + || resp.getCmdType() == ContainerProtos.Type.ReadChunk) { + return resp; + } + // Materialize all bytes fields into byte[]-backed ByteStrings so + // the response is independent of the Netty inbound buffer, then + // release the underlying buffer back to the pool. + try { + return ContainerCommandResponseProto.parseFrom(resp.toByteArray()); + } catch (InvalidProtocolBufferException e) { + throw Status.INTERNAL.withDescription("Failed to materialize response") + .withCause(e).asRuntimeException(); + } finally { + ZERO_COPY_RESPONSE_MARSHALLER.release(resp); + } + } + }; + + /** + * gRPC interceptor that installs {@link #RESPONSE_MARSHALLER} as the + * response marshaller for the {@code send} bidi-streaming method. + */ + private static final ClientInterceptor ZERO_COPY_INTERCEPTOR = + new ClientInterceptor() { + @Override + public ClientCall interceptCall( + MethodDescriptor method, + CallOptions callOptions, Channel next) { + if (XceiverClientProtocolServiceGrpc.getSendMethod() + .getFullMethodName().equals(method.getFullMethodName())) { + @SuppressWarnings("unchecked") + final MethodDescriptor.Marshaller respMarshaller = + (MethodDescriptor.Marshaller) RESPONSE_MARSHALLER; + method = method.toBuilder() + .setResponseMarshaller(respMarshaller) + .build(); + } + return next.newCall(method, callOptions); + } + }; + private final Pipeline pipeline; private final ConfigurationSource config; private final XceiverClientMetrics metrics; @@ -210,7 +294,10 @@ protected NettyChannelBuilder createChannel(DatanodeDetails dn, int port) NettyChannelBuilder channelBuilder = NettyChannelBuilder.forAddress(dnHost, port) .maxInboundMessageSize(OzoneConsts.OZONE_SCM_CHUNK_MAX_SIZE) .proxyDetector(uri -> null) - .intercept(new GrpcClientInterceptor()); + // Order matters. The zero-copy interceptor swaps the response + // marshaller for `send`; the tracing interceptor wraps that. + .intercept(new GrpcClientInterceptor()) + .intercept(ZERO_COPY_INTERCEPTOR); if (secConfig.isSecurityEnabled() && secConfig.isGrpcTlsEnabled()) { SslContextBuilder sslContextBuilder = GrpcSslContexts.forClient(); if (trustManager != null) { @@ -766,6 +853,15 @@ public void setTimeout(long timeout) { this.timeout = timeout; } + @Override + public void releaseReceivedResponse(ContainerCommandResponseProto response) { + if (response != null) { + // Idempotent: the marshaller no-ops if `response` was never tracked + // (i.e., parsed via the standard fallback path). + ZERO_COPY_RESPONSE_MARSHALLER.release(response); + } + } + /** * Group the channel and stub so that they are published together. */ diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java index 22917ce4b6c7..5d1b929843d5 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java @@ -96,6 +96,13 @@ public class ChunkInputStream extends InputStream // retry. Once the chunk is read, this variable is reset. private long chunkPosition = -1; + // Outer ContainerCommandResponseProto whose chunk data the current `buffers` + // view. Held until releaseBuffers() so the underlying zero-copy-tracked + // Netty buffer is not freed while the buffers are still in use. + // Null on the standard (non-zero-copy) parse path - the proto's bytes + // are heap-backed and need no explicit release. + private ContainerCommandResponseProto lastResponseToRelease; + private final Supplier> tokenSupplier; private static final int EOF = -1; @@ -431,14 +438,25 @@ private void readChunkDataIntoBuffers(ChunkInfo readChunkInfo) /** * Send RPC call to get the chunk from the container. + *

+ * On the zero-copy path (HDDS-10283), the returned {@link ByteBuffer}s + * view the underlying inbound Netty buffer of the response. Those bytes + * stay reachable until {@link #releaseBuffers()} releases them via + * {@link XceiverClientSpi#releaseReceivedResponse}, which this method + * registers as a side effect through {@link #lastResponseToRelease}. */ @VisibleForTesting protected ByteBuffer[] readChunk(ChunkInfo readChunkInfo) throws IOException { - ReadChunkResponseProto readChunkResponse = + ContainerCommandResponseProto reply = ContainerProtocolCalls.readChunk(xceiverClient, readChunkInfo, datanodeBlockID, validators, tokenSupplier.get()); + // The zero-copy-tracked buffer for the chunk data lives inside `reply`. + // Stash the outer reference so releaseBuffers() can return it once the + // chunk bytes have been fully consumed. + setLastResponseToRelease(reply); + final ReadChunkResponseProto readChunkResponse = reply.getReadChunk(); if (readChunkResponse.hasData()) { return readChunkResponse.getData().asReadOnlyByteBufferList() @@ -453,6 +471,16 @@ protected ByteBuffer[] readChunk(ChunkInfo readChunkInfo) } } + private void setLastResponseToRelease(ContainerCommandResponseProto reply) { + // If a previous response is still being held (e.g., readChunk called + // twice without an intervening releaseBuffers), release the prior one + // first so its Netty buffer goes back to the pool. + if (lastResponseToRelease != null) { + xceiverClient.releaseReceivedResponse(lastResponseToRelease); + } + lastResponseToRelease = reply; + } + private void validateChunk( ContainerCommandRequestProto request, ContainerCommandResponseProto response @@ -702,6 +730,13 @@ private void releaseBuffers() { buffers = null; bufferIndex = 0; firstUnreleasedBufferIndex = 0; + // Release the zero-copy-tracked Netty buffer of the response that backed + // these ByteBuffers. Idempotent and a no-op if the response was parsed + // via the standard heap-backed path. + if (lastResponseToRelease != null) { + xceiverClient.releaseReceivedResponse(lastResponseToRelease); + lastResponseToRelease = null; + } // We should not reset bufferOffsetWrtChunkData and buffersSize here // because when getPos() is called in chunkStreamEOF() we use these // values and determine whether chunk is read completely or not. diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java index 54be3c5686a0..daf64490c760 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java @@ -129,12 +129,15 @@ public ContainerCommandResponseProto sendCommand( ContainerCommandRequestProto request, List validators) throws IOException { + ContainerCommandResponseProto responseProto = null; + boolean validatorsPassed = false; try { XceiverClientReply reply = sendCommandAsync(request); - ContainerCommandResponseProto responseProto = reply.getResponse().get(); + responseProto = reply.getResponse().get(); for (Validator function : validators) { function.accept(request, responseProto); } + validatorsPassed = true; return responseProto; } catch (InterruptedException e) { // Re-interrupt the thread while catching InterruptedException @@ -142,6 +145,13 @@ public ContainerCommandResponseProto sendCommand( throw getIOExceptionForSendCommand(request, e); } catch (ExecutionException e) { throw getIOExceptionForSendCommand(request, e); + } finally { + // If a validator threw, the caller never receives the response, so + // release any zero-copy-tracked buffer here. Successful responses are + // returned to the caller, who is responsible for releasing them. + if (responseProto != null && !validatorsPassed) { + releaseReceivedResponse(responseProto); + } } } @@ -204,4 +214,22 @@ public CompletableFuture watchForCommit(long index) { public abstract Map sendCommandOnAllNodes(ContainerCommandRequestProto request) throws IOException, InterruptedException; + + /** + * Release the resources held on behalf of a previously-received response. + *

+ * When the underlying transport parses a response with a zero-copy + * marshaller, the parsed proto's {@code bytes} fields reference the + * Netty-managed pooled buffer of the inbound message. Those buffers must + * be released back to Netty when the caller is done with the proto; + * otherwise direct memory accumulates. Callers of {@code sendCommand} + * that retain the response past the call (e.g. {@code ReadChunk} via + * {@code ChunkInputStream}) must invoke this method once they are done. + *

+ * This method is idempotent and safe to call on responses that were + * never tracked by a zero-copy marshaller. + */ + public void releaseReceivedResponse(ContainerCommandResponseProto response) { + // Default: transport without zero-copy support -> no-op. + } } diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java index 15879fb47649..2e7077d5e406 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java @@ -344,17 +344,24 @@ public static ContainerCommandRequestProto getPutBlockRequest( } /** - * Calls the container protocol to read a chunk. + * Calls the container protocol to read a chunk and returns the outer + * {@link ContainerCommandResponseProto}. The caller is responsible for + * invoking + * {@link XceiverClientSpi#releaseReceivedResponse(ContainerCommandResponseProto)} + * on the returned response once the chunk bytes are no longer needed. + * The outer wrapper is returned (rather than just the inner + * {@link ReadChunkResponseProto}) so that the caller has a stable reference + * for the zero-copy release. * * @param xceiverClient client to perform call * @param chunk information about chunk to read * @param blockID ID of the block * @param validators functions to validate the response * @param token a token for this block (may be null) - * @return container protocol read chunk response + * @return outer container command response containing the read chunk reply * @throws IOException if there is an I/O error while performing the call */ - public static ContainerProtos.ReadChunkResponseProto readChunk( + public static ContainerCommandResponseProto readChunk( XceiverClientSpi xceiverClient, ChunkInfo chunk, DatanodeBlockID blockID, List validators, Token token) throws IOException { @@ -383,7 +390,7 @@ public static ContainerProtos.ReadChunkResponseProto readChunk( } } - private static ContainerProtos.ReadChunkResponseProto readChunk( + private static ContainerCommandResponseProto readChunk( XceiverClientSpi xceiverClient, ChunkInfo chunk, DatanodeBlockID blockID, List validators, ContainerCommandRequestProto.Builder builder, @@ -399,10 +406,12 @@ private static ContainerProtos.ReadChunkResponseProto readChunk( final ReadChunkResponseProto response = reply.getReadChunk(); final long readLen = getLen(response); if (readLen != chunk.getLen()) { + // Release the zero-copy-tracked buffer before propagating the error. + xceiverClient.releaseReceivedResponse(reply); throw new IOException(toErrorMessage(chunk, blockID, d) + ": readLen=" + readLen); } - return response; + return reply; } static String toErrorMessage(ChunkInfo chunk, DatanodeBlockID blockId, diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestContainerReconciliationWithMockDatanodes.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestContainerReconciliationWithMockDatanodes.java index c051b3478b4c..cf38520293be 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestContainerReconciliationWithMockDatanodes.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestContainerReconciliationWithMockDatanodes.java @@ -510,7 +510,7 @@ public ContainerProtos.GetBlockResponseProto getBlock(BlockID blockID) throws IO .build(); } - public ContainerProtos.ReadChunkResponseProto readChunk(ContainerProtos.DatanodeBlockID blockId, + public ContainerProtos.ContainerCommandResponseProto readChunk(ContainerProtos.DatanodeBlockID blockId, ContainerProtos.ChunkInfo chunkInfo, List validators) throws IOException { KeyValueContainer container = getContainer(blockId.getContainerID()); ContainerProtos.ReadChunkResponseProto readChunkResponseProto = @@ -520,11 +520,11 @@ public ContainerProtos.ReadChunkResponseProto readChunk(ContainerProtos.Datanode .setData(handler.getChunkManager().readChunk(container, BlockID.getFromProtobuf(blockId), ChunkInfo.getFromProtoBuf(chunkInfo), null).toByteString()) .build(); - verifyChecksums(readChunkResponseProto, blockId, chunkInfo, validators); - return readChunkResponseProto; + return verifyChecksums(readChunkResponseProto, blockId, chunkInfo, validators); } - public void verifyChecksums(ContainerProtos.ReadChunkResponseProto readChunkResponseProto, + public ContainerProtos.ContainerCommandResponseProto verifyChecksums( + ContainerProtos.ReadChunkResponseProto readChunkResponseProto, ContainerProtos.DatanodeBlockID blockId, ContainerProtos.ChunkInfo chunkInfo, List validators) throws IOException { assertFalse(validators.isEmpty()); @@ -547,6 +547,7 @@ public void verifyChecksums(ContainerProtos.ReadChunkResponseProto readChunkResp for (XceiverClientSpi.Validator function : validators) { function.accept(requestProto, responseProto); } + return responseProto; } public KeyValueContainer getContainer(long containerID) { diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestContainerCommandsEC.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestContainerCommandsEC.java index 7ce4f9319db2..d3f2bd2d2cee 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestContainerCommandsEC.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestContainerCommandsEC.java @@ -514,18 +514,24 @@ public void testCreateRecoveryContainer() throws Exception { container.containerID().getProtobuf().getId(), encodedToken); assertEquals(ContainerProtos.ContainerDataProto.State.CLOSED, readContainerResponseProto.getContainerData().getState()); - ContainerProtos.ReadChunkResponseProto readChunkResponseProto = + ContainerProtos.ContainerCommandResponseProto readChunkReply = ContainerProtocolCalls.readChunk(dnClient, writeChunkRequest.getWriteChunk().getChunkData(), blockID.getDatanodeBlockIDProtobufBuilder().setReplicaIndex(replicaIndex).build(), null, blockToken); - ByteBuffer[] readOnlyByteBuffersArray = BufferUtils - .getReadOnlyByteBuffersArray( - readChunkResponseProto.getDataBuffers().getBuffersList()); - assertEquals(readOnlyByteBuffersArray[0].limit(), data.length); - byte[] readBuff = new byte[readOnlyByteBuffersArray[0].limit()]; - readOnlyByteBuffersArray[0].get(readBuff, 0, readBuff.length); - assertArrayEquals(data, readBuff); + try { + ContainerProtos.ReadChunkResponseProto readChunkResponseProto = + readChunkReply.getReadChunk(); + ByteBuffer[] readOnlyByteBuffersArray = BufferUtils + .getReadOnlyByteBuffersArray( + readChunkResponseProto.getDataBuffers().getBuffersList()); + assertEquals(readOnlyByteBuffersArray[0].limit(), data.length); + byte[] readBuff = new byte[readOnlyByteBuffersArray[0].limit()]; + readOnlyByteBuffersArray[0].get(readBuff, 0, readBuff.length); + assertArrayEquals(data, readBuff); + } finally { + dnClient.releaseReceivedResponse(readChunkReply); + } } finally { xceiverClientManager.releaseClient(dnClient, false); } From 5832f5a42f538af4fc23f917d4801fab6d9e40fc Mon Sep 17 00:00:00 2001 From: Siyao Meng <50227127+smengcl@users.noreply.github.com> Date: Tue, 26 May 2026 18:40:28 -0700 Subject: [PATCH 2/7] HDDS-10283. Release zero-copy read responses on retry paths --- .../hadoop/hdds/scm/XceiverClientGrpc.java | 5 +- .../ozone/freon/DatanodeChunkValidator.java | 30 ++++--- .../hdds/scm/TestXceiverClientGrpc.java | 83 +++++++++++++++++++ 3 files changed, 105 insertions(+), 13 deletions(-) diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java index fd8e8821bd02..53342c78becd 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java @@ -608,8 +608,11 @@ private XceiverClientReply sendCommandWithRetry( } break; } catch (IOException e) { + if (responseProto != null) { + releaseReceivedResponse(responseProto); + responseProto = null; + } ioException = e; - responseProto = null; if (LOG.isDebugEnabled()) { LOG.debug("Failed to execute command {} on datanode {}", processForDebug(request), dn, e); diff --git a/hadoop-ozone/freon/src/main/java/org/apache/hadoop/ozone/freon/DatanodeChunkValidator.java b/hadoop-ozone/freon/src/main/java/org/apache/hadoop/ozone/freon/DatanodeChunkValidator.java index b17a6b54fbfd..f4b7c25af197 100644 --- a/hadoop-ozone/freon/src/main/java/org/apache/hadoop/ozone/freon/DatanodeChunkValidator.java +++ b/hadoop-ozone/freon/src/main/java/org/apache/hadoop/ozone/freon/DatanodeChunkValidator.java @@ -126,9 +126,12 @@ private void readReference() throws IOException { ContainerCommandRequestProto request = createReadChunkRequest(0); ContainerCommandResponseProto response = xceiverClient.sendCommand(request); - - checksum = new Checksum(ContainerProtos.ChecksumType.CRC32, chunkSize); - checksumReference = computeChecksum(response); + try { + checksum = new Checksum(ContainerProtos.ChecksumType.CRC32, chunkSize); + checksumReference = computeChecksum(response); + } finally { + xceiverClient.releaseReceivedResponse(response); + } } private void validateChunk(long stepNo) throws Exception { @@ -138,15 +141,18 @@ private void validateChunk(long stepNo) throws Exception { try { ContainerCommandResponseProto response = xceiverClient.sendCommand(request); - - ChecksumData checksumOfChunk = computeChecksum(response); - - if (!checksumReference.equals(checksumOfChunk)) { - throw new IllegalStateException( - "Reference (=first) message checksum doesn't match " + - "with checksum of chunk " - + response.getReadChunk() - .getChunkData().getChunkName()); + try { + ChecksumData checksumOfChunk = computeChecksum(response); + + if (!checksumReference.equals(checksumOfChunk)) { + throw new IllegalStateException( + "Reference (=first) message checksum doesn't match " + + "with checksum of chunk " + + response.getReadChunk() + .getChunkData().getChunkName()); + } + } finally { + xceiverClient.releaseReceivedResponse(response); } } catch (IOException e) { LOG.warn("Could not read chunk due to IOException: ", e); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestXceiverClientGrpc.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestXceiverClientGrpc.java index 0f3af071fc54..06cf526e9f5e 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestXceiverClientGrpc.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestXceiverClientGrpc.java @@ -25,10 +25,12 @@ import java.io.IOException; import java.io.InterruptedIOException; import java.util.ArrayList; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.lang3.RandomUtils; import org.apache.hadoop.hdds.client.BlockID; import org.apache.hadoop.hdds.client.RatisReplicationConfig; @@ -43,6 +45,7 @@ import org.apache.hadoop.hdds.scm.pipeline.PipelineID; import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls; import org.apache.hadoop.ozone.OzoneConfigKeys; +import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -150,6 +153,58 @@ public XceiverClientReply sendCommandAsync( assertEquals(0, allDNs.size()); } + @Test + public void testReadChunkValidatorFailureReleasesResponseBeforeRetry() + throws IOException { + final ArrayList allDNs = new ArrayList<>(dns); + final AtomicInteger releaseCount = new AtomicInteger(); + final BlockID blockID = new BlockID(1, 1); + final ContainerProtos.ChunkInfo chunkInfo = ContainerProtos.ChunkInfo + .newBuilder() + .setChunkName("Anything") + .setChecksumData(ContainerProtos.ChecksumData.newBuilder() + .setBytesPerChecksum(1) + .setType(ContainerProtos.ChecksumType.CRC32) + .build()) + .setLen(1) + .setOffset(0) + .build(); + final ContainerProtos.ContainerCommandRequestProto request = + ContainerProtos.ContainerCommandRequestProto.newBuilder() + .setCmdType(ContainerProtos.Type.ReadChunk) + .setContainerID(blockID.getContainerID()) + .setDatanodeUuid(pipeline.getFirstNode().getUuidString()) + .setReadChunk(ContainerProtos.ReadChunkRequestProto.newBuilder() + .setBlockID(blockID.getDatanodeBlockIDProtobuf()) + .setChunkData(chunkInfo)) + .build(); + + try (XceiverClientGrpc client = new XceiverClientGrpc(pipeline, conf) { + @Override + public XceiverClientReply sendCommandAsync( + ContainerProtos.ContainerCommandRequestProto request, + DatanodeDetails dn) { + allDNs.remove(dn); + return buildValidReadChunkResponse(); + } + + @Override + public void releaseReceivedResponse( + ContainerProtos.ContainerCommandResponseProto response) { + releaseCount.incrementAndGet(); + } + }) { + IOException ex = assertThrows(IOException.class, + () -> client.sendCommand(request, Collections.singletonList((req, resp) -> { + throw new IOException("validator failed"); + }))); + assertThat(ex).hasMessageContaining("validator failed"); + } + + assertEquals(0, allDNs.size()); + assertEquals(dns.size(), releaseCount.get()); + } + @Test public void testInterruptedCommandThrowsInterruptedIOException() throws IOException { @@ -305,4 +360,32 @@ private XceiverClientReply buildValidResponse() { return new XceiverClientReply(replyFuture); } + private XceiverClientReply buildValidReadChunkResponse() { + ContainerProtos.ContainerCommandResponseProto resp = + ContainerProtos.ContainerCommandResponseProto.newBuilder() + .setCmdType(ContainerProtos.Type.ReadChunk) + .setResult(ContainerProtos.Result.SUCCESS) + .setReadChunk(ContainerProtos.ReadChunkResponseProto.newBuilder() + .setBlockID(ContainerProtos.DatanodeBlockID.newBuilder() + .setContainerID(1) + .setLocalID(1) + .build()) + .setChunkData(ContainerProtos.ChunkInfo.newBuilder() + .setChunkName("Anything") + .setChecksumData(ContainerProtos.ChecksumData.newBuilder() + .setBytesPerChecksum(1) + .setType(ContainerProtos.ChecksumType.CRC32) + .build()) + .setLen(1) + .setOffset(0) + .build()) + .setData(ByteString.copyFrom(new byte[] {1})) + .build()) + .build(); + final CompletableFuture + replyFuture = new CompletableFuture<>(); + replyFuture.complete(resp); + return new XceiverClientReply(replyFuture); + } + } From 3916559c3c0887aa752c0fca13abad4558fb06c8 Mon Sep 17 00:00:00 2001 From: Siyao Meng <50227127+smengcl@users.noreply.github.com> Date: Tue, 26 May 2026 18:40:55 -0700 Subject: [PATCH 3/7] HDDS-10283. Preserve readChunk compatibility for zero-copy reads --- .../hdds/scm/storage/ChunkInputStream.java | 4 +-- .../hadoop/hdds/scm/XceiverClientSpi.java | 1 + .../scm/storage/ContainerProtocolCalls.java | 33 ++++++++++++++++++- .../scm/storage/TestContainerCommandsEC.java | 2 +- 4 files changed, 36 insertions(+), 4 deletions(-) diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java index 5d1b929843d5..faa8f2a9066f 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java @@ -450,8 +450,8 @@ protected ByteBuffer[] readChunk(ChunkInfo readChunkInfo) throws IOException { ContainerCommandResponseProto reply = - ContainerProtocolCalls.readChunk(xceiverClient, readChunkInfo, datanodeBlockID, validators, - tokenSupplier.get()); + ContainerProtocolCalls.readChunkForZeroCopy(xceiverClient, readChunkInfo, + datanodeBlockID, validators, tokenSupplier.get()); // The zero-copy-tracked buffer for the chunk data lives inside `reply`. // Stash the outer reference so releaseBuffers() can return it once the // chunk bytes have been fully consumed. diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java index daf64490c760..cb3299216e18 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java @@ -224,6 +224,7 @@ public CompletableFuture watchForCommit(long index) { * be released back to Netty when the caller is done with the proto; * otherwise direct memory accumulates. Callers of {@code sendCommand} * that retain the response past the call (e.g. {@code ReadChunk} via + * {@code ContainerProtocolCalls.readChunkForZeroCopy(...)} in * {@code ChunkInputStream}) must invoke this method once they are done. *

* This method is idempotent and safe to call on responses that were diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java index 2e7077d5e406..9430cc247560 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java @@ -343,6 +343,37 @@ public static ContainerCommandRequestProto getPutBlockRequest( return builder.build(); } + /** + * Calls the container protocol to read a chunk and returns a detached + * {@link ReadChunkResponseProto}. + *

+ * Callers that need access to the zero-copy-backed outer response should use + * {@link #readChunkForZeroCopy(XceiverClientSpi, ChunkInfo, DatanodeBlockID, + * List, Token)}. + * + * @param xceiverClient client to perform call + * @param chunk information about chunk to read + * @param blockID ID of the block + * @param validators functions to validate the response + * @param token a token for this block (may be null) + * @return detached read chunk response + * @throws IOException if there is an I/O error while performing the call + */ + public static ReadChunkResponseProto readChunk( + XceiverClientSpi xceiverClient, ChunkInfo chunk, DatanodeBlockID blockID, + List validators, + Token token) throws IOException { + ContainerCommandResponseProto reply = + readChunkForZeroCopy(xceiverClient, chunk, blockID, validators, token); + try { + // Detach the inner proto from any zero-copy-backed buffers before the + // outer response is released. + return ReadChunkResponseProto.parseFrom(reply.getReadChunk().toByteArray()); + } finally { + xceiverClient.releaseReceivedResponse(reply); + } + } + /** * Calls the container protocol to read a chunk and returns the outer * {@link ContainerCommandResponseProto}. The caller is responsible for @@ -361,7 +392,7 @@ public static ContainerCommandRequestProto getPutBlockRequest( * @return outer container command response containing the read chunk reply * @throws IOException if there is an I/O error while performing the call */ - public static ContainerCommandResponseProto readChunk( + public static ContainerCommandResponseProto readChunkForZeroCopy( XceiverClientSpi xceiverClient, ChunkInfo chunk, DatanodeBlockID blockID, List validators, Token token) throws IOException { diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestContainerCommandsEC.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestContainerCommandsEC.java index d3f2bd2d2cee..4034702fe2c3 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestContainerCommandsEC.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestContainerCommandsEC.java @@ -515,7 +515,7 @@ public void testCreateRecoveryContainer() throws Exception { assertEquals(ContainerProtos.ContainerDataProto.State.CLOSED, readContainerResponseProto.getContainerData().getState()); ContainerProtos.ContainerCommandResponseProto readChunkReply = - ContainerProtocolCalls.readChunk(dnClient, + ContainerProtocolCalls.readChunkForZeroCopy(dnClient, writeChunkRequest.getWriteChunk().getChunkData(), blockID.getDatanodeBlockIDProtobufBuilder().setReplicaIndex(replicaIndex).build(), null, blockToken); From 573cd139d9766dd98c94c7c8d861020915744cb0 Mon Sep 17 00:00:00 2001 From: Siyao Meng <50227127+smengcl@users.noreply.github.com> Date: Tue, 26 May 2026 19:00:32 -0700 Subject: [PATCH 4/7] HDDS-10283. Add coverage for legacy and zero-copy readChunk paths --- .../hdds/scm/TestXceiverClientGrpc.java | 105 +++++++++++++++--- .../scm/storage/TestContainerCommandsEC.java | 71 ++++++++---- 2 files changed, 143 insertions(+), 33 deletions(-) diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestXceiverClientGrpc.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestXceiverClientGrpc.java index 06cf526e9f5e..05792f3396bf 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestXceiverClientGrpc.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestXceiverClientGrpc.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hdds.scm; import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -48,6 +49,8 @@ import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; /** * Tests for TestXceiverClientGrpc, to ensure topology aware reads work @@ -55,6 +58,11 @@ */ public class TestXceiverClientGrpc { + private enum ReadChunkApi { + LEGACY, + ZERO_COPY + } + private Pipeline pipeline; private List dns; private List dnsInOrder; @@ -133,8 +141,9 @@ public XceiverClientReply sendCommandAsync( assertEquals(0, allDNs.size()); } - @Test - public void testReadChunkRetryAllNodes() { + @ParameterizedTest(name = "readChunkApi={0}") + @EnumSource(ReadChunkApi.class) + public void testReadChunkRetryAllNodes(ReadChunkApi readChunkApi) { final ArrayList allDNs = new ArrayList<>(dns); assertThat(allDNs.size()).isGreaterThan(1); try (XceiverClientGrpc client = new XceiverClientGrpc(pipeline, conf) { @@ -146,13 +155,67 @@ public XceiverClientReply sendCommandAsync( throw new IOException("Failed " + dn); } }) { - invokeXceiverClientReadChunk(client); + invokeXceiverClientReadChunk(client, readChunkApi); } catch (IOException e) { e.printStackTrace(); } assertEquals(0, allDNs.size()); } + @ParameterizedTest(name = "readChunkApi={0}") + @EnumSource(ReadChunkApi.class) + public void testReadChunkApisReturnExpectedDataAndReleaseOwnership( + ReadChunkApi readChunkApi) throws IOException { + final AtomicInteger releaseCount = new AtomicInteger(); + try (XceiverClientGrpc client = new XceiverClientGrpc(pipeline, conf) { + @Override + public XceiverClientReply sendCommandAsync( + ContainerProtos.ContainerCommandRequestProto request, + DatanodeDetails dn) { + return buildValidReadChunkResponse(); + } + + @Override + public void releaseReceivedResponse( + ContainerProtos.ContainerCommandResponseProto response) { + releaseCount.incrementAndGet(); + } + }) { + BlockID bid = new BlockID(1, 1); + bid.setBlockCommitSequenceId(1); + ContainerProtos.ChunkInfo chunkInfo = + ContainerProtos.ChunkInfo.newBuilder() + .setChunkName("Anything") + .setChecksumData(ContainerProtos.ChecksumData.newBuilder() + .setBytesPerChecksum(1) + .setType(ContainerProtos.ChecksumType.CRC32) + .build()) + .setLen(1) + .setOffset(0) + .build(); + if (readChunkApi == ReadChunkApi.ZERO_COPY) { + ContainerProtos.ContainerCommandResponseProto reply = + ContainerProtocolCalls.readChunkForZeroCopy(client, chunkInfo, + bid.getDatanodeBlockIDProtobuf(), null, null); + assertEquals(0, releaseCount.get()); + try { + assertArrayEquals(new byte[] {1}, + reply.getReadChunk().getData().toByteArray()); + } finally { + client.releaseReceivedResponse(reply); + } + } else { + assertArrayEquals(new byte[] {1}, + ContainerProtocolCalls.readChunk(client, chunkInfo, + bid.getDatanodeBlockIDProtobuf(), null, null) + .getData().toByteArray()); + assertEquals(1, releaseCount.get()); + } + } + + assertEquals(1, releaseCount.get()); + } + @Test public void testReadChunkValidatorFailureReleasesResponseBeforeRetry() throws IOException { @@ -246,6 +309,9 @@ public XceiverClientReply sendCommandAsync( ContainerProtos.ContainerCommandRequestProto request, DatanodeDetails dn) { seenDNs.add(dn); + if (request.getCmdType() == ContainerProtos.Type.ReadChunk) { + return buildValidReadChunkResponse(); + } return buildValidResponse(); } }) { @@ -290,8 +356,10 @@ public XceiverClientReply sendCommandAsync( } } - @Test - public void testConnectionReusedAfterGetBlock() throws IOException { + @ParameterizedTest(name = "readChunkApi={0}") + @EnumSource(ReadChunkApi.class) + public void testConnectionReusedAfterGetBlock(ReadChunkApi readChunkApi) + throws IOException { // With a new Client, make 100 calls. On each call, ensure that only one // DN is seen, indicating the same DN connection is reused. for (int i = 0; i < 100; i++) { @@ -302,12 +370,15 @@ public XceiverClientReply sendCommandAsync( ContainerProtos.ContainerCommandRequestProto request, DatanodeDetails dn) { seenDNs.add(dn); + if (request.getCmdType() == ContainerProtos.Type.ReadChunk) { + return buildValidReadChunkResponse(); + } return buildValidResponse(); } }) { invokeXceiverClientGetBlock(client); invokeXceiverClientGetBlock(client); - invokeXceiverClientReadChunk(client); + invokeXceiverClientReadChunk(client, readChunkApi); invokeXceiverClientReadSmallFile(client); } assertEquals(1, seenDNs.size()); @@ -324,22 +395,30 @@ private void invokeXceiverClientGetBlock(XceiverClientSpi client) .build()), null, client.getPipeline().getReplicaIndexes()); } - private void invokeXceiverClientReadChunk(XceiverClientSpi client) + private void invokeXceiverClientReadChunk(XceiverClientSpi client, + ReadChunkApi readChunkApi) throws IOException { BlockID bid = new BlockID(1, 1); bid.setBlockCommitSequenceId(1); - ContainerProtocolCalls.readChunk(client, + ContainerProtos.ChunkInfo chunkInfo = ContainerProtos.ChunkInfo.newBuilder() .setChunkName("Anything") .setChecksumData(ContainerProtos.ChecksumData.newBuilder() - .setBytesPerChecksum(512) + .setBytesPerChecksum(1) .setType(ContainerProtos.ChecksumType.CRC32) .build()) - .setLen(-1) + .setLen(1) .setOffset(0) - .build(), - bid.getDatanodeBlockIDProtobuf(), - null, null); + .build(); + if (readChunkApi == ReadChunkApi.ZERO_COPY) { + ContainerProtos.ContainerCommandResponseProto reply = + ContainerProtocolCalls.readChunkForZeroCopy(client, chunkInfo, + bid.getDatanodeBlockIDProtobuf(), null, null); + client.releaseReceivedResponse(reply); + return; + } + ContainerProtocolCalls.readChunk(client, chunkInfo, + bid.getDatanodeBlockIDProtobuf(), null, null); } private void invokeXceiverClientReadSmallFile(XceiverClientSpi client) diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestContainerCommandsEC.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestContainerCommandsEC.java index 4034702fe2c3..f23e6300cc8d 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestContainerCommandsEC.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestContainerCommandsEC.java @@ -165,6 +165,11 @@ public class TestContainerCommandsEC { private static OzoneBucket classBucket; private static ReplicationConfig repConfig; + private enum ReadChunkPath { + LEGACY, + ZERO_COPY + } + @BeforeAll public static void init() throws Exception { config = new OzoneConfiguration(); @@ -245,6 +250,10 @@ private void closeAllPipelines(ReplicationConfig replicationConfig) { }); } + private static Stream readChunkPaths() { + return Stream.of(ReadChunkPath.values()); + } + @Test public void testOrphanBlock() throws Exception { // Close all pipelines so we are guaranteed to get a new one @@ -443,8 +452,10 @@ public void testListBlock() throws Exception { } } - @Test - public void testCreateRecoveryContainer() throws Exception { + @ParameterizedTest(name = "readChunkPath={0}") + @MethodSource("readChunkPaths") + public void testCreateRecoveryContainer(ReadChunkPath readChunkPath) + throws Exception { try (XceiverClientManager xceiverClientManager = new XceiverClientManager(config)) { ECReplicationConfig replicationConfig = new ECReplicationConfig(3, 2); @@ -514,30 +525,50 @@ public void testCreateRecoveryContainer() throws Exception { container.containerID().getProtobuf().getId(), encodedToken); assertEquals(ContainerProtos.ContainerDataProto.State.CLOSED, readContainerResponseProto.getContainerData().getState()); - ContainerProtos.ContainerCommandResponseProto readChunkReply = - ContainerProtocolCalls.readChunkForZeroCopy(dnClient, - writeChunkRequest.getWriteChunk().getChunkData(), - blockID.getDatanodeBlockIDProtobufBuilder().setReplicaIndex(replicaIndex).build(), null, - blockToken); - try { - ContainerProtos.ReadChunkResponseProto readChunkResponseProto = - readChunkReply.getReadChunk(); - ByteBuffer[] readOnlyByteBuffersArray = BufferUtils - .getReadOnlyByteBuffersArray( - readChunkResponseProto.getDataBuffers().getBuffersList()); - assertEquals(readOnlyByteBuffersArray[0].limit(), data.length); - byte[] readBuff = new byte[readOnlyByteBuffersArray[0].limit()]; - readOnlyByteBuffersArray[0].get(readBuff, 0, readBuff.length); - assertArrayEquals(data, readBuff); - } finally { - dnClient.releaseReceivedResponse(readChunkReply); - } + ContainerProtos.DatanodeBlockID readBlockID = + blockID.getDatanodeBlockIDProtobufBuilder() + .setReplicaIndex(replicaIndex) + .build(); + assertReadChunkData(readChunkPath, dnClient, + writeChunkRequest.getWriteChunk().getChunkData(), + readBlockID, blockToken, data); } finally { xceiverClientManager.releaseClient(dnClient, false); } } } + private void assertReadChunkData(ReadChunkPath readChunkPath, + XceiverClientSpi dnClient, ContainerProtos.ChunkInfo chunkInfo, + ContainerProtos.DatanodeBlockID blockID, + Token blockToken, byte[] expectedData) + throws IOException { + ContainerProtos.ContainerCommandResponseProto readChunkReply = null; + ContainerProtos.ReadChunkResponseProto readChunkResponseProto; + if (readChunkPath == ReadChunkPath.ZERO_COPY) { + readChunkReply = ContainerProtocolCalls.readChunkForZeroCopy(dnClient, + chunkInfo, blockID, null, blockToken); + readChunkResponseProto = readChunkReply.getReadChunk(); + } else { + readChunkResponseProto = ContainerProtocolCalls.readChunk(dnClient, + chunkInfo, blockID, null, blockToken); + } + + try { + ByteBuffer[] readOnlyByteBuffersArray = BufferUtils + .getReadOnlyByteBuffersArray( + readChunkResponseProto.getDataBuffers().getBuffersList()); + assertEquals(readOnlyByteBuffersArray[0].limit(), expectedData.length); + byte[] readBuff = new byte[readOnlyByteBuffersArray[0].limit()]; + readOnlyByteBuffersArray[0].get(readBuff, 0, readBuff.length); + assertArrayEquals(expectedData, readBuff); + } finally { + if (readChunkReply != null) { + dnClient.releaseReceivedResponse(readChunkReply); + } + } + } + @Test public void testCreateRecoveryContainerAfterDNRestart() throws Exception { try (XceiverClientManager xceiverClientManager = From ec85e0fb641b9102d2d6e29bc5e70d7e42529231 Mon Sep 17 00:00:00 2001 From: Siyao Meng <50227127+smengcl@users.noreply.github.com> Date: Tue, 26 May 2026 20:05:27 -0700 Subject: [PATCH 5/7] HDDS-10283. Keep zero-copy reads opt-in and cover both paths --- .../hadoop/hdds/scm/XceiverClientGrpc.java | 75 +++++++++++---- .../hadoop/hdds/scm/XceiverClientSpi.java | 23 ++++- .../scm/storage/ContainerProtocolCalls.java | 2 +- .../checksum/TestFileChecksumHelper.java | 4 +- .../hdds/scm/TestXceiverClientGrpc.java | 94 +++++++++++++++---- .../om/TestOmContainerLocationCache.java | 66 ++++++++----- 6 files changed, 199 insertions(+), 65 deletions(-) diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java index 53342c78becd..202971a952df 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java @@ -71,6 +71,7 @@ import org.apache.ratis.thirdparty.io.grpc.Channel; import org.apache.ratis.thirdparty.io.grpc.ClientCall; import org.apache.ratis.thirdparty.io.grpc.ClientInterceptor; +import org.apache.ratis.thirdparty.io.grpc.ClientInterceptors; import org.apache.ratis.thirdparty.io.grpc.ManagedChannel; import org.apache.ratis.thirdparty.io.grpc.MethodDescriptor; import org.apache.ratis.thirdparty.io.grpc.Status; @@ -154,7 +155,8 @@ public ContainerCommandResponseProto parse(InputStream stream) { /** * gRPC interceptor that installs {@link #RESPONSE_MARSHALLER} as the - * response marshaller for the {@code send} bidi-streaming method. + * response marshaller for the {@code send} bidi-streaming method of the + * dedicated zero-copy stub. */ private static final ClientInterceptor ZERO_COPY_INTERCEPTOR = new ClientInterceptor() { @@ -280,7 +282,10 @@ private ChannelInfo generateNewChannel(DatanodeDetails dn) throws IOException { ManagedChannel channel = createChannel(dn, port).build(); XceiverClientProtocolServiceStub stub = XceiverClientProtocolServiceGrpc.newStub(channel); - return new ChannelInfo(channel, stub); + XceiverClientProtocolServiceStub zeroCopyStub = + XceiverClientProtocolServiceGrpc.newStub( + ClientInterceptors.intercept(channel, ZERO_COPY_INTERCEPTOR)); + return new ChannelInfo(channel, stub, zeroCopyStub); } protected NettyChannelBuilder createChannel(DatanodeDetails dn, int port) @@ -294,10 +299,7 @@ protected NettyChannelBuilder createChannel(DatanodeDetails dn, int port) NettyChannelBuilder channelBuilder = NettyChannelBuilder.forAddress(dnHost, port) .maxInboundMessageSize(OzoneConsts.OZONE_SCM_CHUNK_MAX_SIZE) .proxyDetector(uri -> null) - // Order matters. The zero-copy interceptor swaps the response - // marshaller for `send`; the tracing interceptor wraps that. - .intercept(new GrpcClientInterceptor()) - .intercept(ZERO_COPY_INTERCEPTOR); + .intercept(new GrpcClientInterceptor()); if (secConfig.isSecurityEnabled() && secConfig.isGrpcTlsEnabled()) { SslContextBuilder sslContextBuilder = GrpcSslContexts.forClient(); if (trustManager != null) { @@ -393,7 +395,7 @@ public Pipeline getPipeline() { public ContainerCommandResponseProto sendCommand( ContainerCommandRequestProto request) throws IOException { try { - return sendCommandWithTraceIDAndRetry(request, null). + return sendCommandWithTraceIDAndRetry(request, null, false). getResponse().get(); } catch (ExecutionException e) { throw getIOExceptionForSendCommand(request, e); @@ -484,7 +486,8 @@ public ContainerCommandResponseProto sendCommand( ContainerCommandRequestProto request, List validators) throws IOException { try { - XceiverClientReply reply = sendCommandWithTraceIDAndRetry(request, validators); + XceiverClientReply reply = + sendCommandWithTraceIDAndRetry(request, validators, false); return reply.getResponse().get(); } catch (ExecutionException e) { throw getIOExceptionForSendCommand(request, e); @@ -497,9 +500,29 @@ public ContainerCommandResponseProto sendCommand( } } - private XceiverClientReply sendCommandWithTraceIDAndRetry( + @Override + public ContainerCommandResponseProto sendCommandWithZeroCopy( ContainerCommandRequestProto request, List validators) throws IOException { + try { + XceiverClientReply reply = + sendCommandWithTraceIDAndRetry(request, validators, true); + return reply.getResponse().get(); + } catch (ExecutionException e) { + throw getIOExceptionForSendCommand(request, e); + } catch (InterruptedException e) { + LOG.error("Command execution was interrupted."); + Thread.currentThread().interrupt(); + throw (IOException) new InterruptedIOException( + "Command " + processForDebug(request) + " was interrupted.") + .initCause(e); + } + } + + private XceiverClientReply sendCommandWithTraceIDAndRetry( + ContainerCommandRequestProto request, List validators, + boolean zeroCopy) + throws IOException { String spanName = "XceiverClientGrpc." + request.getCmdType().name(); @@ -511,7 +534,7 @@ private XceiverClientReply sendCommandWithTraceIDAndRetry( if (!request.hasVersion()) { builder.setVersion(ClientVersion.CURRENT.toProtoValue()); } - return sendCommandWithRetry(builder.build(), validators); + return sendCommandWithRetry(builder.build(), validators, zeroCopy); }); } @@ -576,7 +599,8 @@ private static DatanodeBlockID getRequestBlockID(ContainerCommandRequestProto re } private XceiverClientReply sendCommandWithRetry( - ContainerCommandRequestProto request, List validators) + ContainerCommandRequestProto request, List validators, + boolean zeroCopy) throws IOException { ContainerCommandResponseProto responseProto = null; IOException ioException = null; @@ -596,7 +620,9 @@ private XceiverClientReply sendCommandWithRetry( // sendCommandAsyncCall will create a new channel and async stub // in case these don't exist for the specific datanode. reply.addDatanode(dn); - responseProto = sendCommandAsync(request, dn).getResponse().get(); + responseProto = (zeroCopy + ? sendCommandAsync(request, dn, true) + : sendCommandAsync(request, dn)).getResponse().get(); if (validators != null && !validators.isEmpty()) { for (Validator validator : validators) { validator.accept(request, responseProto); @@ -776,11 +802,19 @@ protected boolean shouldBlockAndWaitAsyncReply( public XceiverClientReply sendCommandAsync( ContainerCommandRequestProto request, DatanodeDetails dn) throws IOException, InterruptedException { + return sendCommandAsync(request, dn, false); + } + + @VisibleForTesting + protected XceiverClientReply sendCommandAsync( + ContainerCommandRequestProto request, DatanodeDetails dn, + boolean zeroCopy) + throws IOException, InterruptedException { checkOpen(dn); DatanodeID dnId = dn.getID(); if (LOG.isDebugEnabled()) { - LOG.debug("Send command {} to datanode {}", - request.getCmdType(), dn); + LOG.debug("Send command {} to datanode {}{}", + request.getCmdType(), dn, zeroCopy ? " with zero-copy response" : ""); } final CompletableFuture replyFuture = new CompletableFuture<>(); @@ -790,7 +824,9 @@ public XceiverClientReply sendCommandAsync( // create a new grpc message stream pair for each call. final StreamObserver requestObserver = - dnChannelInfoMap.get(dnId).getStub().withDeadlineAfter(timeout, TimeUnit.SECONDS) + (zeroCopy ? dnChannelInfoMap.get(dnId).getZeroCopyStub() + : dnChannelInfoMap.get(dnId).getStub()) + .withDeadlineAfter(timeout, TimeUnit.SECONDS) .send(new StreamObserver() { @Override public void onNext(ContainerCommandResponseProto value) { @@ -871,10 +907,13 @@ public void releaseReceivedResponse(ContainerCommandResponseProto response) { private static class ChannelInfo { private final ManagedChannel channel; private final XceiverClientProtocolServiceStub stub; + private final XceiverClientProtocolServiceStub zeroCopyStub; - ChannelInfo(ManagedChannel channel, XceiverClientProtocolServiceStub stub) { + ChannelInfo(ManagedChannel channel, XceiverClientProtocolServiceStub stub, + XceiverClientProtocolServiceStub zeroCopyStub) { this.channel = channel; this.stub = stub; + this.zeroCopyStub = zeroCopyStub; } public ManagedChannel getChannel() { @@ -885,6 +924,10 @@ public XceiverClientProtocolServiceStub getStub() { return stub; } + public XceiverClientProtocolServiceStub getZeroCopyStub() { + return zeroCopyStub; + } + public boolean isChannelInactive() { return channel == null || channel.isTerminated() diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java index cb3299216e18..9e26916c3203 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java @@ -155,6 +155,24 @@ public ContainerCommandResponseProto sendCommand( } } + /** + * Sends a given command using an explicit zero-copy-capable response path + * when supported by the transport. + *

+ * The default implementation delegates to {@link #sendCommand( + * ContainerCommandRequestProto, List)}, so transports without zero-copy + * support continue to return normal heap-backed responses. + * + * @param request Request + * @param validators functions to validate the response + * @return Response to the command + */ + public ContainerCommandResponseProto sendCommandWithZeroCopy( + ContainerCommandRequestProto request, + List validators) throws IOException { + return sendCommand(request, validators); + } + public void initStreamRead(BlockID blockID, StreamingReaderSpi streamObserver) throws IOException { throw new UnsupportedOperationException("Stream read is not supported"); } @@ -222,8 +240,9 @@ public CompletableFuture watchForCommit(long index) { * marshaller, the parsed proto's {@code bytes} fields reference the * Netty-managed pooled buffer of the inbound message. Those buffers must * be released back to Netty when the caller is done with the proto; - * otherwise direct memory accumulates. Callers of {@code sendCommand} - * that retain the response past the call (e.g. {@code ReadChunk} via + * otherwise direct memory accumulates. Callers of + * {@code sendCommandWithZeroCopy} that retain the response past the call + * (e.g. {@code ReadChunk} via * {@code ContainerProtocolCalls.readChunkForZeroCopy(...)} in * {@code ChunkInputStream}) must invoke this method once they are done. *

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java index 9430cc247560..333ab744cd05 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java @@ -433,7 +433,7 @@ private static ContainerCommandResponseProto readChunk( requestBuilder = requestBuilder.setTraceID(traceId); } ContainerCommandResponseProto reply = - xceiverClient.sendCommand(requestBuilder.build(), validators); + xceiverClient.sendCommandWithZeroCopy(requestBuilder.build(), validators); final ReadChunkResponseProto response = reply.getReadChunk(); final long readLen = getLen(response); if (readLen != chunk.getLen()) { diff --git a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/checksum/TestFileChecksumHelper.java b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/checksum/TestFileChecksumHelper.java index bc894a58f9cc..8c72410b0313 100644 --- a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/checksum/TestFileChecksumHelper.java +++ b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/checksum/TestFileChecksumHelper.java @@ -217,9 +217,9 @@ public void testOneBlock(ReplicationType helperType) throws IOException { XceiverClientGrpc xceiverClientGrpc = new XceiverClientGrpc(pipeline, conf) { @Override - public XceiverClientReply sendCommandAsync( + protected XceiverClientReply sendCommandAsync( ContainerProtos.ContainerCommandRequestProto request, - DatanodeDetails dn) { + DatanodeDetails dn, boolean zeroCopy) { return buildValidResponse(helperType); } }; diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestXceiverClientGrpc.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestXceiverClientGrpc.java index 05792f3396bf..b472af355b32 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestXceiverClientGrpc.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestXceiverClientGrpc.java @@ -108,9 +108,9 @@ public void testLeaderNodeIsCommandTarget() throws IOException { for (int i = 0; i < 100; i++) { try (XceiverClientGrpc client = new XceiverClientGrpc(pipeline, conf) { @Override - public XceiverClientReply sendCommandAsync( + protected XceiverClientReply sendCommandAsync( ContainerProtos.ContainerCommandRequestProto request, - DatanodeDetails dn) { + DatanodeDetails dn, boolean zeroCopy) { seenDN.add(dn); return buildValidResponse(); } @@ -127,9 +127,9 @@ public void testGetBlockRetryAlNodes() { assertThat(allDNs.size()).isGreaterThan(1); try (XceiverClientGrpc client = new XceiverClientGrpc(pipeline, conf) { @Override - public XceiverClientReply sendCommandAsync( + protected XceiverClientReply sendCommandAsync( ContainerProtos.ContainerCommandRequestProto request, - DatanodeDetails dn) throws IOException { + DatanodeDetails dn, boolean zeroCopy) throws IOException { allDNs.remove(dn); throw new IOException("Failed " + dn); } @@ -148,9 +148,9 @@ public void testReadChunkRetryAllNodes(ReadChunkApi readChunkApi) { assertThat(allDNs.size()).isGreaterThan(1); try (XceiverClientGrpc client = new XceiverClientGrpc(pipeline, conf) { @Override - public XceiverClientReply sendCommandAsync( + protected XceiverClientReply sendCommandAsync( ContainerProtos.ContainerCommandRequestProto request, - DatanodeDetails dn) throws IOException { + DatanodeDetails dn, boolean zeroCopy) throws IOException { allDNs.remove(dn); throw new IOException("Failed " + dn); } @@ -169,9 +169,9 @@ public void testReadChunkApisReturnExpectedDataAndReleaseOwnership( final AtomicInteger releaseCount = new AtomicInteger(); try (XceiverClientGrpc client = new XceiverClientGrpc(pipeline, conf) { @Override - public XceiverClientReply sendCommandAsync( + protected XceiverClientReply sendCommandAsync( ContainerProtos.ContainerCommandRequestProto request, - DatanodeDetails dn) { + DatanodeDetails dn, boolean zeroCopy) { return buildValidReadChunkResponse(); } @@ -216,6 +216,64 @@ public void releaseReceivedResponse( assertEquals(1, releaseCount.get()); } + @Test + public void testReadChunkZeroCopyIsOptInOnly() throws IOException { + final AtomicInteger regularCallCount = new AtomicInteger(); + final AtomicInteger zeroCopyCallCount = new AtomicInteger(); + final BlockID blockID = new BlockID(1, 1); + final ContainerProtos.ChunkInfo chunkInfo = ContainerProtos.ChunkInfo + .newBuilder() + .setChunkName("Anything") + .setChecksumData(ContainerProtos.ChecksumData.newBuilder() + .setBytesPerChecksum(1) + .setType(ContainerProtos.ChecksumType.CRC32) + .build()) + .setLen(1) + .setOffset(0) + .build(); + final ContainerProtos.ContainerCommandRequestProto request = + ContainerProtos.ContainerCommandRequestProto.newBuilder() + .setCmdType(ContainerProtos.Type.ReadChunk) + .setContainerID(blockID.getContainerID()) + .setDatanodeUuid(pipeline.getFirstNode().getUuidString()) + .setReadChunk(ContainerProtos.ReadChunkRequestProto.newBuilder() + .setBlockID(blockID.getDatanodeBlockIDProtobuf()) + .setChunkData(chunkInfo)) + .build(); + + try (XceiverClientGrpc client = new XceiverClientGrpc(pipeline, conf) { + @Override + protected XceiverClientReply sendCommandAsync( + ContainerProtos.ContainerCommandRequestProto request, + DatanodeDetails dn, boolean zeroCopy) { + if (zeroCopy) { + zeroCopyCallCount.incrementAndGet(); + } else { + regularCallCount.incrementAndGet(); + } + return buildValidReadChunkResponse(); + } + }) { + assertArrayEquals(new byte[] {1}, + client.sendCommand(request).getReadChunk().getData().toByteArray()); + assertEquals(1, regularCallCount.get()); + assertEquals(0, zeroCopyCallCount.get()); + + ContainerProtos.ContainerCommandResponseProto reply = + ContainerProtocolCalls.readChunkForZeroCopy(client, chunkInfo, + blockID.getDatanodeBlockIDProtobuf(), null, null); + try { + assertArrayEquals(new byte[] {1}, + reply.getReadChunk().getData().toByteArray()); + } finally { + client.releaseReceivedResponse(reply); + } + } + + assertEquals(1, regularCallCount.get()); + assertEquals(1, zeroCopyCallCount.get()); + } + @Test public void testReadChunkValidatorFailureReleasesResponseBeforeRetry() throws IOException { @@ -244,9 +302,9 @@ public void testReadChunkValidatorFailureReleasesResponseBeforeRetry() try (XceiverClientGrpc client = new XceiverClientGrpc(pipeline, conf) { @Override - public XceiverClientReply sendCommandAsync( + protected XceiverClientReply sendCommandAsync( ContainerProtos.ContainerCommandRequestProto request, - DatanodeDetails dn) { + DatanodeDetails dn, boolean zeroCopy) { allDNs.remove(dn); return buildValidReadChunkResponse(); } @@ -275,9 +333,9 @@ public void testInterruptedCommandThrowsInterruptedIOException() new CompletableFuture<>(); try (XceiverClientGrpc client = new XceiverClientGrpc(pipeline, conf) { @Override - public XceiverClientReply sendCommandAsync( + protected XceiverClientReply sendCommandAsync( ContainerProtos.ContainerCommandRequestProto request, - DatanodeDetails dn) { + DatanodeDetails dn, boolean zeroCopy) { return new XceiverClientReply(response); } }) { @@ -305,9 +363,9 @@ public void testFirstNodeIsCorrectWithTopologyForCommandTarget() for (int i = 0; i < 100; i++) { try (XceiverClientGrpc client = new XceiverClientGrpc(pipeline, conf) { @Override - public XceiverClientReply sendCommandAsync( + protected XceiverClientReply sendCommandAsync( ContainerProtos.ContainerCommandRequestProto request, - DatanodeDetails dn) { + DatanodeDetails dn, boolean zeroCopy) { seenDNs.add(dn); if (request.getCmdType() == ContainerProtos.Type.ReadChunk) { return buildValidReadChunkResponse(); @@ -340,9 +398,9 @@ public void testPrimaryReadFromNormalDatanode() setPersistedOpState(NodeOperationalState.IN_MAINTENANCE); try (XceiverClientGrpc client = new XceiverClientGrpc(randomPipeline, conf) { @Override - public XceiverClientReply sendCommandAsync( + protected XceiverClientReply sendCommandAsync( ContainerProtos.ContainerCommandRequestProto request, - DatanodeDetails dn) { + DatanodeDetails dn, boolean zeroCopy) { seenDNs.add(dn); return buildValidResponse(); } @@ -366,9 +424,9 @@ public void testConnectionReusedAfterGetBlock(ReadChunkApi readChunkApi) final Set seenDNs = new HashSet<>(); try (XceiverClientGrpc client = new XceiverClientGrpc(pipeline, conf) { @Override - public XceiverClientReply sendCommandAsync( + protected XceiverClientReply sendCommandAsync( ContainerProtos.ContainerCommandRequestProto request, - DatanodeDetails dn) { + DatanodeDetails dn, boolean zeroCopy) { seenDNs.add(dn); if (request.getCmdType() == ContainerProtos.Type.ReadChunk) { return buildValidReadChunkResponse(); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmContainerLocationCache.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmContainerLocationCache.java index 4e69848b307d..399eb6f10bf3 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmContainerLocationCache.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmContainerLocationCache.java @@ -835,35 +835,49 @@ private void mockReadChunk(XceiverClientGrpc mockDnProtocol, byte[] data, Exception exception, Result errorCode) throws Exception { - final CompletableFuture response; - if (exception != null) { - response = new CompletableFuture<>(); - response.completeExceptionally(exception); - } else if (errorCode != null) { - ContainerCommandResponseProto readChunkResp = - ContainerCommandResponseProto.newBuilder() - .setResult(errorCode) - .setCmdType(Type.ReadChunk) - .build(); - response = completedFuture(readChunkResp); + final ContainerCommandResponseProto response; + if (errorCode != null) { + response = ContainerCommandResponseProto.newBuilder() + .setResult(errorCode) + .setCmdType(Type.ReadChunk) + .build(); + } else if (data != null) { + response = ContainerCommandResponseProto.newBuilder() + .setReadChunk(ReadChunkResponseProto.newBuilder() + .setBlockID(createBlockId(containerId, localId)) + .setChunkData(createChunkInfo(data)) + .setData(ByteString.copyFrom(data)) + .build() + ) + .setResult(Result.SUCCESS) + .setCmdType(Type.ReadChunk) + .build(); } else { - ContainerCommandResponseProto readChunkResp = - ContainerCommandResponseProto.newBuilder() - .setReadChunk(ReadChunkResponseProto.newBuilder() - .setBlockID(createBlockId(containerId, localId)) - .setChunkData(createChunkInfo(data)) - .setData(ByteString.copyFrom(data)) - .build() - ) - .setResult(Result.SUCCESS) - .setCmdType(Type.ReadChunk) - .build(); - response = completedFuture(readChunkResp); + response = null; } - doAnswer(invocation -> new XceiverClientReply(response)) - .when(mockDnProtocol) - .sendCommandAsync(argThat(matchCmd(Type.ReadChunk)), any()); + doAnswer(invocation -> { + if (exception != null) { + ExecutionException executionException = new ExecutionException( + exception); + if (Status.fromThrowable(exception).getCode() + == Status.UNAUTHENTICATED.getCode()) { + throw new SCMSecurityException("Failed to authenticate with " + + "GRPC XceiverServer with Ozone block token."); + } + throw new IOException(executionException); + } + + ContainerCommandRequestProto request = invocation.getArgument(0); + List validators = invocation.getArgument(1); + if (validators != null) { + for (XceiverClientSpi.Validator validator : validators) { + validator.accept(request, response); + } + } + return response; + }).when(mockDnProtocol) + .sendCommandWithZeroCopy(argThat(matchCmd(Type.ReadChunk)), any()); } From 6d9efc015cfb13c6380a7cfefe5b06e99be64c4c Mon Sep 17 00:00:00 2001 From: Siyao Meng <50227127+smengcl@users.noreply.github.com> Date: Wed, 27 May 2026 00:58:10 -0700 Subject: [PATCH 6/7] HDDS-10283. Restore legacy readChunk path behavior --- .../hadoop/hdds/scm/XceiverClientGrpc.java | 35 ++++---------- .../scm/storage/ContainerProtocolCalls.java | 46 +++++++++++++------ .../hdds/scm/TestXceiverClientGrpc.java | 5 +- 3 files changed, 45 insertions(+), 41 deletions(-) diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java index 202971a952df..9e2f91e6fd11 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java @@ -394,18 +394,7 @@ public Pipeline getPipeline() { @Override public ContainerCommandResponseProto sendCommand( ContainerCommandRequestProto request) throws IOException { - try { - return sendCommandWithTraceIDAndRetry(request, null, false). - getResponse().get(); - } catch (ExecutionException e) { - throw getIOExceptionForSendCommand(request, e); - } catch (InterruptedException e) { - LOG.error("Command execution was interrupted."); - Thread.currentThread().interrupt(); - throw (IOException) new InterruptedIOException( - "Command " + processForDebug(request) + " was interrupted.") - .initCause(e); - } + return sendCommandAndWait(request, null, false); } @Override @@ -485,28 +474,22 @@ private ContainerCommandRequestProto reconstructRequestIfNeeded( public ContainerCommandResponseProto sendCommand( ContainerCommandRequestProto request, List validators) throws IOException { - try { - XceiverClientReply reply = - sendCommandWithTraceIDAndRetry(request, validators, false); - return reply.getResponse().get(); - } catch (ExecutionException e) { - throw getIOExceptionForSendCommand(request, e); - } catch (InterruptedException e) { - LOG.error("Command execution was interrupted."); - Thread.currentThread().interrupt(); - throw (IOException) new InterruptedIOException( - "Command " + processForDebug(request) + " was interrupted.") - .initCause(e); - } + return sendCommandAndWait(request, validators, false); } @Override public ContainerCommandResponseProto sendCommandWithZeroCopy( ContainerCommandRequestProto request, List validators) throws IOException { + return sendCommandAndWait(request, validators, true); + } + + private ContainerCommandResponseProto sendCommandAndWait( + ContainerCommandRequestProto request, List validators, + boolean zeroCopy) throws IOException { try { XceiverClientReply reply = - sendCommandWithTraceIDAndRetry(request, validators, true); + sendCommandWithTraceIDAndRetry(request, validators, zeroCopy); return reply.getResponse().get(); } catch (ExecutionException e) { throw getIOExceptionForSendCommand(request, e); diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java index 333ab744cd05..a6100c460ac9 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java @@ -363,14 +363,29 @@ public static ReadChunkResponseProto readChunk( XceiverClientSpi xceiverClient, ChunkInfo chunk, DatanodeBlockID blockID, List validators, Token token) throws IOException { - ContainerCommandResponseProto reply = - readChunkForZeroCopy(xceiverClient, chunk, blockID, validators, token); - try { - // Detach the inner proto from any zero-copy-backed buffers before the - // outer response is released. - return ReadChunkResponseProto.parseFrom(reply.getReadChunk().toByteArray()); - } finally { - xceiverClient.releaseReceivedResponse(reply); + ReadChunkRequestProto.Builder readChunkRequest = + ReadChunkRequestProto.newBuilder() + .setBlockID(blockID) + .setChunkData(chunk) + .setReadChunkVersion(ContainerProtos.ReadChunkVersion.V1); + ContainerCommandRequestProto.Builder builder = + ContainerCommandRequestProto.newBuilder().setCmdType(Type.ReadChunk) + .setContainerID(blockID.getContainerID()) + .setReadChunk(readChunkRequest); + if (token != null) { + builder.setEncodedToken(token.encodeToUrlString()); + } + + try (TracingUtil.TraceCloseable ignored = + TracingUtil.createActivatedSpan("readChunk")) { + Span span = TracingUtil.getActiveSpan(); + span.setAttribute("offset", chunk.getOffset()) + .setAttribute("length", chunk.getLen()) + .setAttribute("block", blockID.toString()); + return tryEachDatanode(xceiverClient.getPipeline(), + d -> readChunk(xceiverClient, chunk, blockID, + validators, builder, d, false).getReadChunk(), + d -> toErrorMessage(chunk, blockID, d)); } } @@ -416,7 +431,7 @@ public static ContainerCommandResponseProto readChunkForZeroCopy( .setAttribute("block", blockID.toString()); return tryEachDatanode(xceiverClient.getPipeline(), d -> readChunk(xceiverClient, chunk, blockID, - validators, builder, d), + validators, builder, d, true), d -> toErrorMessage(chunk, blockID, d)); } } @@ -425,7 +440,7 @@ private static ContainerCommandResponseProto readChunk( XceiverClientSpi xceiverClient, ChunkInfo chunk, DatanodeBlockID blockID, List validators, ContainerCommandRequestProto.Builder builder, - DatanodeDetails d) throws IOException { + DatanodeDetails d, boolean zeroCopy) throws IOException { ContainerCommandRequestProto.Builder requestBuilder = builder .setDatanodeUuid(d.getUuidString()); String traceId = TracingUtil.exportCurrentSpan(); @@ -433,12 +448,17 @@ private static ContainerCommandResponseProto readChunk( requestBuilder = requestBuilder.setTraceID(traceId); } ContainerCommandResponseProto reply = - xceiverClient.sendCommandWithZeroCopy(requestBuilder.build(), validators); + zeroCopy + ? xceiverClient.sendCommandWithZeroCopy( + requestBuilder.build(), validators) + : xceiverClient.sendCommand(requestBuilder.build(), validators); final ReadChunkResponseProto response = reply.getReadChunk(); final long readLen = getLen(response); if (readLen != chunk.getLen()) { - // Release the zero-copy-tracked buffer before propagating the error. - xceiverClient.releaseReceivedResponse(reply); + if (zeroCopy) { + // Release the zero-copy-tracked buffer before propagating the error. + xceiverClient.releaseReceivedResponse(reply); + } throw new IOException(toErrorMessage(chunk, blockID, d) + ": readLen=" + readLen); } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestXceiverClientGrpc.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestXceiverClientGrpc.java index b472af355b32..2c07aed114ad 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestXceiverClientGrpc.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestXceiverClientGrpc.java @@ -209,11 +209,12 @@ public void releaseReceivedResponse( ContainerProtocolCalls.readChunk(client, chunkInfo, bid.getDatanodeBlockIDProtobuf(), null, null) .getData().toByteArray()); - assertEquals(1, releaseCount.get()); + assertEquals(0, releaseCount.get()); } } - assertEquals(1, releaseCount.get()); + assertEquals(readChunkApi == ReadChunkApi.ZERO_COPY ? 1 : 0, + releaseCount.get()); } @Test From b7e7ebbb2261508d6526308caab96f3c1dd6cb91 Mon Sep 17 00:00:00 2001 From: Siyao Meng <50227127+smengcl@users.noreply.github.com> Date: Wed, 27 May 2026 16:22:57 -0700 Subject: [PATCH 7/7] HDDS-10283. Fix PMD field ordering in TestXceiverClientGrpc --- .../apache/hadoop/hdds/scm/TestXceiverClientGrpc.java | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestXceiverClientGrpc.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestXceiverClientGrpc.java index 2c07aed114ad..d92ed48b838c 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestXceiverClientGrpc.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestXceiverClientGrpc.java @@ -57,17 +57,16 @@ * select the closest node, and connections are re-used after a getBlock call. */ public class TestXceiverClientGrpc { + private Pipeline pipeline; + private List dns; + private List dnsInOrder; + private OzoneConfiguration conf = new OzoneConfiguration(); private enum ReadChunkApi { LEGACY, ZERO_COPY } - private Pipeline pipeline; - private List dns; - private List dnsInOrder; - private OzoneConfiguration conf = new OzoneConfiguration(); - @BeforeEach public void setup() { dns = new ArrayList<>();