Skip to content

Commit 93bdf79

Browse files
committed
improve writing without partitions
1 parent 5c9e70d commit 93bdf79

File tree

1 file changed

+38
-24
lines changed

1 file changed

+38
-24
lines changed

iceberg-rust/src/arrow/write.rs

Lines changed: 38 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -47,35 +47,49 @@ pub async fn write_parquet_partitioned(
4747
let schema = metadata.current_schema(branch).map_err(Error::from)?;
4848
let partition_spec = metadata.default_partition_spec().map_err(Error::from)?;
4949

50-
let streams = partition_record_batches(batches, partition_spec, schema).await?;
51-
5250
let arrow_schema: Arc<ArrowSchema> =
5351
Arc::new((schema.fields()).try_into().map_err(Error::from)?);
5452

55-
let (sender, reciever) = unbounded();
53+
let (mut sender, reciever) = unbounded();
5654

57-
stream::iter(streams.into_iter())
58-
.map(Ok::<_, ArrowError>)
59-
.try_for_each_concurrent(None, |(partition_values, batches)| {
60-
let arrow_schema = arrow_schema.clone();
61-
let object_store = object_store.clone();
62-
let mut sender = sender.clone();
63-
async move {
64-
let files = write_parquet_files(
65-
location,
66-
schema,
67-
&arrow_schema,
68-
partition_spec,
69-
&partition_values,
70-
batches,
71-
object_store.clone(),
72-
)
73-
.await?;
74-
sender.send(files).await.map_err(Error::from)?;
75-
Ok(())
76-
}
77-
})
55+
if partition_spec.fields().is_empty() {
56+
let files = write_parquet_files(
57+
location,
58+
schema,
59+
&arrow_schema,
60+
partition_spec,
61+
&Vec::new(),
62+
batches,
63+
object_store.clone(),
64+
)
7865
.await?;
66+
sender.send(files).await.map_err(Error::from)?;
67+
} else {
68+
let streams = partition_record_batches(batches, partition_spec, schema).await?;
69+
70+
stream::iter(streams.into_iter())
71+
.map(Ok::<_, ArrowError>)
72+
.try_for_each_concurrent(None, |(partition_values, batches)| {
73+
let arrow_schema = arrow_schema.clone();
74+
let object_store = object_store.clone();
75+
let mut sender = sender.clone();
76+
async move {
77+
let files = write_parquet_files(
78+
location,
79+
schema,
80+
&arrow_schema,
81+
partition_spec,
82+
&partition_values,
83+
batches,
84+
object_store.clone(),
85+
)
86+
.await?;
87+
sender.send(files).await.map_err(Error::from)?;
88+
Ok(())
89+
}
90+
})
91+
.await?;
92+
}
7993

8094
sender.close_channel();
8195

0 commit comments

Comments
 (0)