From f665ada55a24de804cc7441dc4cf7a8a2ca5d200 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Mon, 1 Jun 2026 11:23:53 +0800 Subject: [PATCH 1/4] feat: make filter_to_validity public and improve first/last aggregation functionality - Updated `filter_to_validity` to public in `groups_accumulator/nulls.rs`. - Modified `first_last.rs` to use shared `filter_to_validity` for grouped first/last. - Enhanced handling to reject NULL filter rows even if the value bit is true. - Added tests for: - First update path - Last update path - First merge path --- .../src/aggregate/groups_accumulator/nulls.rs | 2 +- .../functions-aggregate/src/first_last.rs | 127 +++++++++++++++++- 2 files changed, 127 insertions(+), 2 deletions(-) diff --git a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/nulls.rs b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/nulls.rs index d524afe43a5a3..646d95d6bb359 100644 --- a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/nulls.rs +++ b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/nulls.rs @@ -43,7 +43,7 @@ pub fn set_nulls( /// /// The output is `true` for rows where the filter is `Some(true)`, and `false` /// for rows where the filter is `Some(false)` or `None`. -pub(crate) fn filter_to_validity(filter: &BooleanArray) -> BooleanBuffer { +pub fn filter_to_validity(filter: &BooleanArray) -> BooleanBuffer { let Some(filter_nulls) = filter.nulls() else { return filter.values().clone(); }; diff --git a/datafusion/functions-aggregate/src/first_last.rs b/datafusion/functions-aggregate/src/first_last.rs index 1935f29c4cfe8..00168db41e2c3 100644 --- a/datafusion/functions-aggregate/src/first_last.rs +++ b/datafusion/functions-aggregate/src/first_last.rs @@ -45,6 +45,7 @@ use datafusion_expr::{ Accumulator, AggregateUDFImpl, Documentation, EmitTo, Expr, ExprFunctionExt, GroupsAccumulator, ReversedUDAF, Signature, SortExpr, Volatility, }; +use datafusion_functions_aggregate_common::aggregate::groups_accumulator::nulls::filter_to_validity; use datafusion_functions_aggregate_common::utils::get_sort_options; use datafusion_macros::user_doc; use datafusion_physical_expr_common::sort_expr::LexOrdering; @@ -552,10 +553,14 @@ impl FirstLastGroupsAccumulator { LexicographicalComparator::try_new(&sort_columns)? }; + let filter_validity = opt_filter.map(filter_to_validity); + for (idx_in_val, group_idx) in group_indices.iter().enumerate() { let group_idx = *group_idx; - let passed_filter = opt_filter.is_none_or(|x| x.value(idx_in_val)); + let passed_filter = filter_validity + .as_ref() + .is_none_or(|validity| validity.value(idx_in_val)); let is_set = is_set_arr.is_none_or(|x| x.value(idx_in_val)); if !passed_filter || !is_set { @@ -1416,6 +1421,7 @@ mod tests { use arrow::{ array::{BooleanArray, Int64Array, ListArray, PrimitiveArray, StringArray}, + buffer::NullBuffer, compute::SortOptions, datatypes::Schema, }; @@ -1621,6 +1627,125 @@ mod tests { Ok(()) } + #[test] + fn test_first_group_acc_rejects_null_filter_with_true_value_bit() -> Result<()> { + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int64, true), + Field::new("c", DataType::Int64, true), + ])); + + let sort_keys = [PhysicalSortExpr { + expr: col("c", &schema).unwrap(), + options: SortOptions::default(), + }]; + + let mut group_acc = FirstLastGroupsAccumulator::try_new( + PrimitiveValueState::::new(DataType::Int64), + sort_keys.into(), + true, + &[DataType::Int64], + true, + )?; + + let values_and_orderings: Vec = vec![ + Arc::new(Int64Array::from(vec![10, 20])), + Arc::new(Int64Array::from(vec![1, 2])), + ]; + let filter = BooleanArray::new( + BooleanBuffer::from(vec![true, false]), + Some(NullBuffer::from(vec![false, true])), + ); + + group_acc.update_batch(&values_and_orderings, &[0, 0], Some(&filter), 1)?; + + let result = group_acc.evaluate(EmitTo::All)?; + let result = result.as_any().downcast_ref::().unwrap(); + let expected = Int64Array::from(vec![None]); + assert_eq!(result, &expected); + + Ok(()) + } + + #[test] + fn test_last_group_acc_rejects_null_filter_with_true_value_bit() -> Result<()> { + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int64, true), + Field::new("c", DataType::Int64, true), + ])); + + let sort_keys = [PhysicalSortExpr { + expr: col("c", &schema).unwrap(), + options: SortOptions::default(), + }]; + + let mut group_acc = FirstLastGroupsAccumulator::try_new( + PrimitiveValueState::::new(DataType::Int64), + sort_keys.into(), + true, + &[DataType::Int64], + false, + )?; + + let values_and_orderings: Vec = vec![ + Arc::new(Int64Array::from(vec![10, 20, 30])), + Arc::new(Int64Array::from(vec![1, 2, 3])), + ]; + let filter = BooleanArray::new( + BooleanBuffer::from(vec![true, true, false]), + Some(NullBuffer::from(vec![false, true, true])), + ); + + group_acc.update_batch(&values_and_orderings, &[0, 0, 0], Some(&filter), 1)?; + + let result = group_acc.evaluate(EmitTo::All)?; + let result = result.as_any().downcast_ref::().unwrap(); + let expected = Int64Array::from(vec![Some(20)]); + assert_eq!(result, &expected); + + Ok(()) + } + + #[test] + fn test_first_group_acc_merge_rejects_null_filter_with_true_value_bit() -> Result<()> + { + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int64, true), + Field::new("c", DataType::Int64, true), + ])); + + let sort_keys = [PhysicalSortExpr { + expr: col("c", &schema).unwrap(), + options: SortOptions::default(), + }]; + + let mut group_acc = FirstLastGroupsAccumulator::try_new( + PrimitiveValueState::::new(DataType::Int64), + sort_keys.into(), + true, + &[DataType::Int64], + true, + )?; + + let states: Vec = vec![ + Arc::new(Int64Array::from(vec![10, 20])), + Arc::new(Int64Array::from(vec![1, 2])), + Arc::new(BooleanArray::from(vec![true, true])), + ]; + let filter = BooleanArray::new( + BooleanBuffer::from(vec![true, true]), + Some(NullBuffer::from(vec![false, true])), + ); + + group_acc.merge_batch(&states, &[0, 0], Some(&filter), 1)?; + + let result = group_acc.evaluate(EmitTo::All)?; + let result = result.as_any().downcast_ref::().unwrap(); + let expected = Int64Array::from(vec![Some(20)]); + assert_eq!(result, &expected); + + Ok(()) + } + #[test] fn test_group_acc_size_of_ordering() -> Result<()> { let schema = Arc::new(Schema::new(vec![ From 537f10f29dc099522d613644c40db4acab4aa607 Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Mon, 1 Jun 2026 11:29:13 +0800 Subject: [PATCH 2/4] feat(tests): enhance first_last.rs with new test helpers and cleanup - Destructured group_indices loop by removing redundant variable assignment. - Added test helper: new_int64_first_last_group_acc(...) for improved test setup. - Added test helper: nullable_bool_filter(...) to facilitate testing with nullable booleans. - Replaced repeated test setup across tests with the newly created helpers for better maintainability. --- .../functions-aggregate/src/first_last.rs | 100 ++++++------------ 1 file changed, 34 insertions(+), 66 deletions(-) diff --git a/datafusion/functions-aggregate/src/first_last.rs b/datafusion/functions-aggregate/src/first_last.rs index 00168db41e2c3..06de4d86dc4b0 100644 --- a/datafusion/functions-aggregate/src/first_last.rs +++ b/datafusion/functions-aggregate/src/first_last.rs @@ -555,9 +555,7 @@ impl FirstLastGroupsAccumulator { let filter_validity = opt_filter.map(filter_to_validity); - for (idx_in_val, group_idx) in group_indices.iter().enumerate() { - let group_idx = *group_idx; - + for (idx_in_val, &group_idx) in group_indices.iter().enumerate() { let passed_filter = filter_validity .as_ref() .is_none_or(|validity| validity.value(idx_in_val)); @@ -1429,6 +1427,32 @@ mod tests { use super::*; + fn new_int64_first_last_group_acc( + pick_first_in_group: bool, + ) -> Result>> { + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int64, true), + Field::new("c", DataType::Int64, true), + ])); + + let sort_keys = [PhysicalSortExpr { + expr: col("c", &schema).unwrap(), + options: SortOptions::default(), + }]; + + FirstLastGroupsAccumulator::try_new( + PrimitiveValueState::::new(DataType::Int64), + sort_keys.into(), + true, + &[DataType::Int64], + pick_first_in_group, + ) + } + + fn nullable_bool_filter(values: Vec, valid: Vec) -> BooleanArray { + BooleanArray::new(BooleanBuffer::from(values), Some(NullBuffer::from(valid))) + } + #[test] fn test_first_last_value_value() -> Result<()> { let mut first_accumulator = @@ -1629,32 +1653,13 @@ mod tests { #[test] fn test_first_group_acc_rejects_null_filter_with_true_value_bit() -> Result<()> { - let schema = Arc::new(Schema::new(vec![ - Field::new("a", DataType::Int64, true), - Field::new("c", DataType::Int64, true), - ])); - - let sort_keys = [PhysicalSortExpr { - expr: col("c", &schema).unwrap(), - options: SortOptions::default(), - }]; - - let mut group_acc = FirstLastGroupsAccumulator::try_new( - PrimitiveValueState::::new(DataType::Int64), - sort_keys.into(), - true, - &[DataType::Int64], - true, - )?; + let mut group_acc = new_int64_first_last_group_acc(true)?; let values_and_orderings: Vec = vec![ Arc::new(Int64Array::from(vec![10, 20])), Arc::new(Int64Array::from(vec![1, 2])), ]; - let filter = BooleanArray::new( - BooleanBuffer::from(vec![true, false]), - Some(NullBuffer::from(vec![false, true])), - ); + let filter = nullable_bool_filter(vec![true, false], vec![false, true]); group_acc.update_batch(&values_and_orderings, &[0, 0], Some(&filter), 1)?; @@ -1668,32 +1673,14 @@ mod tests { #[test] fn test_last_group_acc_rejects_null_filter_with_true_value_bit() -> Result<()> { - let schema = Arc::new(Schema::new(vec![ - Field::new("a", DataType::Int64, true), - Field::new("c", DataType::Int64, true), - ])); - - let sort_keys = [PhysicalSortExpr { - expr: col("c", &schema).unwrap(), - options: SortOptions::default(), - }]; - - let mut group_acc = FirstLastGroupsAccumulator::try_new( - PrimitiveValueState::::new(DataType::Int64), - sort_keys.into(), - true, - &[DataType::Int64], - false, - )?; + let mut group_acc = new_int64_first_last_group_acc(false)?; let values_and_orderings: Vec = vec![ Arc::new(Int64Array::from(vec![10, 20, 30])), Arc::new(Int64Array::from(vec![1, 2, 3])), ]; - let filter = BooleanArray::new( - BooleanBuffer::from(vec![true, true, false]), - Some(NullBuffer::from(vec![false, true, true])), - ); + let filter = + nullable_bool_filter(vec![true, true, false], vec![false, true, true]); group_acc.update_batch(&values_and_orderings, &[0, 0, 0], Some(&filter), 1)?; @@ -1708,33 +1695,14 @@ mod tests { #[test] fn test_first_group_acc_merge_rejects_null_filter_with_true_value_bit() -> Result<()> { - let schema = Arc::new(Schema::new(vec![ - Field::new("a", DataType::Int64, true), - Field::new("c", DataType::Int64, true), - ])); - - let sort_keys = [PhysicalSortExpr { - expr: col("c", &schema).unwrap(), - options: SortOptions::default(), - }]; - - let mut group_acc = FirstLastGroupsAccumulator::try_new( - PrimitiveValueState::::new(DataType::Int64), - sort_keys.into(), - true, - &[DataType::Int64], - true, - )?; + let mut group_acc = new_int64_first_last_group_acc(true)?; let states: Vec = vec![ Arc::new(Int64Array::from(vec![10, 20])), Arc::new(Int64Array::from(vec![1, 2])), Arc::new(BooleanArray::from(vec![true, true])), ]; - let filter = BooleanArray::new( - BooleanBuffer::from(vec![true, true]), - Some(NullBuffer::from(vec![false, true])), - ); + let filter = nullable_bool_filter(vec![true, true], vec![false, true]); group_acc.merge_batch(&states, &[0, 0], Some(&filter), 1)?; From dd015c0fdd1d499f9aa3756e34479b10f939614d Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Mon, 1 Jun 2026 11:41:41 +0800 Subject: [PATCH 3/4] feat: update nullable_bool_filter and improve test structure - Changed valid to validity in nullable_bool_filter. - Added assert_group_acc_int64_result function. - Replaced repeated evaluate/downcast/assert blocks in three tests for better maintainability. --- .../functions-aggregate/src/first_last.rs | 38 +++++++++---------- 1 file changed, 18 insertions(+), 20 deletions(-) diff --git a/datafusion/functions-aggregate/src/first_last.rs b/datafusion/functions-aggregate/src/first_last.rs index 06de4d86dc4b0..ebb2163cefe45 100644 --- a/datafusion/functions-aggregate/src/first_last.rs +++ b/datafusion/functions-aggregate/src/first_last.rs @@ -1449,8 +1449,21 @@ mod tests { ) } - fn nullable_bool_filter(values: Vec, valid: Vec) -> BooleanArray { - BooleanArray::new(BooleanBuffer::from(values), Some(NullBuffer::from(valid))) + fn nullable_bool_filter(values: Vec, validity: Vec) -> BooleanArray { + BooleanArray::new( + BooleanBuffer::from(values), + Some(NullBuffer::from(validity)), + ) + } + + fn assert_group_acc_int64_result( + group_acc: &mut FirstLastGroupsAccumulator>, + expected: Int64Array, + ) -> Result<()> { + let result = group_acc.evaluate(EmitTo::All)?; + let result = result.as_any().downcast_ref::().unwrap(); + assert_eq!(result, &expected); + Ok(()) } #[test] @@ -1663,12 +1676,7 @@ mod tests { group_acc.update_batch(&values_and_orderings, &[0, 0], Some(&filter), 1)?; - let result = group_acc.evaluate(EmitTo::All)?; - let result = result.as_any().downcast_ref::().unwrap(); - let expected = Int64Array::from(vec![None]); - assert_eq!(result, &expected); - - Ok(()) + assert_group_acc_int64_result(&mut group_acc, Int64Array::from(vec![None])) } #[test] @@ -1684,12 +1692,7 @@ mod tests { group_acc.update_batch(&values_and_orderings, &[0, 0, 0], Some(&filter), 1)?; - let result = group_acc.evaluate(EmitTo::All)?; - let result = result.as_any().downcast_ref::().unwrap(); - let expected = Int64Array::from(vec![Some(20)]); - assert_eq!(result, &expected); - - Ok(()) + assert_group_acc_int64_result(&mut group_acc, Int64Array::from(vec![Some(20)])) } #[test] @@ -1706,12 +1709,7 @@ mod tests { group_acc.merge_batch(&states, &[0, 0], Some(&filter), 1)?; - let result = group_acc.evaluate(EmitTo::All)?; - let result = result.as_any().downcast_ref::().unwrap(); - let expected = Int64Array::from(vec![Some(20)]); - assert_eq!(result, &expected); - - Ok(()) + assert_group_acc_int64_result(&mut group_acc, Int64Array::from(vec![Some(20)])) } #[test] From 73d8d1711baf95beb3c83d7882ffb2925900b01f Mon Sep 17 00:00:00 2001 From: Siew Kam Onn Date: Tue, 2 Jun 2026 18:26:10 +0800 Subject: [PATCH 4/4] feat: add SLT to aggregate.slt in sqllogictest directory --- .../sqllogictest/test_files/aggregate.slt | 44 +++++++++++++++++++ 1 file changed, 44 insertions(+) diff --git a/datafusion/sqllogictest/test_files/aggregate.slt b/datafusion/sqllogictest/test_files/aggregate.slt index 25b69d16dd035..9672c83b26da3 100644 --- a/datafusion/sqllogictest/test_files/aggregate.slt +++ b/datafusion/sqllogictest/test_files/aggregate.slt @@ -6197,6 +6197,50 @@ GROUP BY g ---- 0 0 +# Grouped first_value/last_value must apply aggregate FILTER with Some(true) +# semantics: a row passes only when the predicate is TRUE. Rows where the +# predicate evaluates to NULL or FALSE must be excluded. +# +# Rows per group (predicate is b < 1): +# g=1: (a=10, b=NULL -> NULL), (a=20, b=2 -> FALSE) => no rows pass +# g=2: (a=30, b=0 -> TRUE), (a=40, b=NULL -> NULL), +# (a=50, b=-5 -> TRUE) => a=30 and a=50 pass +# g=3: (a=60, b=NULL -> NULL) => no rows pass +statement ok +CREATE TABLE first_last_filter_null_tests(g INT, a INT, b INT) AS VALUES +(1, 10, CAST(NULL AS INT)), +(1, 20, 2), +(2, 30, 0), +(2, 40, CAST(NULL AS INT)), +(2, 50, -5), +(3, 60, CAST(NULL AS INT)); + +# Groups 1 and 3 have no rows passing the filter -> NULL. +# Group 2 has a=30 and a=50 passing -> first_value ORDER BY a = 30. +query II +SELECT g, first_value(a ORDER BY a) FILTER (WHERE b < 1) AS fv +FROM first_last_filter_null_tests +GROUP BY g +ORDER BY g; +---- +1 NULL +2 30 +3 NULL + +# Same groups via last_value: group 2 picks the largest passing a = 50. +query II +SELECT g, last_value(a ORDER BY a) FILTER (WHERE b < 1) AS lv +FROM first_last_filter_null_tests +GROUP BY g +ORDER BY g; +---- +1 NULL +2 50 +3 NULL + +statement ok +DROP TABLE first_last_filter_null_tests; + # query_with_and_without_filter query III rowsort SELECT