Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions java/lance-jni/src/blocking_scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -589,6 +589,37 @@ pub extern "system" fn Java_org_lance_ipc_LanceScanner_openStream(
}

fn inner_open_stream(env: &mut JNIEnv, j_scanner: JObject, stream_addr: jlong) -> Result<()> {
if stream_addr == 0 {
return Err(Error::input_error(
"ArrowArrayStream address must not be null".to_string(),
));
}

// Reject a stream that already holds a producer. We write the C struct in place below with
// `ptr::write_unaligned`, which does not run any destructor on the previous contents. If the
// caller passed a stream whose `release` callback is already set (e.g. it was populated by an
// earlier export and not yet released), overwriting it would drop that callback and leak the
// first producer's resources. A freshly-allocated `ArrowArrayStream` has a null `release`, per
// the Arrow C Data Interface, so requiring `release == None` is the contract for "empty".
//
// The struct is allocated by Arrow Java inside an ArrowBuf and is not guaranteed to be aligned
// (hence `write_unaligned` below), so we must not form a reference to it. We read only the
// `release` field through an unaligned read: `addr_of!` computes the field address without
// creating an intermediate (mis)aligned reference, and the field is an `Option<fn>` which is
// `Copy` with no destructor, so reading a copy of it leaves the caller's stream untouched.
let release_is_set = unsafe {

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The populated-stream guard checks the release callback before constructing and writing the new stream, so two concurrent exports to the same empty stream can both pass the check and then overwrite each other. That can leak the first producer and leave the caller draining whichever stream won the race.

let stream_ptr = stream_addr as *const FFI_ArrowArrayStream;
let release = std::ptr::read_unaligned(std::ptr::addr_of!((*stream_ptr).release));
release.is_some()
};
if release_is_set {
return Err(Error::input_error(
"ArrowArrayStream is already populated; exporting into it would leak the existing \
producer. Pass a freshly-allocated, empty stream."
.to_string(),
));
}

let record_batch_stream = {
let scanner_guard =
unsafe { env.get_rust_field::<_, _, BlockingScanner>(j_scanner, NATIVE_SCANNER) }?;
Expand Down
52 changes: 52 additions & 0 deletions java/src/main/java/org/lance/ipc/LanceScanner.java
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,58 @@ public ArrowReader scanBatches() {
}
}

/**
* Export this scan's results into a caller-owned Arrow C stream identified by its memory address,
* using the Arrow C Data Interface release callback to transfer ownership.
*
* <p>This method intentionally takes a raw {@code streamAddress} (an {@code ArrowArrayStream}
* memory address) rather than a Java {@link ArrowArrayStream} object. A typed parameter would be
* an {@code org.apache.arrow.c.ArrowArrayStream} loaded by <em>Lance's</em> classloader / Arrow
* version; a caller running a different Arrow version (or under a different classloader, e.g.
* Spark + a native engine bundling its own Arrow) cannot construct that exact type and would hit
* a {@code ClassCastException}/{@code NoSuchMethodError} at the very boundary this method exists
* to cross. The C Data Interface ABI is stable across Arrow versions, so passing the C struct's
* address keeps the two sides fully decoupled: the caller allocates the stream with <em>its
* own</em> Arrow runtime and only the {@code long} address crosses into Lance. See gluten#12263
* for the cross-Arrow-version integration that motivated this.
*
* <p>Unlike {@link #scanBatches()}, no Java Arrow {@link ArrowReader} is created on Lance's side:
* Lance writes the C struct directly at {@code streamAddress} and the caller drives the read loop
* with its own Arrow runtime.
*
* <p>The {@code streamAddress} must point to a freshly-allocated, empty {@code ArrowArrayStream}
* (its {@code release} callback must be null). Exporting into a stream that already holds a
* producer is rejected with an {@link IllegalArgumentException}, because overwriting the struct
* would drop the existing {@code release} callback and leak the first producer. The caller owns
* the stream and is responsible for closing it; the release callback installed by this call
* routes back through Lance's native side.
*
* <p>Example (caller on its own Arrow version / allocator):
*
* <pre>{@code
* try (ArrowArrayStream stream = ArrowArrayStream.allocateNew(callerAllocator)) {
* scanner.exportArrowStream(stream.memoryAddress());
* try (ArrowReader reader = Data.importArrayStream(callerAllocator, stream)) {
* while (reader.loadNextBatch()) {
* VectorSchemaRoot batch = reader.getVectorSchemaRoot();
* // ...
* }
* }
* }
* }</pre>
*
* @param streamAddress the memory address of a freshly-allocated, empty {@code ArrowArrayStream}
* to populate
* @throws IllegalArgumentException if the scanner is closed or the stream is already populated
* @throws IOException if the native scan fails to start
*/
public void exportArrowStream(long streamAddress) throws IOException {
try (LockManager.ReadLock readLock = lockManager.acquireReadLock()) {
Preconditions.checkArgument(nativeScannerHandle != 0, "Scanner is closed");
openStream(streamAddress);
}
}

private native void openStream(long streamAddress) throws IOException;

@Override
Expand Down
277 changes: 277 additions & 0 deletions java/src/test/java/org/lance/ScannerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.lance.ipc.ScanOptions;
import org.lance.ipc.ScanStats;

import org.apache.arrow.c.ArrowArrayStream;
import org.apache.arrow.c.Data;
import org.apache.arrow.dataset.scanner.Scanner;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
Expand Down Expand Up @@ -49,6 +51,7 @@

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class ScannerTest {
Expand Down Expand Up @@ -158,6 +161,280 @@ void testDatasetScannerSchema(@TempDir Path tempDir) throws Exception {
}
}

/**
* Reads every batch from a caller-owned C stream populated by {@link
* LanceScanner#exportArrowStream(long)} and returns the {@code id} values in stream order.
*
* <p>Asserts the projected schema is exactly {@code id: int32} and that no batch exceeds {@code
* maxBatchRows}, but does not assume any particular batch count or that batches are full — batch
* size is a scanner hint, not a guarantee, so over-asserting on it makes the test brittle.
*/
private static List<Integer> drainIdStream(
BufferAllocator allocator, ArrowArrayStream stream, int maxBatchRows) throws IOException {
List<Integer> ids = new ArrayList<>();
try (ArrowReader reader = Data.importArrayStream(allocator, stream)) {
VectorSchemaRoot root = reader.getVectorSchemaRoot();
while (reader.loadNextBatch()) {

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The schema assertions only run after a batch is loaded, so the empty-result case never validates the exported stream schema. A regression that returns the wrong schema for zero-row scans would still pass.

List<FieldVector> fieldVectors = root.getFieldVectors();
assertEquals(1, fieldVectors.size());
FieldVector fieldVector = fieldVectors.get(0);
assertEquals("id", fieldVector.getField().getName());
assertEquals(ArrowType.ArrowTypeID.Int, fieldVector.getField().getType().getTypeID());
int rowsInBatch = fieldVector.getValueCount();
assertTrue(
rowsInBatch <= maxBatchRows,
"batch of " + rowsInBatch + " rows exceeded requested batch size " + maxBatchRows);
IntVector vector = (IntVector) fieldVector;
for (int i = 0; i < rowsInBatch; i++) {
ids.add(vector.get(i));
}
}
}
return ids;
}

@Test
void testExportArrowStream(@TempDir Path tempDir) throws Exception {
String datasetPath = tempDir.resolve("export_stream_basic").toString();
try (BufferAllocator allocator = new RootAllocator()) {
TestUtils.SimpleTestDataset testDataset =
new TestUtils.SimpleTestDataset(allocator, datasetPath);
testDataset.createEmptyDataset().close();
int totalRows = 40;
int batchRows = 20;
try (Dataset dataset = testDataset.write(1, totalRows)) {
try (LanceScanner scanner =
dataset.newScan(
new ScanOptions.Builder()
.batchSize(batchRows)
.columns(Arrays.asList("id"))
.build())) {
// The caller allocates the C stream from its own allocator and passes only the memory
// address; the scanner fills the C struct in place. This is the cross-Arrow-version /
// cross-classloader boundary the API exists to serve.
try (ArrowArrayStream stream = ArrowArrayStream.allocateNew(allocator)) {
scanner.exportArrowStream(stream.memoryAddress());
List<Integer> ids = drainIdStream(allocator, stream, batchRows);
assertEquals(totalRows, ids.size());
for (int i = 0; i < totalRows; i++) {
assertEquals(i, ids.get(i));
}
}
}
}
}
}

@Test
void testExportArrowStreamMultipleFragments(@TempDir Path tempDir) throws Exception {
String datasetPath = tempDir.resolve("export_stream_multi_fragment").toString();
try (BufferAllocator allocator = new RootAllocator()) {
TestUtils.SimpleTestDataset testDataset =
new TestUtils.SimpleTestDataset(allocator, datasetPath);
testDataset.createEmptyDataset().close();
int totalRows = 40;
// maxRowsPerFile < totalRows forces multiple fragments (4 fragments of 10 rows).
List<FragmentMetadata> fragments = testDataset.createNewFragment(totalRows, 10);
assertEquals(4, fragments.size());
FragmentOperation.Append appendOp = new FragmentOperation.Append(fragments);
try (Dataset dataset = Dataset.commit(allocator, datasetPath, appendOp, Optional.of(1L))) {
int batchRows = 7; // deliberately not a divisor of any fragment size
try (LanceScanner scanner =
dataset.newScan(
new ScanOptions.Builder()
.batchSize(batchRows)
.columns(Arrays.asList("id"))
.build())) {
try (ArrowArrayStream stream = ArrowArrayStream.allocateNew(allocator)) {
scanner.exportArrowStream(stream.memoryAddress());
List<Integer> ids = drainIdStream(allocator, stream, batchRows);
assertEquals(totalRows, ids.size());
Collections.sort(ids);

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The multi-fragment export test sorts the collected ids before asserting them, so it would pass even if the default ordered scan returned fragments out of order.

for (int i = 0; i < totalRows; i++) {
assertEquals(i, ids.get(i));
}
}
}
}
}
}

@Test
void testExportArrowStreamWithFilter(@TempDir Path tempDir) throws Exception {
String datasetPath = tempDir.resolve("export_stream_filter").toString();
try (BufferAllocator allocator = new RootAllocator()) {
TestUtils.SimpleTestDataset testDataset =
new TestUtils.SimpleTestDataset(allocator, datasetPath);
testDataset.createEmptyDataset().close();
try (Dataset dataset = testDataset.write(1, 40)) {
try (LanceScanner scanner =
dataset.newScan(
new ScanOptions.Builder()
.batchSize(50)
.columns(Arrays.asList("id"))
.filter("id < 20")
.build())) {
try (ArrowArrayStream stream = ArrowArrayStream.allocateNew(allocator)) {
scanner.exportArrowStream(stream.memoryAddress());
List<Integer> ids = drainIdStream(allocator, stream, 50);
assertEquals(20, ids.size());
Collections.sort(ids);
for (int i = 0; i < 20; i++) {
assertEquals(i, ids.get(i));
}
}
}
}
}
}

@Test
void testExportArrowStreamWithLimitOffset(@TempDir Path tempDir) throws Exception {
String datasetPath = tempDir.resolve("export_stream_limit_offset").toString();
try (BufferAllocator allocator = new RootAllocator()) {
TestUtils.SimpleTestDataset testDataset =
new TestUtils.SimpleTestDataset(allocator, datasetPath);
testDataset.createEmptyDataset().close();
try (Dataset dataset = testDataset.write(1, 40)) {
try (LanceScanner scanner =
dataset.newScan(
new ScanOptions.Builder()
.batchSize(50)
.columns(Arrays.asList("id"))
.limit(5)
.offset(10)
.build())) {
try (ArrowArrayStream stream = ArrowArrayStream.allocateNew(allocator)) {
scanner.exportArrowStream(stream.memoryAddress());
List<Integer> ids = drainIdStream(allocator, stream, 50);
assertEquals(Arrays.asList(10, 11, 12, 13, 14), ids);
}
}
}
}
}

@Test
void testExportArrowStreamProjectsRequestedColumnsOnly(@TempDir Path tempDir) throws Exception {
String datasetPath = tempDir.resolve("export_stream_projection").toString();
try (BufferAllocator allocator = new RootAllocator()) {
TestUtils.SimpleTestDataset testDataset =
new TestUtils.SimpleTestDataset(allocator, datasetPath);
testDataset.createEmptyDataset().close();
try (Dataset dataset = testDataset.write(1, 10)) {
// Project only "name"; the exported stream's schema must contain exactly that column.
try (LanceScanner scanner =
dataset.newScan(new ScanOptions.Builder().columns(Arrays.asList("name")).build())) {
try (ArrowArrayStream stream = ArrowArrayStream.allocateNew(allocator)) {
scanner.exportArrowStream(stream.memoryAddress());
try (ArrowReader reader = Data.importArrayStream(allocator, stream)) {
VectorSchemaRoot root = reader.getVectorSchemaRoot();
assertEquals(1, root.getSchema().getFields().size());
assertEquals("name", root.getSchema().getFields().get(0).getName());
int rows = 0;
while (reader.loadNextBatch()) {
rows += root.getRowCount();
}
assertEquals(10, rows);
}
}
}
}
}
}

@Test
void testExportArrowStreamEmptyResult(@TempDir Path tempDir) throws Exception {
String datasetPath = tempDir.resolve("export_stream_empty").toString();
try (BufferAllocator allocator = new RootAllocator()) {
TestUtils.SimpleTestDataset testDataset =
new TestUtils.SimpleTestDataset(allocator, datasetPath);
testDataset.createEmptyDataset().close();
try (Dataset dataset = testDataset.write(1, 40)) {
try (LanceScanner scanner =
dataset.newScan(
new ScanOptions.Builder().columns(Arrays.asList("id")).filter("id < 0").build())) {
try (ArrowArrayStream stream = ArrowArrayStream.allocateNew(allocator)) {
scanner.exportArrowStream(stream.memoryAddress());
List<Integer> ids = drainIdStream(allocator, stream, 1024);
assertTrue(ids.isEmpty());
}
}
}
}
}

@Test
void testExportArrowStreamRejectsPopulatedStream(@TempDir Path tempDir) throws Exception {
String datasetPath = tempDir.resolve("export_stream_reject_populated").toString();
try (BufferAllocator allocator = new RootAllocator()) {
TestUtils.SimpleTestDataset testDataset =
new TestUtils.SimpleTestDataset(allocator, datasetPath);
testDataset.createEmptyDataset().close();
try (Dataset dataset = testDataset.write(1, 40)) {
try (LanceScanner scanner =
dataset.newScan(new ScanOptions.Builder().columns(Arrays.asList("id")).build())) {
try (ArrowArrayStream stream = ArrowArrayStream.allocateNew(allocator)) {
// First export populates the stream and installs a release callback.
scanner.exportArrowStream(stream.memoryAddress());
// Exporting again into the same (already-populated) stream must be rejected rather
// than silently overwriting and leaking the first producer's release callback.
IllegalArgumentException ex =
assertThrows(
IllegalArgumentException.class,
() -> scanner.exportArrowStream(stream.memoryAddress()));
assertTrue(ex.getMessage().toLowerCase().contains("already populated"));
// The first producer is still intact and drainable.
try (ArrowReader reader = Data.importArrayStream(allocator, stream)) {
int rows = 0;
VectorSchemaRoot root = reader.getVectorSchemaRoot();
while (reader.loadNextBatch()) {
rows += root.getRowCount();
}
assertEquals(40, rows);
}
}
}
}
}
}

@Test
void testExportArrowStreamRejectsNullAddress(@TempDir Path tempDir) throws Exception {
String datasetPath = tempDir.resolve("export_stream_reject_null").toString();
try (BufferAllocator allocator = new RootAllocator()) {
TestUtils.SimpleTestDataset testDataset =
new TestUtils.SimpleTestDataset(allocator, datasetPath);
testDataset.createEmptyDataset().close();
try (Dataset dataset = testDataset.write(1, 10)) {
try (LanceScanner scanner =
dataset.newScan(new ScanOptions.Builder().columns(Arrays.asList("id")).build())) {
assertThrows(IllegalArgumentException.class, () -> scanner.exportArrowStream(0L));
}
}
}
}

@Test
void testExportArrowStreamRejectsClosedScanner(@TempDir Path tempDir) throws Exception {
String datasetPath = tempDir.resolve("export_stream_reject_closed").toString();
try (BufferAllocator allocator = new RootAllocator()) {
TestUtils.SimpleTestDataset testDataset =
new TestUtils.SimpleTestDataset(allocator, datasetPath);
testDataset.createEmptyDataset().close();
try (Dataset dataset = testDataset.write(1, 10)) {
LanceScanner scanner =
dataset.newScan(new ScanOptions.Builder().columns(Arrays.asList("id")).build());
scanner.close();
try (ArrowArrayStream stream = ArrowArrayStream.allocateNew(allocator)) {
assertThrows(
IllegalArgumentException.class,
() -> scanner.exportArrowStream(stream.memoryAddress()));
}
}
}
}

@Test
void testDatasetScannerCountRows(@TempDir Path tempDir) throws Exception {
String datasetPath = tempDir.resolve("dataset_scanner_count").toString();
Expand Down
Loading