Skip to content

Commit daa4ebd

Browse files
authored
Merge pull request #165 from simonvandel/df-47
WIP: Upgrade to DataFusion 47
2 parents 9d43655 + 96a634d commit daa4ebd

File tree

11 files changed

+534
-357
lines changed

11 files changed

+534
-357
lines changed

Cargo.lock

Lines changed: 282 additions & 155 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 16 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -8,33 +8,36 @@ members = [
88
"datafusion-iceberg-sql",
99
"datafusion_iceberg",
1010
"iceberg-rust",
11-
"iceberg-rust-spec"
11+
"iceberg-rust-spec",
1212
]
1313

1414
resolver = "2"
1515

1616
[workspace.dependencies]
1717
apache-avro = "0.17.0"
18-
arrow = "54"
19-
arrow-schema = "54"
18+
arrow = "55"
19+
arrow-schema = "55"
2020
async-trait = "0.1"
2121
bytes = "1"
22-
chrono = { version = "0.4", default-features = false, features = ["serde", "clock"] }
23-
datafusion = "46"
24-
datafusion-common = "46"
25-
datafusion-execution = "46"
26-
datafusion-expr = "46"
27-
datafusion-functions = { version = "46", features = ["crypto_expressions"] }
28-
datafusion-functions-aggregate = "46"
29-
datafusion-sql = "46"
22+
chrono = { version = "0.4", default-features = false, features = [
23+
"serde",
24+
"clock",
25+
] }
26+
datafusion = "47"
27+
datafusion-common = "47"
28+
datafusion-execution = "47"
29+
datafusion-expr = "47"
30+
datafusion-functions = { version = "47", features = ["crypto_expressions"] }
31+
datafusion-functions-aggregate = "47"
32+
datafusion-sql = "47"
3033
derive-getters = "0.5.0"
3134
derive_builder = "0.20"
3235
futures = "0.3.31"
3336
getrandom = { version = "0.3.1", features = ["std"] }
3437
itertools = "0.14.0"
35-
object_store = { version = "0.11.2", features = ["aws", "gcp"] }
38+
object_store = { version = "0.12", features = ["aws", "gcp"] }
3639
once_map = "0.4"
37-
parquet = { version = "54", features = ["async", "object_store"] }
40+
parquet = { version = "55", features = ["async", "object_store"] }
3841
pin-project-lite = "0.2"
3942
serde = "^1.0"
4043
serde_derive = "^1.0"

datafusion_iceberg/src/materialized_view/delta_queries/delta_node.rs

Lines changed: 0 additions & 18 deletions
Original file line number | Diff line number | Diff line change
@@ -37,15 +37,6 @@ impl UserDefinedLogicalNodeCore for PosDeltaNode {
3737
write!(f, "PosDelta")
3838
}
3939

