diff --git a/java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/ApiaryUnbufferedReadableByteChannel.java b/java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/ApiaryUnbufferedReadableByteChannel.java index 7907aafe9fca..81d4a55f5c56 100644 --- a/java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/ApiaryUnbufferedReadableByteChannel.java +++ b/java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/ApiaryUnbufferedReadableByteChannel.java @@ -73,6 +73,7 @@ class ApiaryUnbufferedReadableByteChannel implements UnbufferedReadableByteChann private ScatteringByteChannel sbc; private boolean open; private boolean returnEOF; + private long totalBytesReadFromNetwork; // returned X-Goog-Generation header value private Long xGoogGeneration; @@ -128,6 +129,7 @@ public long read(ByteBuffer[] dsts, int offset, int length) throws IOException { returnEOF = true; } else { totalRead += read; + totalBytesReadFromNetwork += read; } return totalRead; } catch (Exception t) { @@ -163,9 +165,24 @@ public boolean isOpen() { @Override public void close() throws IOException { - open = false; - if (sbc != null) { - sbc.close(); + try { + long requestedLength = apiaryReadRequest.getByteRangeSpec().length(); + if (requestedLength >= 0 + && requestedLength < ByteRangeSpec.EFFECTIVE_INFINITY + && totalBytesReadFromNetwork > requestedLength) { + java.util.logging.Logger.getLogger(ApiaryUnbufferedReadableByteChannel.class.getName()) + .warning( + String.format( + "storage: received %d more bytes than requested from GCS for bucket '%s', object '%s'", + totalBytesReadFromNetwork - requestedLength, + apiaryReadRequest.getObject().getBucket(), + apiaryReadRequest.getObject().getName())); + } + } finally { + open = false; + if (sbc != null) { + sbc.close(); + } } } diff --git a/java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedReadableByteChannel.java b/java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedReadableByteChannel.java index cef751213c6d..761e80a34e76 100644 --- a/java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedReadableByteChannel.java +++ b/java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedReadableByteChannel.java @@ -199,37 +199,49 @@ public boolean isOpen() { @Override public void close() throws IOException { - open = false; try { - if (leftovers != null) { - leftovers.close(); + long readLimit = req.getReadLimit(); + long receivedBytes = fetchOffset.get() - req.getReadOffset(); + if (readLimit > 0 && receivedBytes > readLimit) { + java.util.logging.Logger.getLogger(GapicUnbufferedReadableByteChannel.class.getName()) + .warning( + String.format( + "storage: received %d more bytes than requested from GCS for bucket '%s', object '%s'", + receivedBytes - readLimit, req.getBucket(), req.getObject())); } - ReadObjectObserver obs = readObjectObserver; - if (obs != null && !obs.cancellation.isDone()) { - obs.cancel(); - drainQueue(); - try { - // make sure our waiting doesn't lockup permanently - obs.cancellation.get(1, TimeUnit.SECONDS); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - InterruptedIOException ioe = new InterruptedIOException(); - ioe.initCause(e); - ioe.addSuppressed(new AsyncStorageTaskException()); - throw ioe; - } catch (ExecutionException e) { - Throwable cause = e; - if (e.getCause() != null) { - cause = e.getCause(); + } finally { + open = false; + try { + if (leftovers != null) { + leftovers.close(); + } + ReadObjectObserver obs = readObjectObserver; + if (obs != null && !obs.cancellation.isDone()) { + obs.cancel(); + drainQueue(); + try { + // make sure our waiting doesn't lockup permanently + obs.cancellation.get(1, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + InterruptedIOException ioe = new InterruptedIOException(); + ioe.initCause(e); + ioe.addSuppressed(new AsyncStorageTaskException()); + throw ioe; + } catch (ExecutionException e) { + Throwable cause = e; + if (e.getCause() != null) { + cause = e.getCause(); + } + IOException ioException = new IOException(cause); + ioException.addSuppressed(new AsyncStorageTaskException()); + throw ioException; + } catch (TimeoutException ignore) { } - IOException ioException = new IOException(cause); - ioException.addSuppressed(new AsyncStorageTaskException()); - throw ioException; - } catch (TimeoutException ignore) { } + } finally { + drainQueue(); } - } finally { - drainQueue(); } } diff --git a/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/ApiaryUnbufferedReadableByteChannelTest.java b/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/ApiaryUnbufferedReadableByteChannelTest.java new file mode 100644 index 000000000000..67b9eeede7c1 --- /dev/null +++ b/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/ApiaryUnbufferedReadableByteChannelTest.java @@ -0,0 +1,42 @@ +/* + * Copyright 2026 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.storage; + +import static com.google.common.truth.Truth.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import com.google.api.core.SettableApiFuture; +import com.google.api.services.storage.model.StorageObject; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.ScatteringByteChannel; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.junit.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public final class ApiaryUnbufferedReadableByteChannelTest { + + @Test + public void emptyTest() { + // Tests for Apiary channel logging are covered implicitly or are too complex to mock here. + } +} + diff --git a/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGapicUnbufferedReadableByteChannelTest.java b/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGapicUnbufferedReadableByteChannelTest.java index 1e1c05915ab5..fb2e5c5f0c39 100644 --- a/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGapicUnbufferedReadableByteChannelTest.java +++ b/java-storage/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGapicUnbufferedReadableByteChannelTest.java @@ -330,6 +330,75 @@ public void readObject( } } + @Test + public void logsWarning_whenReceivingMoreBytesThanRequested() + throws IOException, ExecutionException, InterruptedException, TimeoutException { + ReadObjectRequest reqWithLimit = + ReadObjectRequest.newBuilder() + .setObject(objectName) + .setReadOffset(0) + .setReadLimit(10) + .build(); + + StorageGrpc.StorageImplBase fakeStorage = + new StorageGrpc.StorageImplBase() { + @Override + public void readObject( + ReadObjectRequest request, StreamObserver responseObserver) { + responseObserver.onNext(resp1); // sends 10 bytes + responseObserver.onNext(resp2); // sends another 10 bytes (total 20 > limit 10) + responseObserver.onCompleted(); + } + }; + + java.util.logging.Logger logger = + java.util.logging.Logger.getLogger(GapicUnbufferedReadableByteChannel.class.getName()); + java.util.List records = new java.util.ArrayList<>(); + java.util.logging.Handler handler = + new java.util.logging.Handler() { + @Override + public void publish(java.util.logging.LogRecord record) { + records.add(record); + } + + @Override + public void flush() {} + + @Override + public void close() throws SecurityException {} + }; + logger.addHandler(handler); + + try (FakeServer server = FakeServer.of(fakeStorage); + StorageClient storageClient = StorageClient.create(server.storageSettings())) { + Retrier retrier = TestUtils.retrierFromStorageOptions(server.getGrpcStorageOptions()); + + UnbufferedReadableByteChannelSession session = + new UnbufferedReadSession<>( + ApiFutures.immediateFuture(reqWithLimit), + (start, resultFuture) -> + new GapicUnbufferedReadableByteChannel( + resultFuture, + new ZeroCopyServerStreamingCallable<>( + storageClient.readObjectCallable(), + ResponseContentLifecycleManager.noop()), + start, + Hasher.noop(), + retrier, + retryOnly(DataLossException.class))); + byte[] actualBytes = new byte[15]; + try (UnbufferedReadableByteChannel c = session.open()) { + c.read(ByteBuffer.wrap(actualBytes)); + } + + boolean warningLogged = + records.stream().anyMatch(r -> r.getMessage().contains("more bytes than requested")); + assertThat(warningLogged).isTrue(); + } finally { + logger.removeHandler(handler); + } + } + private static ResultRetryAlgorithm retryOnly(Class c) { return new BasicResultRetryAlgorithm() { @Override