Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions java/lance-jni/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

110 changes: 102 additions & 8 deletions java/lance-jni/src/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use arrow::array::{RecordBatch, RecordBatchIterator, StructArray};
use arrow::ffi::{FFI_ArrowArray, FFI_ArrowSchema, from_ffi_and_data_type};
use arrow::ffi_stream::{ArrowArrayStreamReader, FFI_ArrowArrayStream};
use arrow_schema::{DataType, Schema as ArrowSchema};
use jni::objects::{JIntArray, JValue, JValueGen};
use jni::objects::{JByteArray, JIntArray, JValue, JValueGen};
use jni::{
JNIEnv,
objects::{JClass, JLongArray, JObject, JString},
Expand All @@ -19,6 +19,8 @@ use lance_io::utils::CachedFileSize;
use lance_table::rowids::{RowIdSequence, write_row_ids};
use std::iter::once;

use roaring::RoaringBitmap;

use lance::dataset::fragment::write::FragmentCreateBuilder;
use lance::io::ObjectStoreParams;
use lance_datafusion::utils::StreamingWriteSource;
Expand Down Expand Up @@ -48,8 +50,8 @@ pub(crate) struct FragmentMergeResult {
pub(crate) struct FragmentUpdateResult {
updated_fragment: Fragment,
fields_modified: Vec<u32>,
/// Physical row offsets that received column updates (from `_rowaddr` low bits).
updated_row_offsets: Vec<i64>,
/// Matched row offsets serialized as portable RoaringBitmap bytes.
updated_row_offset_bytes: Vec<u8>,
}

//////////////////
Expand Down Expand Up @@ -539,15 +541,106 @@ fn inner_update_column<'local>(
let right_on_str: String = right_on.extract(env)?;
let r =
RT.block_on(fragment.update_columns_with_offsets(reader, &left_on_str, &right_on_str))?;
let updated_row_offsets: Vec<i64> = r.matched_offsets.iter().map(|o| o as i64).collect();
let updated_row_offset_bytes = serialize_matched_offsets(&r.matched_offsets)?;
let result = FragmentUpdateResult {
updated_fragment: r.fragment,
fields_modified: r.fields_modified,
updated_row_offsets,
updated_row_offset_bytes,
};
result.into_java(env)
}

fn serialize_matched_offsets(bitmap: &RoaringBitmap) -> Result<Vec<u8>> {
let mut buf = Vec::new();
bitmap.serialize_into(&mut buf).map_err(|e| {
Error::runtime_error(format!(
"failed to serialize matched row offsets RoaringBitmap: {e}"
))
})?;
Ok(buf)
}

/// Decodes serialized RoaringBitmap bytes back into a bitmap.
///
/// An empty slice is accepted and yields an empty bitmap without invoking the deserializer.
///
/// # Errors
/// Bytes the deserializer rejects are reported as an input error that includes the cause.
fn deserialize_row_offset_bytes(bytes: &[u8]) -> Result<RoaringBitmap> {
    match bytes {
        [] => Ok(RoaringBitmap::new()),
        payload => RoaringBitmap::deserialize_from(payload).map_err(|e| {
            Error::input_error(format!(
                "invalid updatedRowOffsetBytes RoaringBitmap bytes: {e}"
            ))
        }),
    }
}

/// Materializes every offset stored in `bitmap` as an `i64`, in the bitmap's iteration order.
fn expand_row_offset_bytes_to_i64(bitmap: &RoaringBitmap) -> Vec<i64> {
    // u32 -> i64 is a lossless widening, so `i64::from` is equivalent to the cast.
    let widened = bitmap.iter().map(i64::from);
    widened.collect()
}

/// JNI entry point backing the Java static native method
/// `org.lance.fragment.FragmentUpdateResult.expandRowOffsetsFromBytes(byte[])`.
///
/// All work is delegated to `inner_expand_updated_row_offset_bytes`, which turns serialized
/// RoaringBitmap bytes into a Java `long[]` of row offsets.
///
/// NOTE(review): `ok_or_throw_with_return!` is defined elsewhere; presumably it raises a Java
/// exception when the inner call fails and returns the third argument (a null array
/// reference) in that case — confirm against the macro definition.
#[unsafe(no_mangle)]
pub extern "system" fn Java_org_lance_fragment_FragmentUpdateResult_expandRowOffsetsFromBytes<
'local,
>(
mut env: JNIEnv<'local>,
_cls: JClass,
jbytes: JByteArray,
) -> JLongArray<'local> {
ok_or_throw_with_return!(
env,
inner_expand_updated_row_offset_bytes(&mut env, jbytes),
// SAFETY (review note): the raw pointer is null, so the wrapper refers to no JVM object;
// confirm this matches the `from_raw` contract of the jni crate version in use.
unsafe { JLongArray::from_raw(std::ptr::null_mut()) }
)
}

