From 576589ab963c2ac7c597fbc4a4f27252f70562da Mon Sep 17 00:00:00 2001 From: Eunjin Song Date: Fri, 12 Jun 2026 16:08:26 -0700 Subject: [PATCH 1/2] feat(java): expose ArrowArrayStream export on LanceScanner MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add public LanceScanner#exportArrowStream(ArrowArrayStream) wrapping the existing private native openStream(long) call. Lets callers populate a stream they allocated themselves (typically from their own BufferAllocator) instead of going through scanBatches(), which immediately imports into a Java ArrowReader backed by Lance's allocator. The motivation is consumers loaded under a different classloader / pinned to a different Apache Arrow version. Sharing org.apache.arrow.vector.* classes across classloader boundaries is not safe, but the C Data Interface struct is stable across Arrow versions — so handing the C struct's memory address through is the only correct integration boundary. A concrete consumer is the ongoing gluten-spark/Velox integration tracked at apache/gluten#12263, which needs to import Lance scan output into its own Arrow 15 + Velox runtime; gluten-spark is built against Arrow 15 while Lance is on Arrow 18. Test exercises the full path end-to-end: caller allocates a stream from its own RootAllocator, scanner fills the C struct, caller imports into an ArrowReader and validates batch contents. --- .../main/java/org/lance/ipc/LanceScanner.java | 38 +++++++++++++++ java/src/test/java/org/lance/ScannerTest.java | 47 +++++++++++++++++++ 2 files changed, 85 insertions(+) diff --git a/java/src/main/java/org/lance/ipc/LanceScanner.java b/java/src/main/java/org/lance/ipc/LanceScanner.java index 3a413e0ccfd..a6db09f2ea8 100644 --- a/java/src/main/java/org/lance/ipc/LanceScanner.java +++ b/java/src/main/java/org/lance/ipc/LanceScanner.java @@ -146,6 +146,44 @@ public ArrowReader scanBatches() { } } + /** + * Populate a caller-provided {@link ArrowArrayStream} with this scan's results, using the C Data + * Interface release callback to return ownership. + * + *

