Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 81 additions & 3 deletions datafusion/physical-expr/benches/binary_op.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@
// specific language governing permissions and limitations
// under the License.

use arrow::{array::StringArray, record_batch::RecordBatch};
use arrow::{
array::BooleanArray,
array::{BooleanArray, Date32Array, Date64Array},
datatypes::{DataType, Field, Schema},
};
use arrow::{array::StringArray, record_batch::RecordBatch};
use criterion::{Criterion, criterion_group, criterion_main};
use datafusion_expr::{Operator, and, binary_expr, col, lit, or};
use datafusion_physical_expr::{
Expand All @@ -30,6 +30,9 @@ use datafusion_physical_expr::{
use std::hint::black_box;
use std::sync::Arc;

const DATE_ARRAY_LEN: usize = 8192;
const MILLIS_PER_DAY: i64 = 86_400_000;

/// Generates BooleanArrays with different true/false distributions for benchmarking.
///
/// Returns a vector of tuples containing scenario name and corresponding BooleanArray.
Expand Down Expand Up @@ -309,6 +312,81 @@ fn create_record_batch<const TEST_ALL_FALSE: bool>(
Ok(rbs)
}

criterion_group!(benches, benchmark_binary_op_in_short_circuit);
/// Builds a two-column, nullable `Date32` batch with `DATE_ARRAY_LEN` rows for
/// the date subtraction benchmark.
///
/// `null_percent` is a fraction (`0.2` means 20%), not a percentage. When it is
/// positive, every `floor(1 / null_percent)`-th row (starting with row 0) is
/// null in *both* columns, so the effective null ratio is only approximate when
/// the reciprocal is not an integer. Non-positive or NaN fractions produce no
/// nulls; fractions above `0.5` make every row null.
///
/// # Panics
///
/// Panics if the `RecordBatch` cannot be constructed from the generated columns.
fn make_date32_batch(null_percent: f64) -> RecordBatch {
    let schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Date32, true),
        Field::new("b", DataType::Date32, true),
    ]));

    // Compute the null period once instead of per row. The float-to-usize cast
    // yields 0 for fractions above 1.0, so clamp to 1 to avoid a
    // remainder-by-zero panic.
    let null_period =
        (null_percent > 0.0).then(|| ((1.0 / null_percent) as usize).max(1));
    let is_null = |i: usize| null_period.is_some_and(|period| i % period == 0);

    let left = Date32Array::from_iter(
        (0..DATE_ARRAY_LEN).map(|i| (!is_null(i)).then_some(18_000 + i as i32)),
    );
    let right = Date32Array::from_iter(
        (0..DATE_ARRAY_LEN).map(|i| (!is_null(i)).then_some(17_000 + (i % 365) as i32)),
    );

    RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(left), Arc::new(right)])
        .unwrap()
}

/// Builds a two-column, nullable `Date64` batch with `DATE_ARRAY_LEN` rows for
/// the date subtraction benchmark. Values are whole-day multiples of
/// `MILLIS_PER_DAY`.
///
/// `null_percent` is a fraction (`0.2` means 20%), not a percentage. When it is
/// positive, every `floor(1 / null_percent)`-th row (starting with row 0) is
/// null in *both* columns, so the effective null ratio is only approximate when
/// the reciprocal is not an integer. Non-positive or NaN fractions produce no
/// nulls; fractions above `0.5` make every row null.
///
/// # Panics
///
/// Panics if the `RecordBatch` cannot be constructed from the generated columns.
fn make_date64_batch(null_percent: f64) -> RecordBatch {
    let schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Date64, true),
        Field::new("b", DataType::Date64, true),
    ]));

    // Compute the null period once instead of per row. The float-to-usize cast
    // yields 0 for fractions above 1.0, so clamp to 1 to avoid a
    // remainder-by-zero panic.
    let null_period =
        (null_percent > 0.0).then(|| ((1.0 / null_percent) as usize).max(1));
    let is_null = |i: usize| null_period.is_some_and(|period| i % period == 0);

    let left = Date64Array::from_iter((0..DATE_ARRAY_LEN).map(|i| {
        (!is_null(i)).then_some((18_000 + i as i64) * MILLIS_PER_DAY)
    }));
    let right = Date64Array::from_iter((0..DATE_ARRAY_LEN).map(|i| {
        (!is_null(i)).then_some((17_000 + (i % 365) as i64) * MILLIS_PER_DAY)
    }));

    RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(left), Arc::new(right)])
        .unwrap()
}