fn inner_expand_updated_row_offset_bytes<'local>(
env: &mut JNIEnv<'local>,
jbytes: JByteArray,
) -> Result<JLongArray<'local>> {
let buf = env.convert_byte_array(&jbytes)?;
let bitmap = deserialize_row_offset_bytes(&buf)?;
let offsets = expand_row_offset_bytes_to_i64(&bitmap);
let arr = env.new_long_array(offsets.len() as i32)?;
if !offsets.is_empty() {
env.set_long_array_region(&arr, 0, &offsets)?;
}
Ok(arr)
}

/// JNI entry point backing the Java static native method
/// `org.lance.fragment.FragmentUpdateResult.encodeRowOffsetsToBytes(long[])`.
///
/// All work is delegated to `inner_encode_updated_row_offset_bytes`, which turns a Java
/// `long[]` of row offsets into serialized RoaringBitmap bytes.
///
/// NOTE(review): `ok_or_throw_with_return!` is defined elsewhere; presumably it raises a Java
/// exception when the inner call fails and returns the third argument (a null array
/// reference) in that case — confirm against the macro definition.
#[unsafe(no_mangle)]
pub extern "system" fn Java_org_lance_fragment_FragmentUpdateResult_encodeRowOffsetsToBytes<
'local,
>(
mut env: JNIEnv<'local>,
_cls: JClass,
joffsets: JLongArray,
) -> JByteArray<'local> {
ok_or_throw_with_return!(
env,
inner_encode_updated_row_offset_bytes(&mut env, joffsets),
// SAFETY (review note): the raw pointer is null, so the wrapper refers to no JVM object;
// confirm this matches the `from_raw` contract of the jni crate version in use.
unsafe { JByteArray::from_raw(std::ptr::null_mut()) }
)
}

fn inner_encode_updated_row_offset_bytes<'local>(
env: &mut JNIEnv<'local>,
joffsets: JLongArray,
) -> Result<JByteArray<'local>> {
let len = env.get_array_length(&joffsets)?;
let mut buf: Vec<i64> = vec![0; len as usize];
if len > 0 {
env.get_long_array_region(&joffsets, 0, buf.as_mut_slice())?;
}
let mut bitmap = RoaringBitmap::new();
for offset in buf {
if offset < 0 {
return Err(Error::input_error(format!(
"updatedRowOffsets must be non-negative, got {offset}"
)));
}
bitmap.insert(offset as u32);

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The deprecated long[] encoder casts every non-negative value to u32, so offsets above u32::MAX silently become different rows and can corrupt last_updated metadata.

}
let bytes = serialize_matched_offsets(&bitmap)?;
Ok(env.byte_array_from_slice(&bytes)?)
}

