Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit f81ec07

Browse files
committedMay 23, 2025·
delta_datafusion: prune Add file actions
1 parent 1ac1044 commit f81ec07

File tree

3 files changed

+158
-43
lines changed

3 files changed

+158
-43
lines changed
 

‎crates/core/src/delta_datafusion/mod.rs

Lines changed: 54 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ pub mod logical;
101101
pub mod physical;
102102
pub mod planner;
103103

104+
use crate::kernel::transaction::state::AddContainer;
104105
pub use cdf::scan::DeltaCdfTableProvider;
105106

106107
mod schema_adapter;
@@ -562,41 +563,55 @@ impl<'a> DeltaScanBuilder<'a> {
562563
.unwrap()
563564
});
564565

565-
// Perform Pruning of files to scan
566-
let (files, files_scanned, files_pruned) = match self.files {
567-
Some(files) => {
568-
let files = files.to_owned();
569-
let files_scanned = files.len();
570-
(files, files_scanned, 0)
571-
}
572-
None => {
573-
if let Some(predicate) = &logical_filter {
574-
let pruning_predicate =
575-
PruningPredicate::try_new(predicate.clone(), logical_schema.clone())?;
576-
let files_to_prune = pruning_predicate.prune(self.snapshot)?;
577-
let mut files_pruned = 0usize;
578-
let files = self
566+
let (files, files_scanned, files_pruned) = if let Some(predicate) = &logical_filter {
567+
let pruning_predicate =
568+
PruningPredicate::try_new(predicate.clone(), logical_schema.clone())?;
569+
570+
match self.files {
571+
Some(files) => {
572+
let files_vec = files.to_vec();
573+
let container = AddContainer::new(
574+
&files_vec,
575+
&self.snapshot.metadata().partition_columns,
576+
logical_schema.clone(),
577+
);
578+
579+
let keep_mask = pruning_predicate.prune(&container)?;
580+
let total_files = files.len();
581+
582+
let pruned_files: Vec<_> = files
583+
.iter()
584+
.zip(keep_mask)
585+
.filter(|&(_, keep)| keep)
586+
.map(|(action, _)| action.to_owned())
587+
.collect();
588+
589+
let files_kept = pruned_files.len();
590+
(pruned_files, files_kept, total_files - files_kept)
591+
}
592+
None => {
593+
let keep_mask = pruning_predicate.prune(self.snapshot)?;
594+
let total_files = keep_mask.len();
595+
596+
let pruned_files: Vec<_> = self
579597
.snapshot
580598
.file_actions_iter()?
581-
.zip(files_to_prune.into_iter())
582-
.filter_map(|(action, keep)| {
583-
if keep {
584-
Some(action.to_owned())
585-
} else {
586-
files_pruned += 1;
587-
None
588-
}
589-
})
590-
.collect::<Vec<_>>();
599+
.zip(keep_mask)
600+
.filter(|&(_, keep)| keep)
601+
.map(|(action, _)| action.to_owned())
602+
.collect();
591603

592-
let files_scanned = files.len();
593-
(files, files_scanned, files_pruned)
594-
} else {
595-
let files = self.snapshot.file_actions()?;
596-
let files_scanned = files.len();
597-
(files, files_scanned, 0)
604+
let files_kept = pruned_files.len();
605+
(pruned_files, files_kept, total_files - files_kept)
598606
}
599607
}
608+
} else {
609+
let files = match self.files {
610+
Some(f) => f.to_vec(),
611+
None => self.snapshot.file_actions()?,
612+
};
613+
let files_scanned = files.len();
614+
(files, files_scanned, 0)
600615
};
601616

602617
// TODO we group files together by their partition values. If the table is partitioned
@@ -2282,15 +2297,15 @@ mod tests {
22822297

22832298
let df = ctx.sql("select * from test").await.unwrap();
22842299
let actual = df.collect().await.unwrap();
2285-
let expected = vec! [
2286-
"+----+----+----+-------------------------------------------------------------------------------+",
2287-
"| c3 | c1 | c2 | file_source |",
2288-
"+----+----+----+-------------------------------------------------------------------------------+",
2289-
"| 4 | 6 | a | c1=6/c2=a/part-00011-10619b10-b691-4fd0-acc4-2a9608499d7c.c000.snappy.parquet |",
2290-
"| 5 | 4 | c | c1=4/c2=c/part-00003-f525f459-34f9-46f5-82d6-d42121d883fd.c000.snappy.parquet |",
2291-
"| 6 | 5 | b | c1=5/c2=b/part-00007-4e73fa3b-2c88-424a-8051-f8b54328ffdb.c000.snappy.parquet |",
2292-
"+----+----+----+-------------------------------------------------------------------------------+",
2293-
];
2300+
let expected = vec![
2301+
"+----+----+----+-------------------------------------------------------------------------------+",
2302+
"| c3 | c1 | c2 | file_source |",
2303+
"+----+----+----+-------------------------------------------------------------------------------+",
2304+
"| 4 | 6 | a | c1=6/c2=a/part-00011-10619b10-b691-4fd0-acc4-2a9608499d7c.c000.snappy.parquet |",
2305+
"| 5 | 4 | c | c1=4/c2=c/part-00003-f525f459-34f9-46f5-82d6-d42121d883fd.c000.snappy.parquet |",
2306+
"| 6 | 5 | b | c1=5/c2=b/part-00007-4e73fa3b-2c88-424a-8051-f8b54328ffdb.c000.snappy.parquet |",
2307+
"+----+----+----+-------------------------------------------------------------------------------+",
2308+
];
22942309
assert_batches_sorted_eq!(&expected, &actual);
22952310
}
22962311

‎crates/core/src/kernel/transaction/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ pub(crate) mod application;
109109
mod conflict_checker;
110110
mod protocol;
111111
#[cfg(feature = "datafusion")]
112-
mod state;
112+
pub(crate) mod state;
113113

114114
const DELTA_LOG_FOLDER: &str = "_delta_log";
115115
pub(crate) const DEFAULT_RETRIES: usize = 15;

‎crates/core/tests/integration_datafusion.rs

Lines changed: 103 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use datafusion_proto::bytes::{
2424
logical_plan_from_bytes_with_extension_codec, logical_plan_to_bytes_with_extension_codec,
2525
};
2626
use deltalake_core::delta_datafusion::DeltaScan;
27-
use deltalake_core::kernel::{DataType, MapType, PrimitiveType, StructField, StructType};
27+
use deltalake_core::kernel::{Add, DataType, MapType, PrimitiveType, StructField, StructType};
2828
use deltalake_core::operations::create::CreateBuilder;
2929
use deltalake_core::protocol::SaveMode;
3030
use deltalake_core::writer::{DeltaWriter, RecordBatchWriter};
@@ -429,8 +429,6 @@ mod local {
429429
let _result = collect(plan.execute(0, task_ctx)?).await?;
430430
visit_execution_plan(&plan, &mut metrics).unwrap();
431431
} else {
432-
// if scan produces no output from ParquetExec, we still want to visit DeltaScan
433-
// to check its metrics
434432
visit_execution_plan(scan.as_ref(), &mut metrics).unwrap();
435433
}
436434

@@ -859,6 +857,108 @@ mod local {
859857
Ok(())
860858
}
861859

860+
async fn get_scan_metrics_with_files(
861+
table: &DeltaTable,
862+
state: &SessionState,
863+
files: Option<Vec<Add>>,
864+
e: &[Expr],
865+
) -> Result<ExecutionMetricsCollector> {
866+
use deltalake_core::delta_datafusion::{DeltaScanConfig, DeltaTableProvider};
867+
868+
let mut provider = DeltaTableProvider::try_new(
869+
table.snapshot().unwrap().clone(),
870+
table.log_store(),
871+
DeltaScanConfig::default(),
872+
)?;
873+
874+
if let Some(f) = files {
875+
provider = provider.with_files(f);
876+
}
877+
878+
let mut metrics = ExecutionMetricsCollector::default();
879+
let scan = provider.scan(state, None, e, None).await?;
880+
881+
if scan.properties().output_partitioning().partition_count() > 0 {
882+
let plan = CoalescePartitionsExec::new(scan);
883+
let task_ctx = Arc::new(TaskContext::from(state));
884+
let _result = collect(plan.execute(0, task_ctx)?).await?;
885+
visit_execution_plan(&plan, &mut metrics).unwrap();
886+
} else {
887+
visit_execution_plan(scan.as_ref(), &mut metrics).unwrap();
888+
}
889+
890+
Ok(metrics)
891+
}
892+
893+
#[tokio::test]
894+
async fn test_files_scanned_with_files() -> Result<()> {
895+
use datafusion::prelude::*;
896+
let ctx = SessionContext::new();
897+
let state = ctx.state();
898+
899+
let batch1 = create_all_types_batch(3, 0, 0); // values 0-2
900+
let batch2 = create_all_types_batch(3, 0, 4); // values 4-6
901+
let batch3 = create_all_types_batch(3, 0, 7); // values 7-9
902+
903+
let (_tmp, mut table) = prepare_table(vec![batch1], SaveMode::Overwrite, vec![]).await;
904+
let files_before_1 = table.snapshot().unwrap().file_actions().unwrap();
905+
906+
table = DeltaOps(table)
907+
.write(vec![batch2])
908+
.with_save_mode(SaveMode::Append)
909+
.await
910+
.unwrap();
911+
let files_before_2 = table.snapshot().unwrap().file_actions().unwrap();
912+
913+
table = DeltaOps(table)
914+
.write(vec![batch3])
915+
.with_save_mode(SaveMode::Append)
916+
.await
917+
.unwrap();
918+
let all_files = table.snapshot().unwrap().file_actions().unwrap();
919+
920+
assert_eq!(all_files.len(), 3);
921+
922+
let file_0_2 = files_before_1[0].clone();
923+
let file_4_6 = files_before_2
924+
.iter()
925+
.find(|f| !files_before_1.iter().any(|f1| f1.path == f.path))
926+
.unwrap()
927+
.clone();
928+
let file_7_9 = all_files
929+
.iter()
930+
.find(|f| !files_before_2.iter().any(|f2| f2.path == f.path))
931+
.unwrap()
932+
.clone();
933+
934+
// Test without with_files (normal snapshot pruning)
935+
let e = col("int64").eq(lit(5i64));
936+
let metrics = get_scan_metrics(&table, &state, &[e]).await?;
937+
assert_eq!(metrics.num_scanned_files(), 1);
938+
assert_eq!(metrics.skip_count, 2);
939+
940+
// Test with with_files providing all files (should behave the same)
941+
let e = col("int64").eq(lit(5i64));
942+
let metrics =
943+
get_scan_metrics_with_files(&table, &state, Some(all_files.clone()), &[e]).await?;
944+
assert_eq!(metrics.num_scanned_files(), 1);
945+
assert_eq!(metrics.skip_count, 2);
946+
947+
let subset_files = vec![file_0_2.clone(), file_7_9.clone()];
948+
let e = col("int64").gt(lit(6i64));
949+
let metrics = get_scan_metrics_with_files(&table, &state, Some(subset_files), &[e]).await?;
950+
assert_eq!(metrics.num_scanned_files(), 1);
951+
assert_eq!(metrics.skip_count, 1);
952+
953+
let subset_files = vec![file_0_2.clone(), file_4_6.clone()];
954+
let e = col("int64").gt(lit(6i64));
955+
let metrics = get_scan_metrics_with_files(&table, &state, Some(subset_files), &[e]).await?;
956+
assert_eq!(metrics.num_scanned_files(), 0);
957+
assert_eq!(metrics.skip_count, 2);
958+
959+
Ok(())
960+
}
961+
862962
#[tokio::test]
863963
async fn test_datafusion_partitioned_types() -> Result<()> {
864964
let ctx = SessionContext::new();

0 commit comments

Comments
 (0)
Please sign in to comment.