/// Measures `a - b` over two `Date32` columns, once without nulls and once with
/// a 0.2 null fraction.
fn benchmark_date32_subtract(c: &mut Criterion) {
    let scenarios = [("no_nulls", 0.0), ("20_percent_nulls", 0.2)];

    for (name, null_percent) in scenarios {
        let subtract = BinaryExpr::new(
            Arc::new(Column::new("a", 0)),
            Operator::Minus,
            Arc::new(Column::new("b", 1)),
        );
        let input = make_date32_batch(null_percent);
        let bench_id = format!("date32_subtract/{name}");

        c.bench_function(&bench_id, |bencher| {
            bencher.iter(|| {
                // black_box both the input and the output so the evaluation
                // cannot be optimized away.
                let output = subtract.evaluate(black_box(&input)).unwrap();
                black_box(output)
            })
        });
    }
}

/// Measures `a - b` over two `Date64` columns, once without nulls and once with
/// a 0.2 null fraction.
fn benchmark_date64_subtract(c: &mut Criterion) {
    let scenarios = [("no_nulls", 0.0), ("20_percent_nulls", 0.2)];

    for (name, null_percent) in scenarios {
        let subtract = BinaryExpr::new(
            Arc::new(Column::new("a", 0)),
            Operator::Minus,
            Arc::new(Column::new("b", 1)),
        );
        let input = make_date64_batch(null_percent);
        let bench_id = format!("date64_subtract/{name}");

        c.bench_function(&bench_id, |bencher| {
            bencher.iter(|| {
                // black_box both the input and the output so the evaluation
                // cannot be optimized away.
                let output = subtract.evaluate(black_box(&input)).unwrap();
                black_box(output)
            })
        });
    }
}

// Register the short-circuit benchmark together with the Date32 and Date64
// subtraction benchmarks under the `benches` group.
criterion_group!(
benches,
benchmark_binary_op_in_short_circuit,
benchmark_date32_subtract,
benchmark_date64_subtract
);

criterion_main!(benches);
216 changes: 155 additions & 61 deletions datafusion/physical-expr/src/expressions/binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,82 +177,100 @@ fn is_date_minus_date(lhs: &DataType, rhs: &DataType) -> bool {
)
}

/// Computes the difference between two dates and returns the result as Int64 (days)
/// This aligns with PostgreSQL, DuckDB, and MySQL behavior where date - date returns an integer
/// Milliseconds per day, used for Date64 subtraction.
const MILLIS_PER_DAY: i64 = 86_400_000;

