diff --git a/rust/lance/src/dataset/tests/dataset_merge_update.rs b/rust/lance/src/dataset/tests/dataset_merge_update.rs index 7fa03d6e78d..c6f448040c2 100644 --- a/rust/lance/src/dataset/tests/dataset_merge_update.rs +++ b/rust/lance/src/dataset/tests/dataset_merge_update.rs @@ -1047,7 +1047,8 @@ async fn test_datafile_replacement_error() { Operation::DataReplacement { replacements: vec![DataReplacementGroup(0, new_data_file)], }, - Some(2), + // read at the current version (after the Merge above) + Some(dataset.manifest.version), None, None, Arc::new(Default::default()), diff --git a/rust/lance/src/io/commit/conflict_resolver.rs b/rust/lance/src/io/commit/conflict_resolver.rs index b242cd5b3dd..dc898534c89 100644 --- a/rust/lance/src/io/commit/conflict_resolver.rs +++ b/rust/lance/src/io/commit/conflict_resolver.rs @@ -904,13 +904,42 @@ impl<'a> TransactionRebase<'a> { match &other_transaction.operation { Operation::Append { .. } | Operation::Clone { .. } - | Operation::Delete { .. } - | Operation::Update { .. } - | Operation::Merge { .. } | Operation::UpdateConfig { .. } | Operation::ReserveFragments { .. } | Operation::Project { .. } | Operation::UpdateBases { .. } => Ok(()), + Operation::Merge { .. } => { + // Merge rewrites the whole fragment list; always conflict + // (symmetric with check_merge_txn). + Err(self.retryable_conflict_err(other_transaction, other_version)) + } + Operation::Update { + updated_fragments, + removed_fragment_ids, + .. + } + | Operation::Delete { + updated_fragments, + deleted_fragment_ids: removed_fragment_ids, + .. + } => { + // A concurrent Update/Delete that changed one of our target + // fragments makes our positional column file stale; conflict so + // the committer rebuilds (lance otherwise accepts it silently). + for replacement in replacements { + let touches_our_fragment = updated_fragments + .iter() + .map(|f| f.id) + .chain(removed_fragment_ids.iter().copied()) + .any(|id| id == replacement.0); + if touches_our_fragment { + return Err( + self.retryable_conflict_err(other_transaction, other_version) + ); + } + } + Ok(()) + } Operation::CreateIndex { new_indices, .. } => { // A data replacement only conflicts if it is updating the field that // is being indexed. @@ -3258,7 +3287,7 @@ mod tests { ( "DataReplacement vs Rewrite on different fragment", Operation::DataReplacement { - replacements: vec![DataReplacementGroup(0, data_file_frag0_fields01)], + replacements: vec![DataReplacementGroup(0, data_file_frag0_fields01.clone())], }, Operation::Rewrite { groups: vec![RewriteGroup { @@ -3270,6 +3299,80 @@ mod tests { }, Compatible, ), + // A concurrent Update/Delete on a fragment we replace a column in must + // conflict, else the stale positional file is applied silently. + ( + "DataReplacement vs Update on same fragment", + Operation::DataReplacement { + replacements: vec![DataReplacementGroup(0, data_file_frag0_fields01.clone())], + }, + Operation::Update { + updated_fragments: vec![Fragment::new(0)], + removed_fragment_ids: vec![], + new_fragments: vec![], + fields_modified: vec![], + merged_generations: Vec::new(), + fields_for_preserving_frag_bitmap: vec![], + update_mode: None, + inserted_rows_filter: None, + updated_fragment_offsets: None, + }, + Retryable, + ), + ( + "DataReplacement vs Update on different fragment", + Operation::DataReplacement { + replacements: vec![DataReplacementGroup(0, data_file_frag0_fields01.clone())], + }, + Operation::Update { + updated_fragments: vec![Fragment::new(1)], + removed_fragment_ids: vec![], + new_fragments: vec![], + fields_modified: vec![], + merged_generations: Vec::new(), + fields_for_preserving_frag_bitmap: vec![], + update_mode: None, + inserted_rows_filter: None, + updated_fragment_offsets: None, + }, + Compatible, + ), + ( + "DataReplacement vs Delete on same fragment", + Operation::DataReplacement { + replacements: vec![DataReplacementGroup(0, data_file_frag0_fields01.clone())], + }, + Operation::Delete { + deleted_fragment_ids: vec![], + updated_fragments: vec![Fragment::new(0)], + predicate: "a > 0".to_string(), + }, + Retryable, + ), + ( + "DataReplacement vs Delete that removes the fragment", + Operation::DataReplacement { + replacements: vec![DataReplacementGroup(0, data_file_frag0_fields01.clone())], + }, + Operation::Delete { + deleted_fragment_ids: vec![0], + updated_fragments: vec![], + predicate: "a > 0".to_string(), + }, + Retryable, + ), + // Merge rewrites the whole fragment list -> always conflicts. + ( + "DataReplacement vs Merge", + Operation::DataReplacement { + replacements: vec![DataReplacementGroup(0, data_file_frag0_fields01)], + }, + Operation::Merge { + fragments: vec![Fragment::new(0)], + schema: lance_core::datatypes::Schema::default(), + }, + Retryable, + ), ]; for (description, op1, op2, expected) in cases {