diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/utils/BufferUtils.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/utils/BufferUtils.java index 794504b1ef3d..ba201baeccd3 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/utils/BufferUtils.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/utils/BufferUtils.java @@ -23,6 +23,7 @@ import java.nio.channels.GatheringByteChannel; import java.util.ArrayList; import java.util.List; +import org.apache.hadoop.util.DirectBufferPool; import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,13 +36,21 @@ public final class BufferUtils { private static final ByteBuffer[] EMPTY_BYTE_BUFFER_ARRAY = {}; + /** Pool for reusing direct {@link ByteBuffer}s allocated by {@link #assignByteBuffers}. */ + public static final DirectBufferPool BUFFER_POOL = new DirectBufferPool(); + /** Utility classes should not be constructed. **/ private BufferUtils() { } /** - * Assign an array of ByteBuffers. + * Allocate an array of direct {@link ByteBuffer}s drawn from + * {@link #BUFFER_POOL}, sized to hold {@code totalLen} bytes split into + * slices of at most {@code bufferCapacity} bytes each. Callers are + * responsible for returning each buffer via + * {@link DirectBufferPool#returnBuffer} when finished. + * * @param totalLen total length of all ByteBuffers * @param bufferCapacity max capacity of each ByteBuffer */ @@ -58,13 +67,15 @@ public static ByteBuffer[] assignByteBuffers(long totalLen, long allocatedLen = 0; // For each ByteBuffer (except the last) allocate bufferLen of capacity for (int i = 0; i < numBuffers - 1; i++) { - dataBuffers[i] = ByteBuffer.allocate(bufferCapacity); + dataBuffers[i] = BUFFER_POOL.getBuffer(bufferCapacity); + dataBuffers[i].limit(bufferCapacity); allocatedLen += bufferCapacity; } // For the last ByteBuffer, allocate as much space as is needed to fit // remaining bytes - dataBuffers[numBuffers - 1] = ByteBuffer.allocate( - Math.toIntExact(totalLen - allocatedLen)); + final int lastLen = Math.toIntExact(totalLen - allocatedLen); + dataBuffers[numBuffers - 1] = BUFFER_POOL.getBuffer(lastLen); + dataBuffers[numBuffers - 1].limit(lastLen); return dataBuffers; } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/ChunkUtils.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/ChunkUtils.java index 9cde9ba315e3..896aab760abf 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/ChunkUtils.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/ChunkUtils.java @@ -202,18 +202,25 @@ private static long writeDataToChannel(FileChannel channel, ChunkBuffer data, @SuppressWarnings("checkstyle:parameternumber") public static ChunkBuffer readData(long len, int bufferCapacity, File file, long off, HddsVolume volume, int readMappedBufferThreshold, boolean mmapEnabled, - MappedBufferManager mappedBufferManager) throws StorageContainerException { + MappedBufferManager mappedBufferManager, DispatcherContext dispatcherContext) + throws StorageContainerException { if (mmapEnabled && len > readMappedBufferThreshold && bufferCapacity > readMappedBufferThreshold) { - return readData(file, bufferCapacity, off, len, volume, mappedBufferManager); + return readData(file, bufferCapacity, off, len, volume, mappedBufferManager, dispatcherContext); } else if (len == 0) { return ChunkBuffer.wrap(Collections.emptyList()); + } else { + final ByteBuffer[] buffers = BufferUtils.assignByteBuffers(len, bufferCapacity); + readData(file, off, len, c -> c.position(off).read(buffers), volume); + Arrays.stream(buffers).forEach(ByteBuffer::flip); + if (dispatcherContext != null && dispatcherContext.isReleaseSupported()) { + dispatcherContext.setReleaseMethod(() -> { + for (ByteBuffer buf : buffers) { + BufferUtils.BUFFER_POOL.returnBuffer(buf); + } + }); + } + return ChunkBuffer.wrap(Arrays.asList(buffers)); } - - final ByteBuffer[] buffers = BufferUtils.assignByteBuffers(len, - bufferCapacity); - readData(file, off, len, c -> c.position(off).read(buffers), volume); - Arrays.stream(buffers).forEach(ByteBuffer::flip); - return ChunkBuffer.wrap(Arrays.asList(buffers)); } private static void readData(File file, long offset, long len, @@ -253,16 +260,23 @@ private static void readData(File file, long offset, long len, * @return a list of {@link MappedByteBuffer} containing the data. */ private static ChunkBuffer readData(File file, int chunkSize, - long offset, long length, HddsVolume volume, MappedBufferManager mappedBufferManager) + long offset, long length, HddsVolume volume, MappedBufferManager mappedBufferManager, + DispatcherContext dispatcherContext) throws StorageContainerException { final int bufferNum = Math.toIntExact((length - 1) / chunkSize) + 1; if (!mappedBufferManager.getQuota(bufferNum)) { // proceed with normal buffer - final ByteBuffer[] buffers = BufferUtils.assignByteBuffers(length, - chunkSize); + final ByteBuffer[] buffers = BufferUtils.assignByteBuffers(length, chunkSize); readData(file, offset, length, c -> c.position(offset).read(buffers), volume); Arrays.stream(buffers).forEach(ByteBuffer::flip); + if (dispatcherContext != null && dispatcherContext.isReleaseSupported()) { + dispatcherContext.setReleaseMethod(() -> { + for (ByteBuffer buf : buffers) { + BufferUtils.BUFFER_POOL.returnBuffer(buf); + } + }); + } return ChunkBuffer.wrap(Arrays.asList(buffers)); } else { try { diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerBlockStrategy.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerBlockStrategy.java index 9a13507d6b37..f40cabf41d34 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerBlockStrategy.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerBlockStrategy.java @@ -244,7 +244,8 @@ public ChunkBufferToByteString readChunk(Container container, BlockID blockID, return ChunkUtils.readData(chunkFile, bufferCapacity, offset, len, volume, dispatcherContext); } return ChunkUtils.readData(len, bufferCapacity, chunkFile, offset, volume, - readMappedBufferThreshold, readMappedBufferMaxCount > 0, mappedBufferManager); + readMappedBufferThreshold, readMappedBufferMaxCount > 0, mappedBufferManager, + dispatcherContext); } @Override diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerChunkStrategy.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerChunkStrategy.java index a36d8471a8f0..a667e8561962 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerChunkStrategy.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerChunkStrategy.java @@ -275,7 +275,8 @@ public ChunkBufferToByteString readChunk(Container container, BlockID blockID, return ChunkUtils.readData(file, bufferCapacity, offset, len, volume, dispatcherContext); } return ChunkUtils.readData(len, bufferCapacity, file, offset, volume, - readMappedBufferThreshold, readMappedBufferMaxCount > 0, mappedBufferManager); + readMappedBufferThreshold, readMappedBufferMaxCount > 0, mappedBufferManager, + dispatcherContext); } } catch (StorageContainerException ex) { //UNABLE TO FIND chunk is not a problem as we will try with the diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/helpers/TestChunkUtils.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/helpers/TestChunkUtils.java index 86b2d5b6f79e..f482b9f461b5 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/helpers/TestChunkUtils.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/helpers/TestChunkUtils.java @@ -78,7 +78,7 @@ static ChunkBuffer readData(File file, long off, long len) throws StorageContainerException { LOG.info("off={}, len={}", off, len); return ChunkUtils.readData(len, BUFFER_CAPACITY, file, off, null, - MAPPED_BUFFER_THRESHOLD, true, MAPPED_BUFFER_MANAGER); + MAPPED_BUFFER_THRESHOLD, true, MAPPED_BUFFER_MANAGER, null); } @Test @@ -169,12 +169,15 @@ void concurrentReadWriteOfSameFile() { assertEquals(1, buffers.size()); final ByteBuffer readBuffer = buffers.get(0); + int remaining = readBuffer.remaining(); + byte[] readArray = new byte[remaining]; + readBuffer.get(readArray); LOG.info("Read data ({}): {}", threadNumber, - new String(readBuffer.array(), UTF_8)); - if (!Arrays.equals(array, readBuffer.array())) { + new String(readArray, UTF_8)); + if (!Arrays.equals(array, readArray)) { fail.getAndIncrement(); } - assertEquals(len, readBuffer.remaining()); + assertEquals(len, remaining); } catch (Exception ee) { LOG.error("Failed to read data ({})", threadNumber, ee); fail.getAndIncrement();