feat(optimizer): Add Lance count() pushdown optimization #4969
@rchowell @universalmind303 @desmondcheongzx @Jay-ju please help review when convenient. Thanks!
Greptile Summary
This PR implements a Lance count() pushdown optimization that significantly improves performance for count operations on LanceDB data sources. The optimization pushes `count(*)` aggregations down to the data source level, allowing LanceDB to use its native metadata or efficient counting APIs instead of transferring and processing entire datasets.

The implementation centers on a new `PushDownAggregation` optimization rule that identifies eligible count operations and transforms them into scan operations with aggregation pushdowns. The rule applies when specific conditions are met: no groupby clause, a single `count(*)` aggregation, a physical data source, scan operator support, and no existing filters. The `LanceDBScanOperator` has been enhanced with count pushdown capabilities, implementing the `supports_count_pushdown()` and `supported_count_modes()` methods, and modified to detect count pushdowns in `to_scan_tasks()` so it can create specialized count tasks using LanceDB's `count_rows()` API.

Supporting infrastructure changes include extending the `Pushdowns` struct with an `aggregation` field, updating the `ScanOperator` trait with count pushdown methods, and adding comprehensive Python bindings. The optimization integrates into Daft's existing pushdown framework and maintains backward compatibility through fallback mechanisms. Performance benchmarks show dramatic improvements: a 10-12x speedup for Daft count operations, reducing execution times from ~70ms to ~6ms on the test datasets.

The feature is currently limited to `CountMode::All` (count all rows) without filters, with plans to extend support to filtered counts and other count modes in future iterations. The implementation follows established patterns in the Daft codebase for optimization rules and maintains the separation between logical optimization and physical execution.
Confidence score: 4/5
- This PR implements a well-designed optimization with significant performance benefits and proper fallback mechanisms
- Score reflects comprehensive implementation across multiple layers with good test coverage, but complexity of cross-language integration and new optimization rule merits careful attention
- Pay close attention to `src/daft-logical-plan/src/optimization/rules/push_down_aggregation.rs` and `daft/io/lance/lance_scan.py` for the core optimization logic
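The `Pushdowns` extension described in the summary above is a Rust struct, but its shape can be sketched in plain Python for illustration (field names other than `aggregation` are assumptions, not the actual definition):

```python
from dataclasses import dataclass
from enum import Enum
from typing import Optional

class CountMode(Enum):
    ALL = "all"      # count(*): count every row
    VALID = "valid"  # count(col): non-null rows only (not yet pushed down)
    NULL = "null"    # null count (not yet pushed down)

@dataclass
class Pushdowns:
    """Illustrative mirror of Daft's pushdown struct; the real one lives in Rust."""
    filters: Optional[str] = None              # pushed-down predicate, if any
    columns: Optional[list] = None             # projection pushdown
    limit: Optional[int] = None                # limit pushdown
    aggregation: Optional[CountMode] = None    # new in this PR: count pushdown mode

# The only shape the new rule emits: count(*) with no pushed filters.
pd = Pushdowns(aggregation=CountMode.ALL)
assert pd.filters is None and pd.aggregation is CountMode.ALL
```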
21 files reviewed, 4 comments
Codecov Report

```
@@            Coverage Diff             @@
##             main    #4969      +/-   ##
==========================================
+ Coverage   76.29%   77.21%   +0.91%
==========================================
  Files         918      919       +1
  Lines      128703   127236    -1467
==========================================
+ Hits        98195    98246      +51
+ Misses      30508    28990    -1518
```
Yes. Other data sources also need this count_rows optimization; my next PR will apply the feature to Parquet files. Thanks!
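Parquet is a natural next target because its file footer already records row counts per file and per row group, so a `count(*)` can be answered without reading any column data. A toy illustration of that idea (these classes are stand-ins, not the actual Daft or Parquet reader types):

```python
from dataclasses import dataclass

@dataclass
class RowGroupMeta:
    num_rows: int  # Parquet records this in each row group's metadata

@dataclass
class ParquetFileMeta:
    row_groups: list  # list[RowGroupMeta], read from the file footer

def count_rows_from_footer(files):
    """Answer count(*) over Parquet files using footer metadata only."""
    return sum(rg.num_rows for f in files for rg in f.row_groups)

# Two files, three row groups, 5000 rows total: counted without a data scan.
files = [
    ParquetFileMeta([RowGroupMeta(1024), RowGroupMeta(976)]),
    ParquetFileMeta([RowGroupMeta(3000)]),
]
assert count_rows_from_footer(files) == 5000
```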
Changes Made
1. Solution Design
1.1 Problem Background
In Daft, performing a `count()` operation on a LanceDB data source requires scanning the entire dataset and pulling the data from the storage layer to the computation layer for aggregation, even if the goal is just the total number of rows. This approach is inefficient, especially for very large datasets, as it consumes significant network bandwidth and computational resources.

1.2 Solution

To address this issue, we push the `count()` aggregation down to the LanceDB data source. With this change, Daft can directly leverage LanceDB's metadata or its efficient counting capabilities to obtain the total row count, avoiding unnecessary data transfer and computation.

The core idea is to introduce a new logical optimization rule, `PushDownAggregation`, which identifies `count()` aggregations that can be pushed down and converts them into specific scan tasks for the LanceDB data source.

2. Code Implementation Logic
2.1
PushDownAggregation
Optimization RuleWe implemented the
PushDownAggregation
optimization rule in thesrc/daft-logical-plan/src/optimization/rules/push_down_aggregation.rs
file. This rule traverses the logical plan tree and matchesAggregate
nodes.The rule pushes down the
count()
aggregation operation to the data source when all of the following conditions are met:1. No
groupby
: The aggregation operation does not have agroupby
clause.2. Single Aggregation: There is only one aggregation expression.
3.
count
Aggregation: The aggregation expression iscount()
.4. Physical Data Source: The input to the aggregation operation is a physical data source, not in-memory data or a placeholder.
5. Data Source Support: The data source's scanner supports aggregation pushdown, specifically
count
pushdown, and the pushdowncount
mode is supported (currently onlyCountMode::All
is supported).6. No Filters: There are no
filters
that have already been pushed down to the data source.When these conditions are met, the rule creates a new
Source
node and includes thecount()
aggregation information as part of thepushdowns
. The schema of the newSource
node is updated to the post-aggregation schema.2.2 Modifications to
LanceDBScanOperator
The `LanceDBScanOperator` in the `daft/io/lance/lance_scan.py` file was modified as follows:

1. Capability Declaration:
   - `can_absorb_aggregation()` returns `True`, indicating that the scanner can absorb aggregation operations.
   - `supports_count_pushdown()` returns `True`, indicating that the scanner supports count pushdown.
   - `supported_count_modes()` returns `[CountMode.All]`, indicating that the scanner only supports `count(*)`-style pushdown.
2. Task Creation:
   - The `to_scan_tasks()` method was modified to handle `count` pushdown. When a `count` aggregation pushdown is detected, it creates a `ScanTask` that calls `_lancedb_count_result_function`.
   - `_lancedb_count_result_function` is responsible for calling LanceDB's `count_rows()` API to perform the count. In scenarios without filters, this function can use the LanceDB dataset's metadata to return the row count efficiently, avoiding a costly full data scan.

3. Core Benefits
Pushing the `count()` computation down to LanceDB eliminates the overhead of data transfer and upper-layer aggregation, significantly improving the execution speed of counting queries.

4. Limitations and Future Work
4.1 Current Limitations

- Count mode: Only `count(*)` (`CountMode::All`) is supported. `CountMode::Valid` (non-null count) and `CountMode::Null` (null count) are not yet supported.
- `Filter` limitation: This optimization does not take effect when the query includes a `filter`.
- `groupby`: Aggregations with a `groupby` are not pushed down.

In the above scenarios, Daft automatically falls back to the original execution logic, completing the aggregation at the computation layer to ensure correct results.
4.2 Future Optimization Directions (TODO)

- Filtered counts: Push down `count` operations with filter conditions to the data source.
- Other `count` modes: Support modes like `count(column)` by converting them into pushdowns with an `IS NOT NULL` filter.

5. Performance Test Comparison
Data volume: 5,000 rows; see #4874 for the test cases.

Before Optimization: (benchmark screenshot)
After Optimization: (benchmark screenshot)
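The benchmark screenshots did not survive extraction, but the shape of the comparison is easy to reproduce. A toy stand-in (no LanceDB involved) contrasting the pre-optimization full scan with the post-optimization metadata lookup; the class and timings here are illustrative only, not the PR's measured ~70ms → ~6ms numbers:

```python
import time

class ToyDataset:
    """Stand-in for a columnar dataset whose metadata stores the row count."""
    def __init__(self, n_rows):
        self.rows = list(range(n_rows))
        self.meta_num_rows = n_rows  # analogous to LanceDB's count_rows() fast path

    def count_full_scan(self):
        # Pre-optimization: materialize and count every row.
        return sum(1 for _ in self.rows)

    def count_from_metadata(self):
        # Post-optimization: answer from metadata, no data transfer.
        return self.meta_num_rows

ds = ToyDataset(5000)

t0 = time.perf_counter()
slow = ds.count_full_scan()
t1 = time.perf_counter()
fast = ds.count_from_metadata()
t2 = time.perf_counter()

assert slow == fast == 5000  # same answer either way
print(f"full scan: {(t1 - t0) * 1e6:.0f}us, metadata: {(t2 - t1) * 1e6:.0f}us")
```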
Related Issues
Checklist
- `docs/mkdocs.yml` navigation