Skip to content

Commit 039a1b4

Browse files
committed
Add unit tests for physical optimizer rule
1 parent 10c8be6 commit 039a1b4

4 files changed

Lines changed: 284 additions & 0 deletions

File tree

datafusion/ffi/src/physical_optimizer.rs

Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,3 +219,153 @@ impl PhysicalOptimizerRule for ForeignPhysicalOptimizerRule {
219219
unsafe { (self.0.schema_check)(&self.0) }
220220
}
221221
}
222+
223+
#[cfg(test)]
224+
mod tests {
225+
use std::sync::Arc;
226+
227+
use arrow::datatypes::{DataType, Field, Schema};
228+
use datafusion_common::config::ConfigOptions;
229+
use datafusion_common::error::Result;
230+
use datafusion_physical_optimizer::PhysicalOptimizerRule;
231+
use datafusion_physical_plan::ExecutionPlan;
232+
233+
use crate::execution_plan::tests::EmptyExec;
234+
235+
use super::*;
236+
237+
#[derive(Debug)]
238+
struct NoOpRule {
239+
schema_check: bool,
240+
}
241+
242+
impl PhysicalOptimizerRule for NoOpRule {
243+
fn optimize(
244+
&self,
245+
plan: Arc<dyn ExecutionPlan>,
246+
_config: &ConfigOptions,
247+
) -> Result<Arc<dyn ExecutionPlan>> {
248+
Ok(plan)
249+
}
250+
251+
fn name(&self) -> &str {
252+
"no_op_rule"
253+
}
254+
255+
fn schema_check(&self) -> bool {
256+
self.schema_check
257+
}
258+
}
259+
260+
fn create_test_plan() -> Arc<dyn ExecutionPlan> {
261+
let schema =
262+
Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, false)]));
263+
Arc::new(EmptyExec::new(schema))
264+
}
265+
266+
/// `name()` and `schema_check()` must come back unchanged after a rule is
/// exported to FFI and imported again, for both values of the flag.
#[test]
fn test_round_trip_ffi_physical_optimizer_rule() -> Result<()> {
    for flag in [true, false] {
        let local: Arc<dyn PhysicalOptimizerRule + Send + Sync> =
            Arc::new(NoOpRule { schema_check: flag });

        // Install the mock marker so the conversion below produces the
        // foreign wrapper instead of taking the local bypass.
        let mut exported = FFI_PhysicalOptimizerRule::new(local, None);
        exported.library_marker_id = crate::mock_foreign_marker_id;

        let imported: Arc<dyn PhysicalOptimizerRule + Send + Sync> = (&exported).into();

        assert_eq!(imported.name(), "no_op_rule");
        assert_eq!(imported.schema_check(), flag);
    }

    Ok(())
}
285+
286+
/// Calls `optimize()` through the foreign wrapper and checks that the
/// pass-through rule still yields a plan named "empty-exec".
#[test]
fn test_round_trip_optimize() -> Result<()> {
    let local: Arc<dyn PhysicalOptimizerRule + Send + Sync> =
        Arc::new(NoOpRule { schema_check: true });

    // Mock marker: makes the conversion below wrap the rule as foreign.
    let mut exported = FFI_PhysicalOptimizerRule::new(local, None);
    exported.library_marker_id = crate::mock_foreign_marker_id;

    let imported: Arc<dyn PhysicalOptimizerRule + Send + Sync> = (&exported).into();

    let input = create_test_plan();
    let options = ConfigOptions::new();
    let output = imported.optimize(input, &options)?;

    assert_eq!(output.name(), "empty-exec");

    Ok(())
}
305+
306+
/// Converting an FFI rule back into a trait object must yield the original
/// `NoOpRule` when the marker is untouched, and a
/// `ForeignPhysicalOptimizerRule` when the mock marker is installed.
#[test]
fn test_local_bypass() -> Result<()> {
    // Case 1: no mock marker, so the local bypass returns the original rule.
    {
        let local: Arc<dyn PhysicalOptimizerRule + Send + Sync> =
            Arc::new(NoOpRule { schema_check: true });
        let exported = FFI_PhysicalOptimizerRule::new(local, None);

        let recovered: Arc<dyn PhysicalOptimizerRule + Send + Sync> = (&exported).into();
        let as_any: &dyn std::any::Any = &*recovered;
        assert!(as_any.is::<NoOpRule>());
    }

    // Case 2: mock marker installed, so the rule is wrapped as foreign.
    {
        let local: Arc<dyn PhysicalOptimizerRule + Send + Sync> =
            Arc::new(NoOpRule { schema_check: true });
        let mut exported = FFI_PhysicalOptimizerRule::new(local, None);
        exported.library_marker_id = crate::mock_foreign_marker_id;

        let recovered: Arc<dyn PhysicalOptimizerRule + Send + Sync> = (&exported).into();
        let as_any: &dyn std::any::Any = &*recovered;
        assert!(as_any.is::<ForeignPhysicalOptimizerRule>());
    }

    Ok(())
}
333+
334+
/// A cloned FFI rule must report the same name as the rule it was cloned
/// from.
#[test]
fn test_clone() -> Result<()> {
    let local: Arc<dyn PhysicalOptimizerRule + Send + Sync> =
        Arc::new(NoOpRule { schema_check: true });

    let original = FFI_PhysicalOptimizerRule::new(local, None);
    let duplicate = original.clone();

    // Read the name through each struct's own function pointer.
    let original_name = unsafe { (original.name)(&original).as_str() };
    let duplicate_name = unsafe { (duplicate.name)(&duplicate).as_str() };

    assert_eq!(original_name, duplicate_name);

    Ok(())
}
348+
349+
/// Re-exporting a `ForeignPhysicalOptimizerRule` through
/// `FFI_PhysicalOptimizerRule::new` is expected to hand back the inner FFI
/// rule instead of wrapping it a second time.
///
/// NOTE(review): the assertion only checks that the name survives the
/// re-wrap; it does not by itself distinguish single from double wrapping.
#[test]
fn test_foreign_rule_rewrap_bypass() -> Result<()> {
    let local: Arc<dyn PhysicalOptimizerRule + Send + Sync> =
        Arc::new(NoOpRule { schema_check: true });

    // Mock marker: makes the conversion below produce the foreign wrapper.
    let mut exported = FFI_PhysicalOptimizerRule::new(local, None);
    exported.library_marker_id = crate::mock_foreign_marker_id;

    let imported: Arc<dyn PhysicalOptimizerRule + Send + Sync> = (&exported).into();

    // Wrap the foreign rule back into FFI - should not double-wrap.
    let re_exported = FFI_PhysicalOptimizerRule::new(imported, None);
    let reported_name = unsafe { (re_exported.name)(&re_exported).as_str() };
    assert_eq!(reported_name, "no_op_rule");

    Ok(())
}
371+
}