/// Evaluates `Date32 - Date32` or `Date64 - Date64`, returning the difference in
/// whole days as `Int64`.
///
/// Implementation: Uses Arrow's sub_wrapping to get Duration, then converts to Int64 days
/// This matches the behavior of PostgreSQL, DuckDB, and MySQL, where
/// `date - date` yields an integer day count rather than an interval.
fn apply_date_subtraction(
lhs: &ColumnarValue,
rhs: &ColumnarValue,
) -> Result<ColumnarValue> {
// NOTE(review): this span looks like it interleaves the previous Duration-based
// body (`sub_wrapping` followed by `duration_to_days`) with the newer body that
// dispatches on the operand types — confirm against the actual file before
// relying on statement order here.
use arrow::compute::kernels::numeric::sub_wrapping;

// Use Arrow's sub_wrapping to compute the Duration result
let duration_result = apply(lhs, rhs, sub_wrapping)?;

// Convert Duration to Int64 (days)
match duration_result {
ColumnarValue::Array(array) => {
let int64_array = duration_to_days(&array)?;
Ok(ColumnarValue::Array(int64_array))
// Dispatch on the operand data types; only matching Date32/Date32 and
// Date64/Date64 pairs are handled.
match (lhs.data_type(), rhs.data_type()) {
(DataType::Date32, DataType::Date32) => {
// The closure receives both operands as i64 and returns their plain difference.
subtract_date_to_days::<Date32Type>(lhs, rhs, |l, r| l - r)
}
ColumnarValue::Scalar(scalar) => {
// Convert scalar Duration to Int64 days
let array = scalar.to_array_of_size(1)?;
let int64_array = duration_to_days(&array)?;
let int64_scalar = ScalarValue::try_from_array(int64_array.as_ref(), 0)?;
Ok(ColumnarValue::Scalar(int64_scalar))
(DataType::Date64, DataType::Date64) => {
// Wrapping subtraction of the two i64 values, then integer division by
// MILLIS_PER_DAY, which truncates toward zero, to get whole days.
subtract_date_to_days::<Date64Type>(lhs, rhs, |l, r| {
l.wrapping_sub(r) / MILLIS_PER_DAY
})
}
// Any other type combination panics instead of returning an error.
(_, _) => unreachable!("apply_date_subtraction called with non-date types"),
}
}

/// Converts a Duration array to Int64 days
/// Handles different Duration time units (Second, Millisecond, Microsecond, Nanosecond)
fn duration_to_days(array: &ArrayRef) -> Result<ArrayRef> {
use datafusion_common::cast::{
as_duration_microsecond_array, as_duration_millisecond_array,
as_duration_nanosecond_array, as_duration_second_array,
};
/// Generic date subtraction: operates directly on the native primitive values
/// of `T` (i32 for Date32, i64 for Date64), applying `day_diff_fn` to produce
/// an Int64 day count.
///
/// `day_diff_fn` receives the left and right values converted to `i64` and
/// returns the day difference. The result is an `Int64` array when at least one
/// operand is an array and the scalar operand (if any) is non-null; a null
/// scalar operand, or two scalar operands, produce an `Int64` scalar.
///
/// Returns an internal error if a scalar operand is not the date variant that
/// matches `T`.
fn subtract_date_to_days<T: ArrowPrimitiveType>(
lhs: &ColumnarValue,
rhs: &ColumnarValue,
day_diff_fn: impl Fn(i64, i64) -> i64,
) -> Result<ColumnarValue>
where
T::Native: Copy + Into<i64>,
{
// NOTE(review): the arms of the previous `duration_to_days` implementation
// (matching on `array.data_type()`) appear to be interleaved with the new
// `match (lhs, rhs)` arms below — confirm against the actual file.

/// Extract the date value as `i64`. Returns `None` for null scalars.
///
/// The scalar variant must agree with `P::DATA_TYPE`; any other scalar
/// yields an internal error naming the expected and actual types.
fn date_scalar_to_i64<P: ArrowPrimitiveType>(
scalar: &ScalarValue,
) -> Result<Option<i64>> {
match scalar {
ScalarValue::Date32(value) if P::DATA_TYPE == DataType::Date32 => {
// Widen the optional i32 to i64.
Ok(value.map(i64::from))
}
ScalarValue::Date64(value) if P::DATA_TYPE == DataType::Date64 => Ok(*value),
other => {
internal_err!(
"{} date scalar expected, got: {}",
P::DATA_TYPE,
other.data_type()
)
}
}
}

const SECONDS_PER_DAY: i64 = 86_400;
const MILLIS_PER_DAY: i64 = 86_400_000;
const MICROS_PER_DAY: i64 = 86_400_000_000;
const NANOS_PER_DAY: i64 = 86_400_000_000_000;

match array.data_type() {
DataType::Duration(TimeUnit::Second) => {
let duration_array = as_duration_second_array(array)?;
let result: Int64Array = duration_array
.iter()
.map(|v| v.map(|val| val / SECONDS_PER_DAY))
.collect();
Ok(Arc::new(result))
match (lhs, rhs) {
// Array - Array: combine both primitive arrays pairwise with `day_diff_fn`.
(ColumnarValue::Array(left), ColumnarValue::Array(right)) => {
let left = left.as_primitive::<T>();
let right = right.as_primitive::<T>();
let result: Int64Array =
arrow::compute::binary::<_, _, _, Int64Type>(left, right, |l, r| {
day_diff_fn(l.into(), r.into())
})?;
Ok(ColumnarValue::Array(Arc::new(result)))
}
DataType::Duration(TimeUnit::Millisecond) => {
let duration_array = as_duration_millisecond_array(array)?;
let result: Int64Array = duration_array
.iter()
.map(|v| v.map(|val| val / MILLIS_PER_DAY))
.collect();
Ok(Arc::new(result))
// Array - Scalar: the scalar is extracted once and reused for every element.
(ColumnarValue::Array(left), ColumnarValue::Scalar(right)) => {
let left = left.as_primitive::<T>();
match date_scalar_to_i64::<T>(right)? {
Some(right_val) => {
let result: Int64Array =
left.unary(|l| day_diff_fn(l.into(), right_val));
Ok(ColumnarValue::Array(Arc::new(result)))
}
// A null scalar yields a null Int64 scalar rather than an array.
None => Ok(ColumnarValue::Scalar(ScalarValue::Int64(None))),
}
}
DataType::Duration(TimeUnit::Microsecond) => {
let duration_array = as_duration_microsecond_array(array)?;
let result: Int64Array = duration_array
.iter()
.map(|v| v.map(|val| val / MICROS_PER_DAY))
.collect();
Ok(Arc::new(result))
// Scalar - Array: mirror of the previous arm with the scalar on the left.
(ColumnarValue::Scalar(left), ColumnarValue::Array(right)) => {
let right = right.as_primitive::<T>();
match date_scalar_to_i64::<T>(left)? {
Some(left_val) => {
let result: Int64Array =
right.unary(|r| day_diff_fn(left_val, r.into()));
Ok(ColumnarValue::Array(Arc::new(result)))
}
// A null scalar yields a null Int64 scalar rather than an array.
None => Ok(ColumnarValue::Scalar(ScalarValue::Int64(None))),
}
}
DataType::Duration(TimeUnit::Nanosecond) => {
let duration_array = as_duration_nanosecond_array(array)?;
let result: Int64Array = duration_array
.iter()
.map(|v| v.map(|val| val / NANOS_PER_DAY))
.collect();
Ok(Arc::new(result))
// Scalar - Scalar: the result is null unless both scalars are non-null.
(ColumnarValue::Scalar(left), ColumnarValue::Scalar(right)) => {
let left_val = date_scalar_to_i64::<T>(left)?;
let right_val = date_scalar_to_i64::<T>(right)?;
Ok(ColumnarValue::Scalar(ScalarValue::Int64(
left_val.zip(right_val).map(|(l, r)| day_diff_fn(l, r)),
)))
}
other => internal_err!("duration_to_days expected Duration type, got: {}", other),
}
}

Expand Down Expand Up @@ -2012,6 +2030,82 @@ mod tests {
Ok(())
}

#[test]
fn date32_minus_date32_returns_int64_days() -> Result<()> {
    // Rows cover: positive difference, negative difference, null lhs, null rhs.
    let lhs = Arc::new(Date32Array::from(vec![
        Some(18_901),
        Some(18_901),
        None,
        Some(18_900),
    ]));
    let rhs = Arc::new(Date32Array::from(vec![
        Some(18_898),
        Some(18_904),
        Some(18_900),
        None,
    ]));
    let expected = Int64Array::from(vec![Some(3), Some(-3), None, None]);

    let fields = vec![
        Field::new("a", DataType::Date32, true),
        Field::new("b", DataType::Date32, true),
    ];
    let schema = Arc::new(Schema::new(fields));

    apply_arithmetic::<Int64Type>(schema, vec![lhs, rhs], Operator::Minus, expected)?;

    Ok(())
}

#[test]
fn date64_minus_date64_returns_int64_days() -> Result<()> {
    // Express each Date64 value as a whole number of days in milliseconds.
    let days = |d: i64| Some(d * MILLIS_PER_DAY);

    // Rows cover: positive difference, negative difference, null lhs, null rhs.
    let lhs = Arc::new(Date64Array::from(vec![
        days(18_901),
        days(18_901),
        None,
        days(18_900),
    ]));
    let rhs = Arc::new(Date64Array::from(vec![
        days(18_898),
        days(18_904),
        days(18_900),
        None,
    ]));
    let expected = Int64Array::from(vec![Some(3), Some(-3), None, None]);

    let fields = vec![
        Field::new("a", DataType::Date64, true),
        Field::new("b", DataType::Date64, true),
    ];
    let schema = Arc::new(Schema::new(fields));

    apply_arithmetic::<Int64Type>(schema, vec![lhs, rhs], Operator::Minus, expected)?;

    Ok(())
}

#[test]
fn date32_minus_null_scalar_returns_int64_null_scalar() -> Result<()> {
    // Non-null Date32 array on the left, null Date32 scalar on the right.
    let dates = Date32Array::from(vec![Some(18_901), Some(18_900)]);
    let lhs = ColumnarValue::Array(Arc::new(dates));
    let rhs = ColumnarValue::Scalar(ScalarValue::Date32(None));

    let difference = apply_date_subtraction(&lhs, &rhs)?;

    // The null scalar operand collapses the whole result to a null Int64 scalar.
    let is_null_int64_scalar = matches!(
        difference,
        ColumnarValue::Scalar(ScalarValue::Int64(None))
    );
    assert!(is_null_int64_scalar);

    Ok(())
}

#[test]
fn minus_op_dict() -> Result<()> {
let schema = Schema::new(vec![
Expand Down
Loading