Skip to content

Commit 0abac7d

Browse files
adragomir authored and ccciudatu committed
[HSTACK] Datafusion compat - added delta table, ATS support, disable LTO
1 parent d72f560 commit 0abac7d

9 files changed

Lines changed: 1596 additions & 425 deletions

File tree

Cargo.lock

Lines changed: 1473 additions & 385 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 70 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -17,7 +17,7 @@
1717

1818
[package]
1919
name = "datafusion-python"
20-
version = "45.2.0"
20+
version = "46.0.0"
2121
homepage = "https://datafusion.apache.org/python"
2222
repository = "https://github.com/apache/datafusion-python"
2323
authors = ["Apache DataFusion <dev@datafusion.apache.org>"]
@@ -38,17 +38,19 @@ tokio = { version = "1.42", features = ["macros", "rt", "rt-multi-thread", "sync
3838
pyo3 = { version = "0.23", features = ["extension-module", "abi3", "abi3-py38"] }
3939
pyo3-async-runtimes = { version = "0.23", features = ["tokio-runtime"]}
4040
arrow = { version = "54", features = ["pyarrow"] }
41-
datafusion = { version = "45.0.0", features = ["avro", "unicode_expressions"] }
42-
datafusion-substrait = { version = "45.0.0", optional = true }
43-
datafusion-proto = { version = "45.0.0" }
44-
datafusion-ffi = { version = "45.0.0" }
41+
datafusion = { version = "46.0.0", features = ["avro", "unicode_expressions"] }
42+
datafusion-substrait = { version = "46.0.0", optional = true }
43+
datafusion-proto = { version = "46.0.0" }
44+
datafusion-ffi = { version = "46.0.0" }
4545
prost = "0.13" # keep in line with `datafusion-substrait`
4646
uuid = { version = "1.12", features = ["v4"] }
4747
mimalloc = { version = "0.1", optional = true, default-features = false, features = ["local_dynamic_tls"] }
4848
async-trait = "0.1"
4949
futures = "0.3"
5050
object_store = { version = "0.11.0", features = ["aws", "gcp", "azure", "http"] }
5151
url = "2"
52+
deltalake = { version = "0.25.0", features = ["datafusion", "azure", "s3"] }
53+
5254

5355
[build-dependencies]
5456
prost-types = "0.13" # keep in line with `datafusion-substrait`
@@ -58,6 +60,66 @@ pyo3-build-config = "0.23"
5860
name = "datafusion_python"
5961
crate-type = ["cdylib", "rlib"]
6062

61-
[profile.release]
62-
lto = true
63-
codegen-units = 1
63+
[patch.crates-io]
64+
datafusion = { git = 'https://github.com/hstack/arrow-datafusion.git', branch = 'main' }
65+
datafusion-catalog = { git = 'https://github.com/hstack/arrow-datafusion.git', branch = 'main' }
66+
datafusion-common = { git = 'https://github.com/hstack/arrow-datafusion.git', branch = 'main' }
67+
datafusion-common-runtime = { git = 'https://github.com/hstack/arrow-datafusion.git', branch = 'main' }
68+
datafusion-execution = { git = 'https://github.com/hstack/arrow-datafusion.git', branch = 'main' }
69+
datafusion-expr = { git = 'https://github.com/hstack/arrow-datafusion.git', branch = 'main' }
70+
datafusion-expr-common = { git = 'https://github.com/hstack/arrow-datafusion.git', branch = 'main' }
71+
datafusion-functions = { git = 'https://github.com/hstack/arrow-datafusion.git', branch = 'main' }
72+
datafusion-functions-aggregate = { git = 'https://github.com/hstack/arrow-datafusion.git', branch = 'main' }
73+
datafusion-functions-aggregate-common = { git = 'https://github.com/hstack/arrow-datafusion.git', branch = 'main' }
74+
datafusion-functions-nested = { git = 'https://github.com/hstack/arrow-datafusion.git', branch = 'main' }
75+
datafusion-functions-table = { git = 'https://github.com/hstack/arrow-datafusion.git', branch = 'main' }
76+
datafusion-functions-window = { git = 'https://github.com/hstack/arrow-datafusion.git', branch = 'main' }
77+
datafusion-functions-window-common = { git = 'https://github.com/hstack/arrow-datafusion.git', branch = 'main' }
78+
datafusion-optimizer = { git = 'https://github.com/hstack/arrow-datafusion.git', branch = 'main' }
79+
datafusion-physical-expr = { git = 'https://github.com/hstack/arrow-datafusion.git', branch = 'main' }
80+
datafusion-physical-expr-common = { git = 'https://github.com/hstack/arrow-datafusion.git', branch = 'main' }
81+
datafusion-physical-optimizer = { git = 'https://github.com/hstack/arrow-datafusion.git', branch = 'main' }
82+
datafusion-physical-plan = { git = 'https://github.com/hstack/arrow-datafusion.git', branch = 'main' }
83+
datafusion-proto = { git = 'https://github.com/hstack/arrow-datafusion.git', branch = 'main' }
84+
datafusion-proto-common = { git = 'https://github.com/hstack/arrow-datafusion.git', branch = 'main' }
85+
datafusion-sql = { git = 'https://github.com/hstack/arrow-datafusion.git', branch = 'main' }
86+
datafusion-substrait = { git = 'https://github.com/hstack/arrow-datafusion.git', branch = 'main' }
87+
deltalake = { git = 'https://github.com/hstack/delta-rs.git', branch = 'main' }
88+
deltalake-catalog-glue = { git = 'https://github.com/hstack/delta-rs.git', branch = 'main' }
89+
deltalake-core = { git = 'https://github.com/hstack/delta-rs.git', branch = 'main' }
90+
deltalake-aws = { git = 'https://github.com/hstack/delta-rs.git', branch = 'main' }
91+
deltalake-azure = { git = 'https://github.com/hstack/delta-rs.git', branch = 'main' }
92+
deltalake-mount = { git = 'https://github.com/hstack/delta-rs.git', branch = 'main' }
93+
deltalake-sql = { git = 'https://github.com/hstack/delta-rs.git', branch = 'main' }
94+
reqwest = { git = 'https://github.com/hstack/reqwest.git', branch = 'disable-proxy-tunnel' }
95+
#datafusion-table-providers = { git = "https://github.com/hstack/datafusion-table-providers", branch = "main" }
96+
#datafusion = { path = "../arrow-datafusion/datafusion/core" }
97+
#datafusion-catalog = { path = "../arrow-datafusion/datafusion/catalog" }
98+
#datafusion-common = { path = "../arrow-datafusion/datafusion/common" }
99+
#datafusion-common-runtime = { path = "../arrow-datafusion/datafusion/common-runtime" }
100+
#datafusion-execution = { path = "../arrow-datafusion/datafusion/execution" }
101+
#datafusion-expr = { path = "../arrow-datafusion/datafusion/expr" }
102+
#datafusion-expr-common = { path = "../arrow-datafusion/datafusion/expr-common" }
103+
#datafusion-functions = { path = "../arrow-datafusion/datafusion/functions" }
104+
#datafusion-functions-aggregate = { path = "../arrow-datafusion/datafusion/functions-aggregate" }
105+
#datafusion-functions-aggregate-common = { path = "../arrow-datafusion/datafusion/functions-aggregate-common" }
106+
#datafusion-functions-nested = { path = "../arrow-datafusion/datafusion/functions-nested" }
107+
#datafusion-functions-table = { path = "../arrow-datafusion/datafusion/functions-table" }
108+
#datafusion-functions-window = { path = "../arrow-datafusion/datafusion/functions-window" }
109+
#datafusion-functions-window-common = { path = "../arrow-datafusion/datafusion/functions-window-common" }
110+
#datafusion-optimizer = { path = "../arrow-datafusion/datafusion/optimizer" }
111+
#datafusion-physical-expr = { path = "../arrow-datafusion/datafusion/physical-expr" }
112+
#datafusion-physical-expr-common = { path = "../arrow-datafusion/datafusion/physical-expr-common" }
113+
#datafusion-physical-optimizer = { path = "../arrow-datafusion/datafusion/physical-optimizer" }
114+
#datafusion-physical-plan = { path = "../arrow-datafusion/datafusion/physical-plan" }
115+
#datafusion-proto = { path = "../arrow-datafusion/datafusion/proto" }
116+
#datafusion-proto-common = { path = "../arrow-datafusion/datafusion/proto-common" }
117+
#datafusion-sql = { path = "../arrow-datafusion/datafusion/sql" }
118+
#datafusion-substrait = { path = "../arrow-datafusion/datafusion/substrait" }
119+
#deltalake = { path = "../delta-rs/crates/deltalake" }
120+
#deltalake-catalog-glue = { path = "../delta-rs/crates/catalog-glue" }
121+
#deltalake-core = { path = "../delta-rs/crates/core" }
122+
#deltalake-aws = { path = "../delta-rs/crates/aws" }
123+
#deltalake-azure = { path = "../delta-rs/crates/azure" }
124+
#deltalake-mount = { path = "../delta-rs/crates/mount" }
125+
#deltalake-sql = { path = "../delta-rs/crates/sql" }

python/datafusion/context.py

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -560,6 +560,9 @@ def register_listing_table(
560560
file_sort_order_raw,
561561
)
562562

563+
def register_delta_table(self, name: str, table_uri: str, storage_opts: dict[str, str] = {}):
564+
self.ctx.register_delta_table(name, table_uri, storage_opts)
565+
563566
def sql(self, query: str, options: SQLOptions | None = None) -> DataFrame:
564567
"""Create a :py:class:`~datafusion.DataFrame` from SQL query text.
565568

src/context.rs

Lines changed: 18 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -289,6 +289,10 @@ impl PySessionContext {
289289
} else {
290290
RuntimeEnvBuilder::default()
291291
};
292+
deltalake::azure::register_handlers(None);
293+
deltalake::aws::register_handlers(None);
294+
let config = config.set_bool("datafusion.sql_parser.enable_ident_normalization", false);
295+
292296
let runtime = Arc::new(runtime_env_builder.build()?);
293297
let session_state = SessionStateBuilder::new()
294298
.with_config(config)
@@ -390,6 +394,20 @@ impl PySessionContext {
390394
Ok(())
391395
}
392396

397+
pub fn register_delta_table(
398+
&self,
399+
name: &str,
400+
table_uri: &str,
401+
storage_opts: HashMap<String, String>,
402+
py: Python,
403+
) -> PyResult<()> {
404+
deltalake::ensure_initialized();
405+
let table = deltalake::open_table_with_storage_options(table_uri, storage_opts);
406+
let table = wait_for_future(py, table).map_err(py_datafusion_err)?;
407+
self.ctx.register_table(name, Arc::new(table)).map_err(py_datafusion_err)?;
408+
Ok(())
409+
}
410+
393411
/// Returns a PyDataFrame whose plan corresponds to the SQL statement.
394412
pub fn sql(&mut self, query: &str, py: Python) -> PyDataFusionResult<PyDataFrame> {
395413
let result = self.ctx.sql(query);

src/expr.rs

Lines changed: 5 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -30,10 +30,10 @@ use datafusion::arrow::pyarrow::PyArrowType;
3030
use datafusion::functions::core::expr_ext::FieldAccessor;
3131
use datafusion::logical_expr::{
3232
col,
33-
expr::{AggregateFunction, InList, InSubquery, ScalarFunction, WindowFunction},
33+
expr::{AggregateFunction, AggregateFunctionParams, InList, InSubquery, ScalarFunction, WindowFunction},
3434
lit, Between, BinaryExpr, Case, Cast, Expr, Like, Operator, TryCast,
3535
};
36-
36+
use datafusion::logical_expr::expr::WindowFunctionParams;
3737
use crate::common::data_type::{DataTypeMap, NullTreatment, PyScalarValue, RexType};
3838
use crate::errors::{
3939
py_runtime_err, py_type_err, py_unsupported_variant_err, PyDataFusionError, PyDataFusionResult,
@@ -394,9 +394,9 @@ impl PyExpr {
394394
| Expr::InSubquery(InSubquery { expr, .. }) => Ok(vec![PyExpr::from(*expr.clone())]),
395395

396396
// Expr variants containing a collection of Expr(s) for operands
397-
Expr::AggregateFunction(AggregateFunction { args, .. })
397+
Expr::AggregateFunction(AggregateFunction { params: AggregateFunctionParams { args, .. }, .. })
398398
| Expr::ScalarFunction(ScalarFunction { args, .. })
399-
| Expr::WindowFunction(WindowFunction { args, .. }) => {
399+
| Expr::WindowFunction(WindowFunction { params: WindowFunctionParams { args, .. }, .. }) => {
400400
Ok(args.iter().map(|arg| PyExpr::from(arg.clone())).collect())
401401
}
402402

@@ -575,7 +575,7 @@ impl PyExpr {
575575
Expr::AggregateFunction(agg_fn) => {
576576
let window_fn = Expr::WindowFunction(WindowFunction::new(
577577
WindowFunctionDefinition::AggregateUDF(agg_fn.func.clone()),
578-
agg_fn.args.clone(),
578+
agg_fn.params.args.clone(),
579579
));
580580

581581
add_builder_fns_to_window(

src/expr/aggregate.rs

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -16,7 +16,7 @@
1616
// under the License.
1717

1818
use datafusion::common::DataFusionError;
19-
use datafusion::logical_expr::expr::{AggregateFunction, Alias};
19+
use datafusion::logical_expr::expr::{AggregateFunction, AggregateFunctionParams, Alias};
2020
use datafusion::logical_expr::logical_plan::Aggregate;
2121
use datafusion::logical_expr::Expr;
2222
use pyo3::{prelude::*, IntoPyObjectExt};
@@ -126,7 +126,7 @@ impl PyAggregate {
126126
match expr {
127127
// TODO: This Alias logic seems to be returning some strange results that we should investigate
128128
Expr::Alias(Alias { expr, .. }) => self._aggregation_arguments(expr.as_ref()),
129-
Expr::AggregateFunction(AggregateFunction { func: _, args, .. }) => {
129+
Expr::AggregateFunction(AggregateFunction { func: _, params: AggregateFunctionParams { args, .. }}) => {
130130
Ok(args.iter().map(|e| PyExpr::from(e.clone())).collect())
131131
}
132132
_ => Err(py_type_err(

src/expr/aggregate_expr.rs

Lines changed: 3 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -40,7 +40,7 @@ impl From<AggregateFunction> for PyAggregateFunction {
4040

4141
impl Display for PyAggregateFunction {
4242
fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
43-
let args: Vec<String> = self.aggr.args.iter().map(|expr| expr.to_string()).collect();
43+
let args: Vec<String> = self.aggr.params.args.iter().map(|expr| expr.to_string()).collect();
4444
write!(f, "{}({})", self.aggr.func.name(), args.join(", "))
4545
}
4646
}
@@ -54,12 +54,13 @@ impl PyAggregateFunction {
5454

5555
/// is this a distinct aggregate such as `COUNT(DISTINCT expr)`
5656
fn is_distinct(&self) -> bool {
57-
self.aggr.distinct
57+
self.aggr.params.distinct
5858
}
5959

6060
/// Get the arguments to the aggregate function
6161
fn args(&self) -> Vec<PyExpr> {
6262
self.aggr
63+
.params
6364
.args
6465
.iter()
6566
.map(|expr| PyExpr::from(expr.clone()))

src/expr/window.rs

Lines changed: 5 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -16,7 +16,7 @@
1616
// under the License.
1717

1818
use datafusion::common::{DataFusionError, ScalarValue};
19-
use datafusion::logical_expr::expr::WindowFunction;
19+
use datafusion::logical_expr::expr::{WindowFunction, WindowFunctionParams};
2020
use datafusion::logical_expr::{Expr, Window, WindowFrame, WindowFrameBound, WindowFrameUnits};
2121
use pyo3::{prelude::*, IntoPyObjectExt};
2222
use std::fmt::{self, Display, Formatter};
@@ -118,15 +118,15 @@ impl PyWindowExpr {
118118
/// Returns order by columns in a window function expression
119119
pub fn get_sort_exprs(&self, expr: PyExpr) -> PyResult<Vec<PySortExpr>> {
120120
match expr.expr.unalias() {
121-
Expr::WindowFunction(WindowFunction { order_by, .. }) => py_sort_expr_list(&order_by),
121+
Expr::WindowFunction(WindowFunction { params: WindowFunctionParams { order_by, .. }, .. }) => py_sort_expr_list(&order_by),
122122
other => Err(not_window_function_err(other)),
123123
}
124124
}
125125

126126
/// Return partition by columns in a window function expression
127127
pub fn get_partition_exprs(&self, expr: PyExpr) -> PyResult<Vec<PyExpr>> {
128128
match expr.expr.unalias() {
129-
Expr::WindowFunction(WindowFunction { partition_by, .. }) => {
129+
Expr::WindowFunction(WindowFunction { params: WindowFunctionParams { partition_by, .. }, .. }) => {
130130
py_expr_list(&partition_by)
131131
}
132132
other => Err(not_window_function_err(other)),
@@ -136,7 +136,7 @@ impl PyWindowExpr {
136136
/// Return input args for window function
137137
pub fn get_args(&self, expr: PyExpr) -> PyResult<Vec<PyExpr>> {
138138
match expr.expr.unalias() {
139-
Expr::WindowFunction(WindowFunction { args, .. }) => py_expr_list(&args),
139+
Expr::WindowFunction(WindowFunction { params: WindowFunctionParams { args, .. }, .. }) => py_expr_list(&args),
140140
other => Err(not_window_function_err(other)),
141141
}
142142
}
@@ -152,7 +152,7 @@ impl PyWindowExpr {
152152
/// Returns a Pywindow frame for a given window function expression
153153
pub fn get_frame(&self, expr: PyExpr) -> Option<PyWindowFrame> {
154154
match expr.expr.unalias() {
155-
Expr::WindowFunction(WindowFunction { window_frame, .. }) => Some(window_frame.into()),
155+
Expr::WindowFunction(WindowFunction { params: WindowFunctionParams {window_frame, .. }, .. }) => Some(window_frame.into()),
156156
_ => None,
157157
}
158158
}

src/functions.rs

Lines changed: 17 additions & 18 deletions
Original file line number | Diff line number | Diff line change
@@ -36,7 +36,7 @@ use datafusion::execution::FunctionRegistry;
3636
use datafusion::functions;
3737
use datafusion::functions_aggregate;
3838
use datafusion::functions_window;
39-
use datafusion::logical_expr::expr::Alias;
39+
use datafusion::logical_expr::expr::{Alias, WindowFunctionParams};
4040
use datafusion::logical_expr::sqlparser::ast::NullTreatment as DFNullTreatment;
4141
use datafusion::logical_expr::{expr::WindowFunction, lit, Expr, WindowFunctionDefinition};
4242

@@ -196,10 +196,7 @@ fn alias(expr: PyExpr, name: &str) -> PyResult<PyExpr> {
196196
#[pyfunction]
197197
fn col(name: &str) -> PyResult<PyExpr> {
198198
Ok(PyExpr {
199-
expr: datafusion::logical_expr::Expr::Column(Column {
200-
relation: None,
201-
name: name.to_string(),
202-
}),
199+
expr: Expr::Column(Column::new_unqualified(name)),
203200
})
204201
}
205202

@@ -314,19 +311,21 @@ fn window(
314311
Ok(PyExpr {
315312
expr: datafusion::logical_expr::Expr::WindowFunction(WindowFunction {
316313
fun,
317-
args: args.into_iter().map(|x| x.expr).collect::<Vec<_>>(),
318-
partition_by: partition_by
319-
.unwrap_or_default()
320-
.into_iter()
321-
.map(|x| x.expr)
322-
.collect::<Vec<_>>(),
323-
order_by: order_by
324-
.unwrap_or_default()
325-
.into_iter()
326-
.map(|x| x.into())
327-
.collect::<Vec<_>>(),
328-
window_frame,
329-
null_treatment: None,
314+
params: WindowFunctionParams {
315+
args: args.into_iter().map(|x| x.expr).collect::<Vec<_>>(),
316+
partition_by: partition_by
317+
.unwrap_or_default()
318+
.into_iter()
319+
.map(|x| x.expr)
320+
.collect::<Vec<_>>(),
321+
order_by: order_by
322+
.unwrap_or_default()
323+
.into_iter()
324+
.map(|x| x.into())
325+
.collect::<Vec<_>>(),
326+
window_frame,
327+
null_treatment: None,
328+
},
330329
}),
331330
})
332331
}

0 commit comments

Comments
 (0)