datafusion/ffi/src/tests/mod.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ use crate::catalog_provider_list::FFI_CatalogProviderList;
3939
use crate::config::extension_options::FFI_ExtensionOptions;
4040
use crate::execution_plan::FFI_ExecutionPlan;
4141
use crate::execution_plan::tests::EmptyExec;
42+
use crate::physical_optimizer::FFI_PhysicalOptimizerRule;
4243
use crate::proto::logical_extension_codec::FFI_LogicalExtensionCodec;
4344
use crate::table_provider::FFI_TableProvider;
4445
use crate::table_provider_factory::FFI_TableProviderFactory;
@@ -51,6 +52,7 @@ use crate::udwf::FFI_WindowUDF;
5152
mod async_provider;
5253
pub mod catalog;
5354
pub mod config;
55+
mod physical_optimizer;
5456
mod sync_provider;
5557
mod table_provider_factory;
5658
mod udf_udaf_udwf;
@@ -104,6 +106,8 @@ pub struct ForeignLibraryModule {
104106

105107
pub create_empty_exec: extern "C" fn() -> FFI_ExecutionPlan,
106108

109+
pub create_physical_optimizer_rule: extern "C" fn() -> FFI_PhysicalOptimizerRule,
110+
107111
pub version: extern "C" fn() -> u64,
108112
}
109113

