Skip to content

Commit b981147

Browse files
authored
Merge pull request #190 from JanKaul/concurrent-datafiles
fetch manifests concurrently
2 parents 5764848 + 9d1f6a0 commit b981147

File tree

1 file changed

+29
-29
lines changed

1 file changed

+29
-29
lines changed

iceberg-rust/src/table/mod.rs

Lines changed: 29 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
use std::{io::Cursor, sync::Arc};
1414

1515
use futures::future;
16+
use futures::stream::FuturesUnordered;
1617
use itertools::Itertools;
1718
use manifest::ManifestReader;
1819
use manifest_list::read_snapshot;
@@ -313,9 +314,8 @@ async fn datafiles(
313314
None => Box::new(manifests.iter()),
314315
};
315316

316-
// Collect a vector of data files by creating a stream over the manifst files, fetch their content and return a flatten stream over their entries.
317-
Ok(stream::iter(iter)
318-
.then(move |file| {
317+
let stream: FuturesUnordered<_> = iter
318+
.map(move |file| {
319319
let object_store = object_store.clone();
320320
async move {
321321
let manifest_path = &file.manifest_path;
@@ -329,35 +329,35 @@ async fn datafiles(
329329
Ok::<_, Error>((bytes, manifest_path, file.sequence_number))
330330
}
331331
})
332-
.flat_map_unordered(None, move |result| {
333-
let (bytes, path, sequence_number) = result.unwrap();
332+
.collect();
334333

335-
let reader = ManifestReader::new(bytes).unwrap();
336-
stream::iter(reader).try_filter_map(move |mut x| {
337-
future::ready({
338-
let sequence_number = if let Some(sequence_number) = x.sequence_number() {
339-
*sequence_number
340-
} else {
341-
*x.sequence_number_mut() = Some(sequence_number);
342-
sequence_number
343-
};
334+
Ok(stream.flat_map_unordered(None, move |result| {
335+
let (bytes, path, sequence_number) = result.unwrap();
344336

345-
let filter = match sequence_number_range {
346-
(Some(start), Some(end)) => {
347-
start < sequence_number && sequence_number <= end
348-
}
349-
(Some(start), None) => start < sequence_number,
350-
(None, Some(end)) => sequence_number <= end,
351-
_ => true,
352-
};
353-
if filter {
354-
Ok(Some((path.to_owned(), x)))
355-
} else {
356-
Ok(None)
357-
}
358-
})
337+
let reader = ManifestReader::new(bytes).unwrap();
338+
stream::iter(reader).try_filter_map(move |mut x| {
339+
future::ready({
340+
let sequence_number = if let Some(sequence_number) = x.sequence_number() {
341+
*sequence_number
342+
} else {
343+
*x.sequence_number_mut() = Some(sequence_number);
344+
sequence_number
345+
};
346+
347+
let filter = match sequence_number_range {
348+
(Some(start), Some(end)) => start < sequence_number && sequence_number <= end,
349+
(Some(start), None) => start < sequence_number,
350+
(None, Some(end)) => sequence_number <= end,
351+
_ => true,
352+
};
353+
if filter {
354+
Ok(Some((path.to_owned(), x)))
355+
} else {
356+
Ok(None)
357+
}
359358
})
360-
}))
359+
})
360+
}))
361361
}
362362

363363
/// delete all datafiles, manifests and metadata files, does not remove table from catalog

0 commit comments

Comments
 (0)