#[unsafe(no_mangle)]
pub extern "system" fn Java_org_lance_fragment_RowIdMeta_nativeEncodeRowIds(
mut env: JNIEnv,
Expand Down Expand Up @@ -591,7 +684,7 @@ const FRAGMENT_MERGE_RESULT_CLASS: &str = "org/lance/fragment/FragmentMergeResul
const FRAGMENT_MERGE_RESULT_CONSTRUCTOR_SIG: &str =
"(Lorg/lance/FragmentMetadata;Lorg/lance/schema/LanceSchema;)V";
const FRAGMENT_UPDATE_RESULT_CLASS: &str = "org/lance/fragment/FragmentUpdateResult";
const FRAGMENT_UPDATE_RESULT_CONSTRUCTOR_SIG: &str = "(Lorg/lance/FragmentMetadata;[J[J)V";
const FRAGMENT_UPDATE_RESULT_CONSTRUCTOR_SIG: &str = "(Lorg/lance/FragmentMetadata;[J[B)V";

impl IntoJava for &FragmentMergeResult {
fn into_java<'a>(self, env: &mut JNIEnv<'a>) -> Result<JObject<'a>> {
Expand All @@ -612,14 +705,15 @@ impl IntoJava for &FragmentUpdateResult {
fn into_java<'a>(self, env: &mut JNIEnv<'a>) -> Result<JObject<'a>> {
let java_updated_fragment = self.updated_fragment.into_java(env)?;
let java_fields_modified = JLance(self.fields_modified.clone()).into_java(env)?;
let java_updated_row_offsets = JLance(self.updated_row_offsets.clone()).into_java(env)?;
let java_updated_row_offset_bytes =
env.byte_array_from_slice(&self.updated_row_offset_bytes)?;
Ok(env.new_object(
FRAGMENT_UPDATE_RESULT_CLASS,
FRAGMENT_UPDATE_RESULT_CONSTRUCTOR_SIG,
&[
JValueGen::Object(&java_updated_fragment),
JValueGen::Object(&java_fields_modified),
JValueGen::Object(&java_updated_row_offsets),
JValueGen::Object(&java_updated_row_offset_bytes),
],
)?)
}
Expand Down
101 changes: 97 additions & 4 deletions java/lance-jni/src/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use jni::sys::{jboolean, jint, jlong};
use lance::dataset::CommitBuilder;
use lance::dataset::transaction::{
DataReplacementGroup, Operation, RewriteGroup, RewrittenIndex, Transaction, TransactionBuilder,
UpdateMap, UpdateMapEntry, UpdateMode,
UpdateMap, UpdateMapEntry, UpdateMode, UpdatedFragmentOffsets,
};
use lance::io::ObjectStoreParams;
use lance::io::commit::namespace_manifest::LanceNamespaceExternalManifestStore;
Expand Down Expand Up @@ -433,7 +433,7 @@ fn convert_to_java_operation_inner<'local>(
fields_for_preserving_frag_bitmap,
update_mode,
inserted_rows_filter: _,
updated_fragment_offsets: _,
updated_fragment_offsets,
} => {
let removed_ids: Vec<JLance<i64>> = removed_fragment_ids
.iter()
Expand All @@ -457,16 +457,56 @@ fn convert_to_java_operation_inner<'local>(
&[JValue::Object(&update_mode)],
)?
.l()?;
// Serialize updated_fragment_offsets to Java Map<Long, byte[]>.
// Values are portable RoaringBitmap bytes so the JNI boundary stays O(bitmap size)
// rather than O(n rows). Empty HashMap when None so the Java constructor always
// receives a non-null map.
let java_offsets_map = {
let java_map = env.new_object("java/util/HashMap", "()V", &[])?;
if let Some(UpdatedFragmentOffsets(ref map)) = updated_fragment_offsets {
for (frag_id, bitmap) in map {
let mut buf: Vec<u8> = Vec::new();
bitmap.serialize_into(&mut buf).map_err(|e| {
Error::runtime_error(format!(
"failed to serialize updatedFragmentOffsets for fragment \
{frag_id}: {e}"
))
})?;
// JNI byte arrays are signed i8; reinterpret without copying.
let buf_i8: &[i8] = unsafe {
std::slice::from_raw_parts(buf.as_ptr() as *const i8, buf.len())
};
env.with_local_frame(4, |env| {
let java_key = env.new_object(
"java/lang/Long",
"(J)V",
&[JValue::Long(*frag_id as i64)],
)?;
let java_arr = env.new_byte_array(buf_i8.len() as i32)?;
env.set_byte_array_region(&java_arr, 0, buf_i8)?;
env.call_method(
&java_map,
"put",
"(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;",
&[JValue::Object(&java_key), JValue::Object(&*java_arr)],
)?;
Ok::<JObject, Error>(JObject::null())
})?;
}
}
java_map
};
Ok(env.new_object(
"org/lance/operation/Update",
"(Ljava/util/List;Ljava/util/List;Ljava/util/List;[J[JLjava/util/Optional;)V",
"(Ljava/util/List;Ljava/util/List;Ljava/util/List;[J[JLjava/util/Optional;Ljava/util/Map;)V",
&[
JValue::Object(&removed_fragment_ids_obj),
JValue::Object(&updated_fragments_obj),
JValue::Object(&new_fragments_obj),
JValueGen::Object(&fields_modified),
JValueGen::Object(&fields_for_preserving_frag_bitmap),
JValue::Object(&update_mode_optional),
JValue::Object(&java_offsets_map),
],
)?)
}
Expand Down Expand Up @@ -1238,6 +1278,59 @@ fn convert_to_rust_operation(
update_mode.extract_object(env)
})?;

let updated_fragment_offsets = {
let offsets_obj = env
.call_method(
java_operation,
"updatedFragmentOffsets",
"()Ljava/util/Map;",
&[],
)?
.l()?;
if offsets_obj.is_null() {
None
} else {
let jmap = JMap::from_env(env, &offsets_obj)?;
let mut iter = jmap.iter(env)?;
let mut offsets: HashMap<u64, RoaringBitmap> = HashMap::new();
// Per-iteration local frame: iterator key/value JNI refs are released each
// loop so large multi-fragment maps cannot exhaust the local reference table.
loop {
let entry = env.with_local_frame(
8,
|env| -> Result<Option<(u64, RoaringBitmap)>> {
let Some((key, value)) = iter.next(env)? else {
return Ok(None);
};
let frag_id =
env.call_method(&key, "longValue", "()J", &[])?.j()? as u64;
let buf: Vec<u8> =
env.convert_byte_array(JByteArray::from(value))?;
let bitmap = RoaringBitmap::deserialize_from(buf.as_slice())

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The commit path accepts arbitrary offset bitmaps and later expands them into a full offset vector before checking fragment bounds. A compact valid Roaring bitmap can force huge allocations during a Java RewriteColumns commit.

.map_err(|e| {
Error::input_error(format!(
"invalid updatedFragmentOffsets RoaringBitmap bytes \
for fragment {frag_id}: {e}"
))
})?;
Ok(Some((frag_id, bitmap)))
},
)?;
match entry {
None => break,
Some((frag_id, bitmap)) => {
offsets.insert(frag_id, bitmap);
}
}
}
if offsets.is_empty() {
None
} else {
Some(UpdatedFragmentOffsets(offsets))
}
}
};

