Skip to content

Commit 2e81de1

Browse files
committed
fix partition struct field names
1 parent c65e62f commit 2e81de1

File tree

5 files changed

+35
-78
lines changed

5 files changed

+35
-78
lines changed

.github/workflows/rust.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,4 +23,4 @@ jobs:
2323
- name: Run iceberg-rust tests
2424
run: cargo test -p iceberg-rust --lib --verbose
2525
- name: Run datafusion-iceberg tests
26-
run: cargo test -p datafusion_iceberg --lib --verbose -j 1
26+
run: cargo test -p datafusion_iceberg --tests --verbose -j 2

iceberg-rust-spec/src/spec/manifest.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -498,7 +498,7 @@ pub fn partition_value_schema(
498498
{
499499
"name": ""#
500500
.to_owned()
501-
+ &schema_field.name
501+
+ field.name()
502502
+ r#"",
503503
"type": ["null",""#
504504
+ &format!("{}", &data_type)
@@ -1556,7 +1556,7 @@ mod tests {
15561556
content: Content::Data,
15571557
file_path: "/".to_string(),
15581558
file_format: FileFormat::Parquet,
1559-
partition: Struct::from_iter(vec![("date".to_owned(), Some(Value::Int(1)))]),
1559+
partition: Struct::from_iter(vec![("day".to_owned(), Some(Value::Int(1)))]),
15601560
record_count: 4,
15611561
file_size_in_bytes: 1200,
15621562
column_sizes: None,
@@ -1684,7 +1684,7 @@ mod tests {
16841684
content: Content::Data,
16851685
file_path: "/".to_string(),
16861686
file_format: FileFormat::Parquet,
1687-
partition: Struct::from_iter(vec![("date".to_owned(), Some(Value::Int(1)))]),
1687+
partition: Struct::from_iter(vec![("day".to_owned(), Some(Value::Int(1)))]),
16881688
record_count: 4,
16891689
file_size_in_bytes: 1200,
16901690
column_sizes: None,

iceberg-rust-spec/src/spec/values.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,7 @@ impl Struct {
188188
// Get datatype after tranform
189189
let datatype = map
190190
.get(name)
191-
.ok_or(Error::InvalidFormat("schema".to_string()))?;
191+
.ok_or(Error::InvalidFormat("partition_struct".to_string()))?;
192192
// Cast the value to the datatype
193193
let value = field.map(|value| value.cast(datatype)).transpose()?;
194194
Ok((name.clone(), value))

iceberg-rust/src/file_format/parquet.rs

Lines changed: 22 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use std::{
99

1010
use iceberg_rust_spec::spec::{
1111
manifest::{AvroMap, Content, DataFile, FileFormat},
12-
partition::{PartitionField, Transform},
12+
partition::PartitionField,
1313
schema::Schema,
1414
types::Type,
1515
values::{Struct, Value},
@@ -33,24 +33,18 @@ pub fn parquet_to_datafile(
3333
) -> Result<DataFile, Error> {
3434
let mut partition = partition_spec
3535
.iter()
36-
.map(|x| {
37-
let field = schema
38-
.fields()
39-
.get(*x.source_id() as usize)
40-
.ok_or_else(|| Error::InvalidFormat("partition column in schema".to_string()))?;
41-
Ok((field.name.clone(), None))
42-
})
36+
.map(|x| Ok((x.name().clone(), None)))
4337
.collect::<Result<Struct, Error>>()?;
44-
let transforms = partition_spec
38+
let partition_fields = partition_spec
4539
.iter()
4640
.map(|x| {
4741
let field = schema
4842
.fields()
4943
.get(*x.source_id() as usize)
5044
.ok_or_else(|| Error::InvalidFormat("partition column in schema".to_string()))?;
51-
Ok((field.name.clone(), x.transform().clone()))
45+
Ok((field.name.clone(), x.clone()))
5246
})
53-
.collect::<Result<HashMap<String, Transform>, Error>>()?;
47+
.collect::<Result<HashMap<String, PartitionField>, Error>>()?;
5448
let parquet_schema = Arc::new(SchemaDescriptor::new(from_thrift(&file_metadata.schema)?));
5549

5650
let mut column_sizes = AvroMap(HashMap::new());
@@ -202,17 +196,23 @@ pub fn parquet_to_datafile(
202196
}
203197
}
204198

205-
if let Some(partition_value) = partition.get_mut(column_name) {
206-
if partition_value.is_none() {
207-
let transform = transforms
208-
.get(column_name)
209-
.ok_or_else(|| Error::InvalidFormat("transform".to_string()))?;
210-
let min = Value::try_from_bytes(statistics.min_bytes(), data_type)?
211-
.tranform(transform)?;
212-
let max = Value::try_from_bytes(statistics.max_bytes(), data_type)?
213-
.tranform(transform)?;
214-
if min == max {
215-
*partition_value = Some(min)
199+
if let Some(partition_field) = partition_fields.get(column_name) {
200+
if let Some(partition_value) = partition.get_mut(partition_field.name())
201+
{
202+
if partition_value.is_none() {
203+
let partition_field =
204+
partition_fields.get(column_name).ok_or_else(|| {
205+
Error::InvalidFormat("transform".to_string())
206+
})?;
207+
let min =
208+
Value::try_from_bytes(statistics.min_bytes(), data_type)?
209+
.tranform(partition_field.transform())?;
210+
let max =
211+
Value::try_from_bytes(statistics.max_bytes(), data_type)?
212+
.tranform(partition_field.transform())?;
213+
if min == max {
214+
*partition_value = Some(min)
215+
}
216216
}
217217
}
218218
}

iceberg-rust/src/table/transaction/operation.rs

Lines changed: 8 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ use iceberg_rust_spec::spec::{
1717
snapshot::{
1818
generate_snapshot_id, SnapshotBuilder, SnapshotReference, SnapshotRetention, Summary,
1919
},
20-
types::StructField,
2120
values::{Struct, Value},
2221
};
2322
use iceberg_rust_spec::util::strip_prefix;
@@ -134,6 +133,7 @@ impl Operation {
134133
Some(stream::iter(manifest_list_reader).filter_map(|manifest| {
135134
let datafiles = datafiles.clone();
136135
let existing_partitions = existing_partitions.clone();
136+
let partition_spec = partition_spec.clone();
137137
async move {
138138
let manifest = manifest
139139
.map_err(Into::into)
@@ -150,7 +150,6 @@ impl Operation {
150150
summary,
151151
datafiles.keys(),
152152
partition_spec.fields(),
153-
schema,
154153
);
155154
if !partition_values.is_empty() {
156155
for file in &partition_values {
@@ -221,17 +220,6 @@ impl Operation {
221220
},
222221
);
223222

224-
let partition_columns = Arc::new(
225-
partition_spec
226-
.fields()
227-
.iter()
228-
.map(|x| schema.fields().get(*x.source_id() as usize))
229-
.collect::<Option<Vec<_>>>()
230-
.ok_or(Error::InvalidFormat(
231-
"Partition column in schema".to_string(),
232-
))?,
233-
);
234-
235223
match existing_manifest_iter {
236224
Some(existing_manifest_iter) => {
237225
let manifest_iter =
@@ -241,7 +229,6 @@ impl Operation {
241229
.then(|(manifest, files): (ManifestStatus, Vec<Struct>)| {
242230
let object_store = object_store.clone();
243231
let datafiles = datafiles.clone();
244-
let partition_columns = partition_columns.clone();
245232
let branch = branch.clone();
246233
async move {
247234
write_manifest(
@@ -250,7 +237,6 @@ impl Operation {
250237
files,
251238
datafiles,
252239
schema,
253-
&partition_columns,
254240
object_store,
255241
branch,
256242
)
@@ -271,7 +257,6 @@ impl Operation {
271257
.then(|(manifest, files): (ManifestStatus, Vec<Struct>)| {
272258
let object_store = object_store.clone();
273259
let datafiles = datafiles.clone();
274-
let partition_columns = partition_columns.clone();
275260
let branch = branch.clone();
276261
async move {
277262
write_manifest(
@@ -280,7 +265,6 @@ impl Operation {
280265
files,
281266
datafiles,
282267
schema,
283-
&partition_columns,
284268
object_store,
285269
branch,
286270
)
@@ -405,18 +389,6 @@ impl Operation {
405389
(ManifestStatus::New(manifest), vec![partition_value.clone()])
406390
});
407391

408-
let partition_columns = Arc::new(
409-
table_metadata
410-
.default_partition_spec()?
411-
.fields()
412-
.iter()
413-
.map(|x| schema.fields().get(*x.source_id() as usize))
414-
.collect::<Option<Vec<_>>>()
415-
.ok_or(Error::InvalidFormat(
416-
"Partition column in schema".to_string(),
417-
))?,
418-
);
419-
420392
let manifest_list_schema =
421393
ManifestListEntry::schema(&table_metadata.format_version)?;
422394

@@ -429,7 +401,6 @@ impl Operation {
429401
.then(|(manifest, files): (ManifestStatus, Vec<Struct>)| {
430402
let object_store = object_store.clone();
431403
let datafiles = datafiles.clone();
432-
let partition_columns = partition_columns.clone();
433404
let branch = branch.clone();
434405
let schema = &schema;
435406
let old_storage_table_metadata = &table_metadata;
@@ -440,7 +411,6 @@ impl Operation {
440411
files,
441412
datafiles,
442413
schema,
443-
&partition_columns,
444414
object_store,
445415
branch,
446416
)
@@ -556,12 +526,12 @@ pub(crate) async fn write_manifest(
556526
files: Vec<Struct>,
557527
datafiles: Arc<HashMap<Struct, Vec<DataFile>>>,
558528
schema: &Schema,
559-
partition_columns: &[&StructField],
560529
object_store: Arc<dyn ObjectStore>,
561530
branch: Option<String>,
562531
) -> Result<ManifestListEntry, Error> {
532+
let partition_spec = table_metadata.default_partition_spec()?;
563533
let manifest_schema = ManifestEntry::schema(
564-
&partition_value_schema(table_metadata.default_partition_spec()?.fields(), schema)?,
534+
&partition_value_schema(partition_spec.fields(), schema)?,
565535
&table_metadata.format_version,
566536
)?;
567537

@@ -596,8 +566,7 @@ pub(crate) async fn write_manifest(
596566

597567
if manifest.partitions.is_none() {
598568
manifest.partitions = Some(
599-
table_metadata
600-
.default_partition_spec()?
569+
partition_spec
601570
.fields()
602571
.iter()
603572
.map(|_| FieldSummary {
@@ -614,7 +583,7 @@ pub(crate) async fn write_manifest(
614583
update_partitions(
615584
manifest.partitions.as_mut().unwrap(),
616585
datafile.partition(),
617-
partition_columns,
586+
partition_spec.fields(),
618587
)?;
619588

620589
let manifest_entry = ManifestEntry::builder()
@@ -662,13 +631,10 @@ pub(crate) async fn write_manifest(
662631
fn update_partitions(
663632
partitions: &mut [FieldSummary],
664633
partition_values: &Struct,
665-
partition_columns: &[&StructField],
634+
partition_columns: &[PartitionField],
666635
) -> Result<(), Error> {
667636
for (field, summary) in partition_columns.iter().zip(partitions.iter_mut()) {
668-
let value = &partition_values.fields[*partition_values
669-
.lookup
670-
.get(&field.name)
671-
.ok_or_else(|| Error::InvalidFormat("partition value in schema".to_string()))?];
637+
let value = &partition_values.get(field.name()).and_then(|x| x.as_ref());
672638
if let Some(value) = value {
673639
if let Some(lower_bound) = &mut summary.lower_bound {
674640
match (value, lower_bound) {
@@ -770,23 +736,14 @@ fn partition_values_in_bounds<'a>(
770736
partitions: &[FieldSummary],
771737
partition_values: impl Iterator<Item = &'a Struct>,
772738
partition_spec: &[PartitionField],
773-
schema: &Schema,
774739
) -> Vec<Struct> {
775740
partition_values
776741
.filter(|value| {
777742
partition_spec
778743
.iter()
779744
.map(|field| {
780-
let name = &schema
781-
.fields()
782-
.get(*field.source_id() as usize)
783-
.ok_or_else(|| {
784-
Error::InvalidFormat("partition values in schema".to_string())
785-
})
786-
.unwrap()
787-
.name;
788745
value
789-
.get(name)
746+
.get(field.name())
790747
.ok_or_else(|| {
791748
Error::InvalidFormat("partition values in schema".to_string())
792749
})

0 commit comments

Comments
 (0)