feat(java): expose ArrowArrayStream export on LanceScanner#7259
feat(java): expose ArrowArrayStream export on LanceScanner#7259sezruby wants to merge 4 commits into
Conversation
Add public LanceScanner#exportArrowStream(ArrowArrayStream) wrapping the existing private native openStream(long) call. Lets callers populate a stream they allocated themselves (typically from their own BufferAllocator) instead of going through scanBatches(), which immediately imports into a Java ArrowReader backed by Lance's allocator. The motivation is consumers loaded under a different classloader / pinned to a different Apache Arrow version. Sharing org.apache.arrow.vector.* classes across classloader boundaries is not safe, but the C Data Interface struct is stable across Arrow versions — so handing the C struct's memory address through is the only correct integration boundary. A concrete consumer is the ongoing gluten-spark/Velox integration tracked at apache/gluten#12263, which needs to import Lance scan output into its own Arrow 15 + Velox runtime; gluten-spark is built against Arrow 15 while Lance is on Arrow 18. Test exercises the full path end-to-end: caller allocates a stream from its own RootAllocator, scanner fills the C struct, caller imports into an ArrowReader and validates batch contents.
|
@hamersaw @jackye1995 Could you review the PR? The PR is to support Lance Reader in Gluten/Spark. I'll open a lance-spark PR after this PR is merged. Thanks! |
|
Gentle ping — @wjones127 would you have a chance to take a look? Small Java-only change exposing the existing |
| * @param stream the caller-allocated stream to populate | ||
| * @throws IOException if the native scan fails to start | ||
| */ | ||
| public void exportArrowStream(ArrowArrayStream stream) throws IOException { |
There was a problem hiding this comment.
The public API still requires Lance's ArrowArrayStream Java type, so callers using a different Arrow version or classloader cannot pass their own stream despite this being the integration boundary the method documents.
There was a problem hiding this comment.
Good catch — this defeated the method's whole purpose. Changed the signature to take a raw long streamAddress instead of a typed ArrowArrayStream. The caller now allocates the stream with its own Arrow runtime and passes stream.memoryAddress(), so only the address crosses into Lance and the two sides never share a Java Arrow type. Documented the reasoning in the Javadoc. Fixed in f725d96.
| Preconditions.checkNotNull(stream); | ||
| try (LockManager.ReadLock readLock = lockManager.acquireReadLock()) { | ||
| Preconditions.checkArgument(nativeScannerHandle != 0, "Scanner is closed"); | ||
| openStream(stream.memoryAddress()); |
There was a problem hiding this comment.
This writes into the provided C stream without rejecting an already-populated stream, so a second export can overwrite the existing release callback and leak the first producer.
There was a problem hiding this comment.
Fixed. inner_open_stream now reads the existing struct's release field and rejects with IllegalArgumentException if it's already set (also rejects a null address). A freshly-allocated ArrowArrayStream has a null release per the C Data Interface, so that's the "empty" contract. The read is done unaligned via addr_of! without forming a reference, since the struct lives in a possibly-unaligned ArrowBuf (same reason the write below uses write_unaligned). Added a test that exports twice into the same stream and asserts the second call throws while the first producer stays intact and drainable. Fixed in f725d96.
| FieldVector fieldVector = fieldVectors.get(0); | ||
| assertEquals( | ||
| ArrowType.ArrowTypeID.Int, fieldVector.getField().getType().getTypeID()); | ||
| assertEquals(batchRows, fieldVector.getValueCount()); |
There was a problem hiding this comment.
This test assumes every non-strict scan batch has exactly the requested size, so a valid scanner change that returns smaller batches would fail this API test.
There was a problem hiding this comment.
Agreed, that was brittle. Reworked the assertions into a shared drainIdStream helper that asserts only batch size <= requested size and that the full row set is correct, without assuming any batch count or that batches are full. Applied across all the new cases. Fixed in f725d96.
Review feedback from @Xuanwo on lance-format#7259: 1. Take a raw `long streamAddress` instead of a typed `ArrowArrayStream`. A typed parameter is `org.apache.arrow.c.ArrowArrayStream` loaded by Lance's classloader / Arrow version. The whole point of the method is to serve callers on a *different* Arrow version or classloader (Spark + a native engine bundling its own Arrow), who cannot construct that exact type and would fail at the boundary the method exists to cross. The C Data Interface ABI is version-stable, so only the address crosses into Lance and the two sides stay decoupled. 2. Reject an already-populated stream. `openStream` writes the C struct in place with `ptr::write_unaligned`, which runs no destructor on the prior contents; if the caller's stream already had a `release` callback, overwriting it would leak the first producer. The native side now reads only the `release` field (unaligned, via `addr_of!`, no reference formed into the possibly-misaligned ArrowBuf) and rejects with IllegalArgumentException when it is non-null. Also rejects a null address. 3. Tests no longer assume an exact per-batch row count (batch size is a scanner hint, not a guarantee). The shared drain helper asserts only that no batch exceeds the requested size and that the full row set is correct. Expanded test coverage in ScannerTest: - basic export with batch-size-agnostic assertions - multiple fragments (4 fragments, batch size not a divisor of fragment size) - filter pushdown (id < 20) - limit + offset - projection (single column, schema asserted) - empty result - rejects exporting into an already-populated stream (first producer stays intact and drainable) - rejects null address - rejects a closed scanner
| // `release` field through an unaligned read: `addr_of!` computes the field address without | ||
| // creating an intermediate (mis)aligned reference, and the field is an `Option<fn>` which is | ||
| // `Copy` with no destructor, so reading a copy of it leaves the caller's stream untouched. | ||
| let release_is_set = unsafe { |
There was a problem hiding this comment.
The populated-stream guard checks the release callback before constructing and writing the new stream, so two concurrent exports to the same empty stream can both pass the check and then overwrite each other. That can leak the first producer and leave the caller draining whichever stream won the race.
| List<Integer> ids = new ArrayList<>(); | ||
| try (ArrowReader reader = Data.importArrayStream(allocator, stream)) { | ||
| VectorSchemaRoot root = reader.getVectorSchemaRoot(); | ||
| while (reader.loadNextBatch()) { |
There was a problem hiding this comment.
The schema assertions only run after a batch is loaded, so the empty-result case never validates the exported stream schema. A regression that returns the wrong schema for zero-row scans would still pass.
| scanner.exportArrowStream(stream.memoryAddress()); | ||
| List<Integer> ids = drainIdStream(allocator, stream, batchRows); | ||
| assertEquals(totalRows, ids.size()); | ||
| Collections.sort(ids); |
There was a problem hiding this comment.
The multi-fragment export test sorts the collected ids before asserting them, so it would pass even if the default ordered scan returned fragments out of order.
Summary
Add
LanceScanner#exportArrowStream(ArrowArrayStream)— a public wrapper around the existing private nativeopenStream(long)JNI call. Lets callers populate a stream they allocated themselves instead of going throughscanBatches(), which immediately imports the result into a JavaArrowReaderbacked by Lance'sBufferAllocator.Why
Consumers loaded under a different classloader and/or pinned to a different Apache Arrow version cannot safely share
org.apache.arrow.vector.*classes with Lance — the JVM treats them as distinct types even when the bytecode is identical. The C Data Interface struct is stable across Arrow versions, so handing the C struct's memory address across the boundary is the only correct integration shape.A concrete consumer is the gluten-spark / Velox integration tracked at apache/gluten#12263. gluten-spark builds against Arrow 15 (matching what Spark 3.5 ships and Velox uses); Lance Java SDK is on Arrow 18. With this method, gluten can:
…where
glutenAllocatoris a Spark-task-managedBufferAllocator(ArrowReservationListenerplumbing for memory accounting). Lance never sees Java Arrow on this side; ownership stays with the caller via the C Data Interface release callback.What changed
LanceScanner#exportArrowStream(ArrowArrayStream)— new public method, ~7 lines + Javadoc with usage example. Mirrors the body ofscanBatches()minus the local stream allocation and theData.importArrayStreamstep.testDatasetScannerExportArrowStreamexercises the full path: caller allocates the C stream from its ownRootAllocator, scanner fills the C struct, caller imports into anArrowReaderand validates batch contents (40 rows over 2 batches of 20).Backwards compatibility
Pure addition.
scanBatches(),schema(),countRows(),getStats(),close()all unchanged. No native ABI change.Test plan
./mvnw test -Dtest=ScannerTest#testDatasetScannerExportArrowStream— passes locally (Java compile + spotless clean; full test run depends on a workinglance-jniRust build, which had an unrelatedaws-smithy-typesregistry issue on my machine, so I'm relying on CI for the JNI-linked verification).testDatasetScannerColumnscovers thescanBatches()path so any regression in the sharedopenStreamJNI call would surface there.