40-
fn from_template(&self, exprs: &[Expr], inputs: &[LogicalPlan]) -> Self {
41-
assert_eq!(inputs.len(), 1, "input size inconsistent");
42-
assert_eq!(exprs.len(), 0, "expression size inconsistent");
43-
Self {
44-
input: Arc::new(inputs[0].clone()),
45-
aliases: BTreeMap::new(),
46-
}
47-
}
48-
4940
fn with_exprs_and_inputs(
5041
&self,
5142
exprs: Vec<Expr>,
@@ -117,15 +108,6 @@ impl UserDefinedLogicalNodeCore for NegDeltaNode {
117108
write!(f, "NegDelta")
118109
}
119110

120-
fn from_template(&self, exprs: &[Expr], inputs: &[LogicalPlan]) -> Self {
121-
assert_eq!(inputs.len(), 1, "input size inconsistent");
122-
assert_eq!(exprs.len(), 0, "expression size inconsistent");
123-
Self {
124-
input: Arc::new(inputs[0].clone()),
125-
aliases: BTreeMap::new(),
126-
}
127-
}
128-
129111
fn with_exprs_and_inputs(
130112
&self,
131113
_exprs: Vec<Expr>,

datafusion_iceberg/src/materialized_view/delta_queries/fork_node.rs

Lines changed: 0 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -1,4 +1,3 @@
1-
use core::panic;
21
use std::{
32
fmt::{self, Debug},
43
hash::Hash,
@@ -117,10 +116,6 @@ impl UserDefinedLogicalNodeCore for ForkNode {
117116
write!(f, "ForkNode")
118117
}
119118

120-
fn from_template(&self, _exprs: &[Expr], _inputs: &[LogicalPlan]) -> Self {
121-
panic!("Creating fork node from template is not allowed");
122-
}
123-
124119
fn with_exprs_and_inputs(
125120
&self,
126121
exprs: Vec<Expr>,

datafusion_iceberg/src/materialized_view/delta_queries/transform.rs

Lines changed: 164 additions & 114 deletions
Large diffs are not rendered by default.

datafusion_iceberg/src/planner.rs

Lines changed: 6 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -2,8 +2,8 @@ use std::{fmt::Debug, hash::Hash, sync::Arc};
22

33
use async_trait::async_trait;
44
use datafusion_expr::{
5-
ColumnarValue, CreateCatalogSchema, CreateView, DropCatalogSchema, DropTable, ScalarUDFImpl,
6-
Signature, Volatility,
5+
ColumnarValue, CreateCatalogSchema, CreateView, DropCatalogSchema, DropTable,
6+
ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility,
77
};
88
use itertools::Itertools;
99
use regex::Regex;
@@ -796,10 +796,11 @@ impl ScalarUDFImpl for RefreshMaterializedView {
796796
Ok(DataType::Utf8)
797797
}
798798

799-
fn invoke(
799+
fn invoke_with_args(
800800
&self,
801-
args: &[datafusion_expr::ColumnarValue],
801+
args: ScalarFunctionArgs,
802802
) -> datafusion::error::Result<datafusion_expr::ColumnarValue> {
803+
let args = args.args;
803804
let ColumnarValue::Scalar(ScalarValue::Utf8(Some(name))) = &args[0] else {
804805
return Err(DataFusionError::Execution(
805806
"Refresh function only takes a scalar string input.".to_string(),
@@ -961,7 +962,7 @@ OPTIONS ('has_header' 'true');";
961962
.expect("Failed to execute query plan.");
962963

963964
ctx.sql(
964-
"INSERT INTO iceberg.public.orders (id, customer_id, product_id, order_date, quantity) VALUES
965+
"INSERT INTO iceberg.public.orders (id, customer_id, product_id, order_date, quantity) VALUES
965966
(1, 1, 1, '2020-01-01', 1),
966967
(2, 2, 1, '2020-01-01', 1),
967968
(3, 3, 1, '2020-01-01', 3),

datafusion_iceberg/src/pruning_statistics.rs

Lines changed: 7 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -29,8 +29,8 @@ use datafusion::{
2929
scalar::ScalarValue,
3030
};
3131
use datafusion_expr::{
32-
expr::ScalarFunction, BinaryExpr, ColumnarValue, Expr, ScalarUDF, ScalarUDFImpl, Signature,
33-
TypeSignature, Volatility,
32+
expr::ScalarFunction, BinaryExpr, ColumnarValue, Expr, ScalarFunctionArgs, ScalarUDF,
33+
ScalarUDFImpl, Signature, TypeSignature, Volatility,
3434
};
3535
use iceberg_rust::{
3636
arrow::transform::transform_arrow,
@@ -387,7 +387,11 @@ impl ScalarUDFImpl for DateTransform {
387387
Ok(DataType::Int32)
388388
}
389389

390-
fn invoke(&self, args: &[ColumnarValue]) -> datafusion::error::Result<ColumnarValue> {
390+
fn invoke_with_args(
391+
&self,
392+
args: ScalarFunctionArgs,
393+
) -> datafusion::error::Result<ColumnarValue> {
394+
let args = args.args;
391395
let transform = &args[0];
392396
let array = &args[1];
393397
let ColumnarValue::Scalar(ScalarValue::Utf8(Some(transform))) = transform else {

0 commit comments

Comments (0)