Skip to content

Commit 887e0b1

Browse files
committed
Add config options to FFI scalar UDFs
1 parent cc7bf6f commit 887e0b1

1 file changed

Lines changed: 18 additions & 21 deletions

File tree

datafusion/ffi/src/udf/mod.rs

Lines changed: 18 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@ use std::hash::{Hash, Hasher};
2020
use std::sync::Arc;
2121

2222
use abi_stable::StableAbi;
23-
use abi_stable::std_types::{RResult, RString, RVec};
24-
use arrow::array::ArrayRef;
23+
use abi_stable::std_types::{RString, RVec};
24+
use arrow::array::Array;
2525
use arrow::datatypes::{DataType, Field};
2626
use arrow::error::ArrowError;
2727
use arrow::ffi::{FFI_ArrowSchema, from_ffi, to_ffi};
@@ -38,6 +38,8 @@ use return_type_args::{
3838
};
3939

4040
use crate::arrow_wrappers::{WrappedArray, WrappedSchema};
41+
use crate::config::FFI_ConfigOptions;
42+
use crate::expr::columnar_value::FFI_ColumnarValue;
4143
use crate::util::{
4244
FFIResult, rvec_wrapped_to_vec_datatype, vec_datatype_to_rvec_wrapped,
4345
};
@@ -73,7 +75,8 @@ pub struct FFI_ScalarUDF {
7375
arg_fields: RVec<WrappedSchema>,
7476
num_rows: usize,
7577
return_field: WrappedSchema,
76-
) -> FFIResult<WrappedArray>,
78+
config_options: FFI_ConfigOptions,
79+
) -> FFIResult<FFI_ColumnarValue>,
7780

7881
/// See [`ScalarUDFImpl`] for details on short_circuits
7982
pub short_circuits: bool,
@@ -159,7 +162,8 @@ unsafe extern "C" fn invoke_with_args_fn_wrapper(
159162
arg_fields: RVec<WrappedSchema>,
160163
number_rows: usize,
161164
return_field: WrappedSchema,
162-
) -> FFIResult<WrappedArray> {
165+
config_options: FFI_ConfigOptions,
166+
) -> FFIResult<FFI_ColumnarValue> {
163167
unsafe {
164168
let args = args
165169
.into_iter()
@@ -181,28 +185,22 @@ unsafe extern "C" fn invoke_with_args_fn_wrapper(
181185
})
182186
.collect::<Result<Vec<FieldRef>>>();
183187
let arg_fields = rresult_return!(arg_fields);
188+
let config_options = rresult_return!(ConfigOptions::try_from(config_options));
189+
let config_options = Arc::new(config_options);
184190

185191
let args = ScalarFunctionArgs {
186192
args,
187193
arg_fields,
188194
number_rows,
189195
return_field,
190-
// TODO: pass config options: https://github.com/apache/datafusion/issues/17035
191-
config_options: Arc::new(ConfigOptions::default()),
196+
config_options,
192197
};
193198

194-
let result = rresult_return!(
199+
rresult!(
195200
udf.inner()
196201
.invoke_with_args(args)
197-
.and_then(|r| r.to_array(number_rows))
198-
);
199-
200-
let (result_array, result_schema) = rresult_return!(to_ffi(&result.to_data()));
201-
202-
RResult::ROk(WrappedArray {
203-
array: result_array,
204-
schema: WrappedSchema(result_schema),
205-
})
202+
.and_then(FFI_ColumnarValue::try_from)
203+
)
206204
}
207205
}
208206

@@ -366,8 +364,7 @@ impl ScalarUDFImpl for ForeignScalarUDF {
366364
arg_fields,
367365
number_rows,
368366
return_field,
369-
// TODO: pass config options: https://github.com/apache/datafusion/issues/17035
370-
config_options: _config_options,
367+
config_options,
371368
} = invoke_args;
372369

373370
let args = args
@@ -396,6 +393,7 @@ impl ScalarUDFImpl for ForeignScalarUDF {
396393

397394
let return_field = return_field.as_ref().clone();
398395
let return_field = WrappedSchema(FFI_ArrowSchema::try_from(return_field)?);
396+
let config_options = config_options.as_ref().into();
399397

400398
let result = unsafe {
401399
(self.udf.invoke_with_args)(
@@ -404,13 +402,12 @@ impl ScalarUDFImpl for ForeignScalarUDF {
404402
arg_fields,
405403
number_rows,
406404
return_field,
405+
config_options,
407406
)
408407
};
409408

410409
let result = df_result!(result)?;
411-
let result_array: ArrayRef = result.try_into()?;
412-
413-
Ok(ColumnarValue::Array(result_array))
410+
result.try_into()
414411
}
415412

416413
fn aliases(&self) -> &[String] {

0 commit comments

Comments
 (0)