Unlike {@link #scanBatches()}, no Java Arrow {@link ArrowReader} is created: the caller + * supplies a stream that they allocated (typically from their own {@link + * org.apache.arrow.memory.BufferAllocator}), and Lance writes the C struct directly into it. This + * lets a downstream consumer drive the read loop with their own Arrow runtime, which is required + * when the caller and Lance are loaded by different classloaders / different Arrow versions. + * + *

The caller owns the stream and is responsible for closing it. The release callback installed + * on the C struct routes back through Lance's native side. + * + *

Example: + * + *

{@code
+   * try (ArrowArrayStream stream = ArrowArrayStream.allocateNew(callerAllocator)) {
+   *   scanner.exportArrowStream(stream);
+   *   try (ArrowReader reader = Data.importArrayStream(callerAllocator, stream)) {
+   *     while (reader.loadNextBatch()) {
+   *       VectorSchemaRoot batch = reader.getVectorSchemaRoot();
+   *       // ...
+   *     }
+   *   }
+   * }
+   * }
+ * + * @param stream the caller-allocated stream to populate + * @throws IOException if the native scan fails to start + */ + public void exportArrowStream(ArrowArrayStream stream) throws IOException { + Preconditions.checkNotNull(stream); + try (LockManager.ReadLock readLock = lockManager.acquireReadLock()) { + Preconditions.checkArgument(nativeScannerHandle != 0, "Scanner is closed"); + openStream(stream.memoryAddress()); + } + } + private native void openStream(long streamAddress) throws IOException; @Override diff --git a/java/src/test/java/org/lance/ScannerTest.java b/java/src/test/java/org/lance/ScannerTest.java index 00434034b64..82452385e1a 100644 --- a/java/src/test/java/org/lance/ScannerTest.java +++ b/java/src/test/java/org/lance/ScannerTest.java @@ -22,6 +22,8 @@ import org.lance.ipc.ScanOptions; import org.lance.ipc.ScanStats; +import org.apache.arrow.c.ArrowArrayStream; +import org.apache.arrow.c.Data; import org.apache.arrow.dataset.scanner.Scanner; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.RootAllocator; @@ -158,6 +160,51 @@ void testDatasetScannerSchema(@TempDir Path tempDir) throws Exception { } } + @Test + void testDatasetScannerExportArrowStream(@TempDir Path tempDir) throws Exception { + String datasetPath = tempDir.resolve("dataset_scanner_export_stream").toString(); + try (BufferAllocator allocator = new RootAllocator()) { + TestUtils.SimpleTestDataset testDataset = + new TestUtils.SimpleTestDataset(allocator, datasetPath); + testDataset.createEmptyDataset().close(); + int totalRows = 40; + int batchRows = 20; + try (Dataset dataset = testDataset.write(1, totalRows)) { + try (LanceScanner scanner = + dataset.newScan( + new ScanOptions.Builder() + .batchSize(batchRows) + .columns(Arrays.asList("id")) + .build())) { + // Caller allocates the C stream from their own allocator; the scanner only fills the + // C struct. This is the path callers loaded by a different classloader use to avoid + // sharing Java Arrow vector classes with Lance. + try (ArrowArrayStream stream = ArrowArrayStream.allocateNew(allocator)) { + scanner.exportArrowStream(stream); + try (ArrowReader reader = Data.importArrayStream(allocator, stream)) { + VectorSchemaRoot root = reader.getVectorSchemaRoot(); + int index = 0; + while (reader.loadNextBatch()) { + List fieldVectors = root.getFieldVectors(); + assertEquals(1, fieldVectors.size()); + FieldVector fieldVector = fieldVectors.get(0); + assertEquals( + ArrowType.ArrowTypeID.Int, fieldVector.getField().getType().getTypeID()); + assertEquals(batchRows, fieldVector.getValueCount()); + IntVector vector = (IntVector) fieldVector; + for (int i = 0; i < batchRows; i++) { + assertEquals(index, vector.get(i)); + index++; + } + } + assertEquals(totalRows, index); + } + } + } + } + } + } + @Test void testDatasetScannerCountRows(@TempDir Path tempDir) throws Exception { String datasetPath = tempDir.resolve("dataset_scanner_count").toString(); From f725d96f4499626dcbbb54ee472ef21d1a473795 Mon Sep 17 00:00:00 2001 From: Eunjin Song Date: Fri, 19 Jun 2026 18:22:18 -0700 Subject: [PATCH 2/2] refactor(java): address review on exportArrowStream Review feedback from @Xuanwo on #7259: 1. Take a raw `long streamAddress` instead of a typed `ArrowArrayStream`. A typed parameter is `org.apache.arrow.c.ArrowArrayStream` loaded by Lance's classloader / Arrow version. The whole point of the method is to serve callers on a *different* Arrow version or classloader (Spark + a native engine bundling its own Arrow), who cannot construct that exact type and would fail at the boundary the method exists to cross. The C Data Interface ABI is version-stable, so only the address crosses into Lance and the two sides stay decoupled. 2. Reject an already-populated stream. `openStream` writes the C struct in place with `ptr::write_unaligned`, which runs no destructor on the prior contents; if the caller's stream already had a `release` callback, overwriting it would leak the first producer. The native side now reads only the `release` field (unaligned, via `addr_of!`, no reference formed into the possibly-misaligned ArrowBuf) and rejects with IllegalArgumentException when it is non-null. Also rejects a null address. 3. Tests no longer assume an exact per-batch row count (batch size is a scanner hint, not a guarantee). The shared drain helper asserts only that no batch exceeds the requested size and that the full row set is correct. Expanded test coverage in ScannerTest: - basic export with batch-size-agnostic assertions - multiple fragments (4 fragments, batch size not a divisor of fragment size) - filter pushdown (id < 20) - limit + offset - projection (single column, schema asserted) - empty result - rejects exporting into an already-populated stream (first producer stays intact and drainable) - rejects null address - rejects a closed scanner --- java/lance-jni/src/blocking_scanner.rs | 31 ++ .../main/java/org/lance/ipc/LanceScanner.java | 44 ++- java/src/test/java/org/lance/ScannerTest.java | 268 ++++++++++++++++-- 3 files changed, 309 insertions(+), 34 deletions(-) diff --git a/java/lance-jni/src/blocking_scanner.rs b/java/lance-jni/src/blocking_scanner.rs index 335cb2a4fa3..c94b51ebe7e 100644 --- a/java/lance-jni/src/blocking_scanner.rs +++ b/java/lance-jni/src/blocking_scanner.rs @@ -589,6 +589,37 @@ pub extern "system" fn Java_org_lance_ipc_LanceScanner_openStream( } fn inner_open_stream(env: &mut JNIEnv, j_scanner: JObject, stream_addr: jlong) -> Result<()> { + if stream_addr == 0 { + return Err(Error::input_error( + "ArrowArrayStream address must not be null".to_string(), + )); + } + + // Reject a stream that already holds a producer. We write the C struct in place below with + // `ptr::write_unaligned`, which does not run any destructor on the previous contents. If the + // caller passed a stream whose `release` callback is already set (e.g. it was populated by an + // earlier export and not yet released), overwriting it would drop that callback and leak the + // first producer's resources. A freshly-allocated `ArrowArrayStream` has a null `release`, per + // the Arrow C Data Interface, so requiring `release == None` is the contract for "empty". + // + // The struct is allocated by Arrow Java inside an ArrowBuf and is not guaranteed to be aligned + // (hence `write_unaligned` below), so we must not form a reference to it. We read only the + // `release` field through an unaligned read: `addr_of!` computes the field address without + // creating an intermediate (mis)aligned reference, and the field is an `Option` which is + // `Copy` with no destructor, so reading a copy of it leaves the caller's stream untouched. + let release_is_set = unsafe { + let stream_ptr = stream_addr as *const FFI_ArrowArrayStream; + let release = std::ptr::read_unaligned(std::ptr::addr_of!((*stream_ptr).release)); + release.is_some() + }; + if release_is_set { + return Err(Error::input_error( + "ArrowArrayStream is already populated; exporting into it would leak the existing \ + producer. Pass a freshly-allocated, empty stream." + .to_string(), + )); + } + let record_batch_stream = { let scanner_guard = unsafe { env.get_rust_field::<_, _, BlockingScanner>(j_scanner, NATIVE_SCANNER) }?; diff --git a/java/src/main/java/org/lance/ipc/LanceScanner.java b/java/src/main/java/org/lance/ipc/LanceScanner.java index a6db09f2ea8..15b29557719 100644 --- a/java/src/main/java/org/lance/ipc/LanceScanner.java +++ b/java/src/main/java/org/lance/ipc/LanceScanner.java @@ -147,23 +147,36 @@ public ArrowReader scanBatches() { } /** - * Populate a caller-provided {@link ArrowArrayStream} with this scan's results, using the C Data - * Interface release callback to return ownership. + * Export this scan's results into a caller-owned Arrow C stream identified by its memory address, + * using the Arrow C Data Interface release callback to transfer ownership. * - *

Unlike {@link #scanBatches()}, no Java Arrow {@link ArrowReader} is created: the caller - * supplies a stream that they allocated (typically from their own {@link - * org.apache.arrow.memory.BufferAllocator}), and Lance writes the C struct directly into it. This - * lets a downstream consumer drive the read loop with their own Arrow runtime, which is required - * when the caller and Lance are loaded by different classloaders / different Arrow versions. + *

This method intentionally takes a raw {@code streamAddress} (an {@code ArrowArrayStream} + * memory address) rather than a Java {@link ArrowArrayStream} object. A typed parameter would be + * an {@code org.apache.arrow.c.ArrowArrayStream} loaded by Lance's classloader / Arrow + * version; a caller running a different Arrow version (or under a different classloader, e.g. + * Spark + a native engine bundling its own Arrow) cannot construct that exact type and would hit + * a {@code ClassCastException}/{@code NoSuchMethodError} at the very boundary this method exists + * to cross. The C Data Interface ABI is stable across Arrow versions, so passing the C struct's + * address keeps the two sides fully decoupled: the caller allocates the stream with its + * own Arrow runtime and only the {@code long} address crosses into Lance. See gluten#12263 + * for the cross-Arrow-version integration that motivated this. * - *

The caller owns the stream and is responsible for closing it. The release callback installed - * on the C struct routes back through Lance's native side. + *

Unlike {@link #scanBatches()}, no Java Arrow {@link ArrowReader} is created on Lance's side: + * Lance writes the C struct directly at {@code streamAddress} and the caller drives the read loop + * with its own Arrow runtime. * - *

Example: + *

The {@code streamAddress} must point to a freshly-allocated, empty {@code ArrowArrayStream} + * (its {@code release} callback must be null). Exporting into a stream that already holds a + * producer is rejected with an {@link IllegalArgumentException}, because overwriting the struct + * would drop the existing {@code release} callback and leak the first producer. The caller owns + * the stream and is responsible for closing it; the release callback installed by this call + * routes back through Lance's native side. + * + *

Example (caller on its own Arrow version / allocator): * *

{@code
    * try (ArrowArrayStream stream = ArrowArrayStream.allocateNew(callerAllocator)) {
-   *   scanner.exportArrowStream(stream);
+   *   scanner.exportArrowStream(stream.memoryAddress());
    *   try (ArrowReader reader = Data.importArrayStream(callerAllocator, stream)) {
    *     while (reader.loadNextBatch()) {
    *       VectorSchemaRoot batch = reader.getVectorSchemaRoot();
@@ -173,14 +186,15 @@ public ArrowReader scanBatches() {
    * }
    * }
* - * @param stream the caller-allocated stream to populate + * @param streamAddress the memory address of a freshly-allocated, empty {@code ArrowArrayStream} + * to populate + * @throws IllegalArgumentException if the scanner is closed or the stream is already populated * @throws IOException if the native scan fails to start */ - public void exportArrowStream(ArrowArrayStream stream) throws IOException { - Preconditions.checkNotNull(stream); + public void exportArrowStream(long streamAddress) throws IOException { try (LockManager.ReadLock readLock = lockManager.acquireReadLock()) { Preconditions.checkArgument(nativeScannerHandle != 0, "Scanner is closed"); - openStream(stream.memoryAddress()); + openStream(streamAddress); } } diff --git a/java/src/test/java/org/lance/ScannerTest.java b/java/src/test/java/org/lance/ScannerTest.java index 82452385e1a..fa16c482189 100644 --- a/java/src/test/java/org/lance/ScannerTest.java +++ b/java/src/test/java/org/lance/ScannerTest.java @@ -51,6 +51,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; public class ScannerTest { @@ -160,9 +161,41 @@ void testDatasetScannerSchema(@TempDir Path tempDir) throws Exception { } } + /** + * Reads every batch from a caller-owned C stream populated by {@link + * LanceScanner#exportArrowStream(long)} and returns the {@code id} values in stream order. + * + *

Asserts the projected schema is exactly {@code id: int32} and that no batch exceeds {@code + * maxBatchRows}, but does not assume any particular batch count or that batches are full — batch + * size is a scanner hint, not a guarantee, so over-asserting on it makes the test brittle. + */ + private static List drainIdStream( + BufferAllocator allocator, ArrowArrayStream stream, int maxBatchRows) throws IOException { + List ids = new ArrayList<>(); + try (ArrowReader reader = Data.importArrayStream(allocator, stream)) { + VectorSchemaRoot root = reader.getVectorSchemaRoot(); + while (reader.loadNextBatch()) { + List fieldVectors = root.getFieldVectors(); + assertEquals(1, fieldVectors.size()); + FieldVector fieldVector = fieldVectors.get(0); + assertEquals("id", fieldVector.getField().getName()); + assertEquals(ArrowType.ArrowTypeID.Int, fieldVector.getField().getType().getTypeID()); + int rowsInBatch = fieldVector.getValueCount(); + assertTrue( + rowsInBatch <= maxBatchRows, + "batch of " + rowsInBatch + " rows exceeded requested batch size " + maxBatchRows); + IntVector vector = (IntVector) fieldVector; + for (int i = 0; i < rowsInBatch; i++) { + ids.add(vector.get(i)); + } + } + } + return ids; + } + @Test - void testDatasetScannerExportArrowStream(@TempDir Path tempDir) throws Exception { - String datasetPath = tempDir.resolve("dataset_scanner_export_stream").toString(); + void testExportArrowStream(@TempDir Path tempDir) throws Exception { + String datasetPath = tempDir.resolve("export_stream_basic").toString(); try (BufferAllocator allocator = new RootAllocator()) { TestUtils.SimpleTestDataset testDataset = new TestUtils.SimpleTestDataset(allocator, datasetPath); @@ -176,28 +209,133 @@ void testDatasetScannerExportArrowStream(@TempDir Path tempDir) throws Exception .batchSize(batchRows) .columns(Arrays.asList("id")) .build())) { - // Caller allocates the C stream from their own allocator; the scanner only fills the - // C struct. This is the path callers loaded by a different classloader use to avoid - // sharing Java Arrow vector classes with Lance. + // The caller allocates the C stream from its own allocator and passes only the memory + // address; the scanner fills the C struct in place. This is the cross-Arrow-version / + // cross-classloader boundary the API exists to serve. + try (ArrowArrayStream stream = ArrowArrayStream.allocateNew(allocator)) { + scanner.exportArrowStream(stream.memoryAddress()); + List ids = drainIdStream(allocator, stream, batchRows); + assertEquals(totalRows, ids.size()); + for (int i = 0; i < totalRows; i++) { + assertEquals(i, ids.get(i)); + } + } + } + } + } + } + + @Test + void testExportArrowStreamMultipleFragments(@TempDir Path tempDir) throws Exception { + String datasetPath = tempDir.resolve("export_stream_multi_fragment").toString(); + try (BufferAllocator allocator = new RootAllocator()) { + TestUtils.SimpleTestDataset testDataset = + new TestUtils.SimpleTestDataset(allocator, datasetPath); + testDataset.createEmptyDataset().close(); + int totalRows = 40; + // maxRowsPerFile < totalRows forces multiple fragments (4 fragments of 10 rows). + List fragments = testDataset.createNewFragment(totalRows, 10); + assertEquals(4, fragments.size()); + FragmentOperation.Append appendOp = new FragmentOperation.Append(fragments); + try (Dataset dataset = Dataset.commit(allocator, datasetPath, appendOp, Optional.of(1L))) { + int batchRows = 7; // deliberately not a divisor of any fragment size + try (LanceScanner scanner = + dataset.newScan( + new ScanOptions.Builder() + .batchSize(batchRows) + .columns(Arrays.asList("id")) + .build())) { + try (ArrowArrayStream stream = ArrowArrayStream.allocateNew(allocator)) { + scanner.exportArrowStream(stream.memoryAddress()); + List ids = drainIdStream(allocator, stream, batchRows); + assertEquals(totalRows, ids.size()); + Collections.sort(ids); + for (int i = 0; i < totalRows; i++) { + assertEquals(i, ids.get(i)); + } + } + } + } + } + } + + @Test + void testExportArrowStreamWithFilter(@TempDir Path tempDir) throws Exception { + String datasetPath = tempDir.resolve("export_stream_filter").toString(); + try (BufferAllocator allocator = new RootAllocator()) { + TestUtils.SimpleTestDataset testDataset = + new TestUtils.SimpleTestDataset(allocator, datasetPath); + testDataset.createEmptyDataset().close(); + try (Dataset dataset = testDataset.write(1, 40)) { + try (LanceScanner scanner = + dataset.newScan( + new ScanOptions.Builder() + .batchSize(50) + .columns(Arrays.asList("id")) + .filter("id < 20") + .build())) { + try (ArrowArrayStream stream = ArrowArrayStream.allocateNew(allocator)) { + scanner.exportArrowStream(stream.memoryAddress()); + List ids = drainIdStream(allocator, stream, 50); + assertEquals(20, ids.size()); + Collections.sort(ids); + for (int i = 0; i < 20; i++) { + assertEquals(i, ids.get(i)); + } + } + } + } + } + } + + @Test + void testExportArrowStreamWithLimitOffset(@TempDir Path tempDir) throws Exception { + String datasetPath = tempDir.resolve("export_stream_limit_offset").toString(); + try (BufferAllocator allocator = new RootAllocator()) { + TestUtils.SimpleTestDataset testDataset = + new TestUtils.SimpleTestDataset(allocator, datasetPath); + testDataset.createEmptyDataset().close(); + try (Dataset dataset = testDataset.write(1, 40)) { + try (LanceScanner scanner = + dataset.newScan( + new ScanOptions.Builder() + .batchSize(50) + .columns(Arrays.asList("id")) + .limit(5) + .offset(10) + .build())) { + try (ArrowArrayStream stream = ArrowArrayStream.allocateNew(allocator)) { + scanner.exportArrowStream(stream.memoryAddress()); + List ids = drainIdStream(allocator, stream, 50); + assertEquals(Arrays.asList(10, 11, 12, 13, 14), ids); + } + } + } + } + } + + @Test + void testExportArrowStreamProjectsRequestedColumnsOnly(@TempDir Path tempDir) throws Exception { + String datasetPath = tempDir.resolve("export_stream_projection").toString(); + try (BufferAllocator allocator = new RootAllocator()) { + TestUtils.SimpleTestDataset testDataset = + new TestUtils.SimpleTestDataset(allocator, datasetPath); + testDataset.createEmptyDataset().close(); + try (Dataset dataset = testDataset.write(1, 10)) { + // Project only "name"; the exported stream's schema must contain exactly that column. + try (LanceScanner scanner = + dataset.newScan(new ScanOptions.Builder().columns(Arrays.asList("name")).build())) { try (ArrowArrayStream stream = ArrowArrayStream.allocateNew(allocator)) { - scanner.exportArrowStream(stream); + scanner.exportArrowStream(stream.memoryAddress()); try (ArrowReader reader = Data.importArrayStream(allocator, stream)) { VectorSchemaRoot root = reader.getVectorSchemaRoot(); - int index = 0; + assertEquals(1, root.getSchema().getFields().size()); + assertEquals("name", root.getSchema().getFields().get(0).getName()); + int rows = 0; while (reader.loadNextBatch()) { - List fieldVectors = root.getFieldVectors(); - assertEquals(1, fieldVectors.size()); - FieldVector fieldVector = fieldVectors.get(0); - assertEquals( - ArrowType.ArrowTypeID.Int, fieldVector.getField().getType().getTypeID()); - assertEquals(batchRows, fieldVector.getValueCount()); - IntVector vector = (IntVector) fieldVector; - for (int i = 0; i < batchRows; i++) { - assertEquals(index, vector.get(i)); - index++; - } + rows += root.getRowCount(); } - assertEquals(totalRows, index); + assertEquals(10, rows); } } } @@ -205,6 +343,98 @@ void testDatasetScannerExportArrowStream(@TempDir Path tempDir) throws Exception } } + @Test + void testExportArrowStreamEmptyResult(@TempDir Path tempDir) throws Exception { + String datasetPath = tempDir.resolve("export_stream_empty").toString(); + try (BufferAllocator allocator = new RootAllocator()) { + TestUtils.SimpleTestDataset testDataset = + new TestUtils.SimpleTestDataset(allocator, datasetPath); + testDataset.createEmptyDataset().close(); + try (Dataset dataset = testDataset.write(1, 40)) { + try (LanceScanner scanner = + dataset.newScan( + new ScanOptions.Builder().columns(Arrays.asList("id")).filter("id < 0").build())) { + try (ArrowArrayStream stream = ArrowArrayStream.allocateNew(allocator)) { + scanner.exportArrowStream(stream.memoryAddress()); + List ids = drainIdStream(allocator, stream, 1024); + assertTrue(ids.isEmpty()); + } + } + } + } + } + + @Test + void testExportArrowStreamRejectsPopulatedStream(@TempDir Path tempDir) throws Exception { + String datasetPath = tempDir.resolve("export_stream_reject_populated").toString(); + try (BufferAllocator allocator = new RootAllocator()) { + TestUtils.SimpleTestDataset testDataset = + new TestUtils.SimpleTestDataset(allocator, datasetPath); + testDataset.createEmptyDataset().close(); + try (Dataset dataset = testDataset.write(1, 40)) { + try (LanceScanner scanner = + dataset.newScan(new ScanOptions.Builder().columns(Arrays.asList("id")).build())) { + try (ArrowArrayStream stream = ArrowArrayStream.allocateNew(allocator)) { + // First export populates the stream and installs a release callback. + scanner.exportArrowStream(stream.memoryAddress()); + // Exporting again into the same (already-populated) stream must be rejected rather + // than silently overwriting and leaking the first producer's release callback. + IllegalArgumentException ex = + assertThrows( + IllegalArgumentException.class, + () -> scanner.exportArrowStream(stream.memoryAddress())); + assertTrue(ex.getMessage().toLowerCase().contains("already populated")); + // The first producer is still intact and drainable. + try (ArrowReader reader = Data.importArrayStream(allocator, stream)) { + int rows = 0; + VectorSchemaRoot root = reader.getVectorSchemaRoot(); + while (reader.loadNextBatch()) { + rows += root.getRowCount(); + } + assertEquals(40, rows); + } + } + } + } + } + } + + @Test + void testExportArrowStreamRejectsNullAddress(@TempDir Path tempDir) throws Exception { + String datasetPath = tempDir.resolve("export_stream_reject_null").toString(); + try (BufferAllocator allocator = new RootAllocator()) { + TestUtils.SimpleTestDataset testDataset = + new TestUtils.SimpleTestDataset(allocator, datasetPath); + testDataset.createEmptyDataset().close(); + try (Dataset dataset = testDataset.write(1, 10)) { + try (LanceScanner scanner = + dataset.newScan(new ScanOptions.Builder().columns(Arrays.asList("id")).build())) { + assertThrows(IllegalArgumentException.class, () -> scanner.exportArrowStream(0L)); + } + } + } + } + + @Test + void testExportArrowStreamRejectsClosedScanner(@TempDir Path tempDir) throws Exception { + String datasetPath = tempDir.resolve("export_stream_reject_closed").toString(); + try (BufferAllocator allocator = new RootAllocator()) { + TestUtils.SimpleTestDataset testDataset = + new TestUtils.SimpleTestDataset(allocator, datasetPath); + testDataset.createEmptyDataset().close(); + try (Dataset dataset = testDataset.write(1, 10)) { + LanceScanner scanner = + dataset.newScan(new ScanOptions.Builder().columns(Arrays.asList("id")).build()); + scanner.close(); + try (ArrowArrayStream stream = ArrowArrayStream.allocateNew(allocator)) { + assertThrows( + IllegalArgumentException.class, + () -> scanner.exportArrowStream(stream.memoryAddress())); + } + } + } + } + @Test void testDatasetScannerCountRows(@TempDir Path tempDir) throws Exception { String datasetPath = tempDir.resolve("dataset_scanner_count").toString();