@@ -177,6 +181,8 @@ pub fn get_foreign_library_module() -> ForeignLibraryModuleRef {
177181
create_rank_udwf: create_ffi_rank_func,
178182
create_extension_options: config::create_extension_options,
179183
create_empty_exec,
184+
create_physical_optimizer_rule:
185+
physical_optimizer::create_physical_optimizer_rule,
180186
version: super::version,
181187
}
182188
.leak_into_prefix()
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use std::sync::Arc;
19+
20+
use datafusion_common::config::ConfigOptions;
21+
use datafusion_common::error::Result;
22+
use datafusion_physical_optimizer::PhysicalOptimizerRule;
23+
use datafusion_physical_plan::ExecutionPlan;
24+
use datafusion_physical_plan::limit::GlobalLimitExec;
25+
26+
use crate::physical_optimizer::FFI_PhysicalOptimizerRule;
27+
28+
/// A rule that wraps the input plan in a GlobalLimitExec with skip=0, fetch=10.
29+
/// This produces an observable change in the plan tree that tests can verify.
30+
#[derive(Debug)]
31+
struct AddLimitRule;
32+
33+
impl PhysicalOptimizerRule for AddLimitRule {
34+
fn optimize(
35+
&self,
36+
plan: Arc<dyn ExecutionPlan>,
37+
_config: &ConfigOptions,
38+
) -> Result<Arc<dyn ExecutionPlan>> {
39+
Ok(Arc::new(GlobalLimitExec::new(plan, 0, Some(10))))
40+
}
41+
42+
fn name(&self) -> &str {
43+
"add_limit_rule"
44+
}
45+
46+
fn schema_check(&self) -> bool {
47+
true
48+
}
49+
}
50+
51+
pub(crate) extern "C" fn create_physical_optimizer_rule() -> FFI_PhysicalOptimizerRule {
52+
let rule: Arc<dyn PhysicalOptimizerRule + Send + Sync> = Arc::new(AddLimitRule);
53+
FFI_PhysicalOptimizerRule::new(rule, None)
54+
}
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
#[cfg(feature = "integration-tests")]
19+
mod tests {
20+
use std::sync::Arc;
21+
22+
use arrow::datatypes::{DataType, Field, Schema};
23+
use datafusion_common::DataFusionError;
24+
use datafusion_common::config::ConfigOptions;
25+
use datafusion_ffi::execution_plan::tests::EmptyExec;
26+
use datafusion_ffi::physical_optimizer::ForeignPhysicalOptimizerRule;
27+
use datafusion_ffi::tests::utils::get_module;
28+
use datafusion_physical_optimizer::PhysicalOptimizerRule;
29+
use datafusion_physical_plan::ExecutionPlan;
30+
31+
fn create_test_plan() -> Arc<dyn ExecutionPlan> {
32+
let schema =
33+
Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, false)]));
34+
Arc::new(EmptyExec::new(schema))
35+
}
36+
37+
/// End-to-end check of an optimizer rule obtained from the external module:
/// it must be wrapped as a foreign rule, report its name and schema-check
/// flag through FFI, and add a `GlobalLimitExec` on top of the input plan.
///
/// # Errors
///
/// Returns `DataFusionError::NotImplemented` when the module does not
/// provide `create_physical_optimizer_rule`, and propagates any error from
/// loading the module or running the rule.
#[test]
fn test_ffi_physical_optimizer_rule() -> Result<(), DataFusionError> {
    let module = get_module()?;

    // `ok_or_else` builds the error (and its String) only when the entry
    // point is actually missing.
    let create_rule = module.create_physical_optimizer_rule().ok_or_else(|| {
        DataFusionError::NotImplemented(
            "External module failed to implement create_physical_optimizer_rule"
                .to_string(),
        )
    })?;
    let ffi_rule = create_rule();

    let foreign_rule: Arc<dyn PhysicalOptimizerRule + Send + Sync> = (&ffi_rule).into();

    // Verify the rule is wrapped as a foreign rule
    let any_ref: &dyn std::any::Any = &*foreign_rule;
    assert!(any_ref.is::<ForeignPhysicalOptimizerRule>());

    // Verify name and schema_check pass through FFI
    assert_eq!(foreign_rule.name(), "add_limit_rule");
    assert!(foreign_rule.schema_check());

    // Verify the rule actually transforms the plan
    let plan = create_test_plan();
    let config = ConfigOptions::new();
    let optimized = foreign_rule.optimize(plan, &config)?;

    assert_eq!(optimized.name(), "GlobalLimitExec");

    // Fetch the child list once and reuse it for both assertions.
    let children = optimized.children();
    assert_eq!(children.len(), 1);
    assert_eq!(children[0].name(), "empty-exec");

    Ok(())
}
74+
}

0 commit comments

Comments
 (0)