Operation::Update {
removed_fragment_ids,
updated_fragments,
Expand All @@ -1247,7 +1340,7 @@ fn convert_to_rust_operation(
fields_for_preserving_frag_bitmap,
update_mode,
inserted_rows_filter: None,
updated_fragment_offsets: None,
updated_fragment_offsets,
}
}
"DataReplacement" => {
Expand Down
51 changes: 43 additions & 8 deletions java/src/main/java/org/lance/fragment/FragmentUpdateResult.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,37 @@ public class FragmentUpdateResult {
private final FragmentMetadata updatedFragment;
private final long[] fieldsModified;

/** Local physical row offsets within the fragment that received updates (see RowAddress). */
private final long[] updatedRowOffsets;
/**
* Matched physical row offsets within the fragment, serialized as portable RoaringBitmap bytes
* (little-endian, same format as {@link org.lance.operation.Update#updatedFragmentOffsets()}).
*/
private final byte[] updatedRowOffsetBytes;

/** Two-argument form for callers that do not track per-row offsets; offsets default to empty. */
public FragmentUpdateResult(FragmentMetadata updatedFragment, long[] updatedFieldIds) {
this(updatedFragment, updatedFieldIds, new long[0]);
this(updatedFragment, updatedFieldIds, new byte[0]);
}

public FragmentUpdateResult(

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding a public byte[] constructor with the same arity as the retained long[] constructor makes existing source calls that pass null for offsets fail overload resolution.

FragmentMetadata updatedFragment, long[] updatedFieldIds, long[] updatedRowOffsets) {
FragmentMetadata updatedFragment, long[] updatedFieldIds, byte[] updatedRowOffsetBytes) {
this.updatedFragment = updatedFragment;
this.fieldsModified = updatedFieldIds;
this.updatedRowOffsets = updatedRowOffsets;
this.updatedRowOffsetBytes =
updatedRowOffsetBytes != null ? updatedRowOffsetBytes : new byte[0];
}

/**
* @deprecated Use {@link #FragmentUpdateResult(FragmentMetadata, long[], byte[])} instead. This
* constructor encodes the {@code long[]} offsets into serialized RoaringBitmap bytes via JNI
* and is retained for backward compatibility with callers compiled against the #6650 API.
*/
@Deprecated
public FragmentUpdateResult(

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The retained long[] compatibility path now calls native helpers from a class that never loads the JNI library. Existing callers can hit UnsatisfiedLinkError unless another Lance class happened to initialize first.

FragmentMetadata updatedFragment, long[] updatedFieldIds, long[] updatedRowOffsets) {
this(
updatedFragment,
updatedFieldIds,
encodeRowOffsetsToBytes(updatedRowOffsets != null ? updatedRowOffsets : new long[0]));
}

public FragmentMetadata getUpdatedFragment() {
Expand All @@ -49,17 +67,34 @@ public long[] getFieldsModified() {
return fieldsModified;
}

/** Physical row offsets (0-based within the fragment) whose columns were rewritten. */
/**
* Physical row offsets (0-based within the fragment) whose columns were rewritten, as portable
* RoaringBitmap bytes.
*
* <p>The backing array is returned directly, without a defensive copy, so callers should treat
* it as read-only.
*
* @return the serialized bitmap bytes; not null, because the byte[] constructor substitutes an
* empty array when it is given null
*/
public byte[] getUpdatedRowOffsetBytes() {
return updatedRowOffsetBytes;
}

/**
* Physical row offsets (0-based within the fragment) whose columns were rewritten.
*
* @deprecated Use {@link #getUpdatedRowOffsetBytes()} instead.
*/
@Deprecated
public long[] getUpdatedRowOffsets() {
return updatedRowOffsets;
return expandRowOffsetsFromBytes(updatedRowOffsetBytes);
}

private static native byte[] encodeRowOffsetsToBytes(long[] rowOffsets);

private static native long[] expandRowOffsetsFromBytes(byte[] rowOffsetBytes);

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("fragmentMetadata", updatedFragment)
.add("updatedFieldIds", fieldsModified)
.add("updatedRowOffsets", updatedRowOffsets)
.add("updatedRowOffsetBytesLength", updatedRowOffsetBytes.length)
.toString();
}
}
Loading
Loading