feat(optimizer): Add Lance count() pushdown optimization #4969

Merged: 18 commits merged into Eventual-Inc:main on Aug 15, 2025

Conversation

huleilei (Contributor)

Changes Made

1. Solution Design

1.1 Problem Background

In Daft, performing a count() operation on a LanceDB data source requires scanning the entire dataset and pulling the data from the storage layer to the computation layer for aggregation, even if it's just to calculate the total number of rows. This approach is inefficient, especially for very large datasets, as it consumes significant network bandwidth and computational resources.

1.2 Solution

To address this issue, we propose an optimization that pushes down the count() aggregation operation to the LanceDB data source. With this solution, Daft can directly leverage LanceDB's metadata or its efficient counting capabilities to obtain the total row count, thus avoiding unnecessary data transfer and computation.
The core idea of this solution is to introduce a new logical optimization rule, PushDownAggregation, which identifies count() aggregation operations that can be pushed down and converts them into specific scan tasks for the LanceDB data source.

2. Code Implementation Logic

2.1 PushDownAggregation Optimization Rule

We implemented the PushDownAggregation optimization rule in the src/daft-logical-plan/src/optimization/rules/push_down_aggregation.rs file. This rule traverses the logical plan tree and matches Aggregate nodes.
The rule pushes down the count() aggregation operation to the data source when all of the following conditions are met:
1. No groupby: The aggregation operation does not have a groupby clause.
2. Single Aggregation: There is only one aggregation expression.
3. count Aggregation: The aggregation expression is count().
4. Physical Data Source: The input to the aggregation operation is a physical data source, not in-memory data or a placeholder.
5. Data Source Support: The data source's scanner supports aggregation pushdown, specifically count pushdown, and the pushdown count mode is supported (currently only CountMode::All is supported).
6. No Filters: There are no filters that have already been pushed down to the data source.
When these conditions are met, the rule creates a new Source node and includes the count() aggregation information as part of the pushdowns. The schema of the new Source node is updated to the post-aggregation schema.
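The eligibility checks above can be sketched as a single predicate. This is illustrative Python, not Daft's actual Rust implementation; `AggregateNode`, `ScanSource`, and the `CountMode` enum here are stand-in types, not Daft's real classes:

```python
from dataclasses import dataclass, field
from enum import Enum
from typing import Optional


class CountMode(Enum):
    ALL = "all"      # count(*): count every row
    VALID = "valid"  # count(col): count non-null values
    NULL = "null"    # count null values


@dataclass
class ScanSource:
    """Stand-in for a physical scan node and its pushdown state."""
    supports_count_pushdown: bool = True
    supported_count_modes: tuple = (CountMode.ALL,)
    pushed_filters: list = field(default_factory=list)


@dataclass
class AggregateNode:
    """Stand-in for a logical Aggregate node."""
    groupby: list
    aggregations: list              # e.g. [("count", CountMode.ALL)]
    source: Optional[ScanSource]    # None if input is not a physical scan


def can_push_down_count(agg: AggregateNode) -> bool:
    """Mirror of the six conditions listed above."""
    if agg.groupby:                          # 1. no groupby clause
        return False
    if len(agg.aggregations) != 1:           # 2. single aggregation expression
        return False
    name, mode = agg.aggregations[0]
    if name != "count":                      # 3. the expression is count()
        return False
    src = agg.source
    if src is None:                          # 4. input is a physical data source
        return False
    if not src.supports_count_pushdown:      # 5a. scanner supports count pushdown
        return False
    if mode not in src.supported_count_modes:  # 5b. count mode is supported
        return False
    if src.pushed_filters:                   # 6. no filters already pushed down
        return False
    return True
```

For example, a bare `df.count()` over a scan satisfies the predicate, while adding a groupby key makes it fall back to normal aggregation.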

2.2 Modifications to LanceDBScanOperator

The LanceDBScanOperator in the daft/io/lance/lance_scan.py file was modified as follows:
1. Capability Declaration:

  • The can_absorb_aggregation() method returns True, indicating that the scanner can absorb aggregation operations.
  • The supports_count_pushdown() method returns True, indicating that the scanner supports count pushdown.
  • The supported_count_modes() method returns [CountMode.All], indicating that the scanner only supports count(*)-style pushdown.

2. Task Creation:

  • The to_scan_tasks() method was modified to handle count pushdown. When a count aggregation pushdown is detected, it creates a ScanTask that calls _lancedb_count_result_function.
  • The _lancedb_count_result_function function is responsible for calling LanceDB's count_rows() API to perform the count. In scenarios without filters, this function can directly use the metadata of the LanceDB dataset to efficiently return the row count, thus avoiding costly full data scans.
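The shape of that fast path can be shown with a stand-alone sketch. `FakeLanceDataset` below is a stand-in for a real LanceDB dataset (with a real dataset the call would be `lance.dataset(uri).count_rows()`, which answers from fragment metadata when no filter is given), and `lancedb_count_result` is a hypothetical simplification of what the count scan task yields, not Daft's actual `_lancedb_count_result_function`:

```python
class FakeLanceDataset:
    """Stand-in for a LanceDB dataset whose row count is known from metadata."""

    def __init__(self, num_rows: int):
        self._num_rows = num_rows

    def count_rows(self) -> int:
        # The real count_rows() reads fragment metadata in the no-filter
        # case, so no data pages are scanned or transferred.
        return self._num_rows


def lancedb_count_result(dataset) -> list[dict]:
    """Sketch of a count-pushdown scan task's output: a single one-row
    record holding the total row count, instead of the full dataset."""
    return [{"count": dataset.count_rows()}]
```

The key point is that the task materializes one row regardless of dataset size, which is where the benchmark speedup in section 5 comes from.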

3. Core Benefits

  • Improved Query Performance: Pushing down the count() computation to LanceDB eliminates the overhead of data transfer and upper-layer aggregation, significantly improving the execution speed of counting queries.
  • Optimized Resource Utilization: Significantly reduces the network bandwidth pressure caused by data pulling and saves CPU and memory resources of the computing cluster.

4. Limitations and Future Work

4.1 Current Limitations

  • count Mode Support: Currently, only count(*) (CountMode::All) is supported. CountMode::Valid (non-null count) and CountMode::Null (null count) are not yet supported.
  • Filter Limitation: This optimization does not take effect when the query includes a filter.
  • groupby: Aggregation operations with groupby are not pushed down.

In the above scenarios, Daft automatically falls back to the original execution logic, completing the aggregation at the computation layer to ensure the accuracy of the results.

4.2 Future Optimization Directions (TODO)

  • Support Pushdown with Filters: Implement the pushdown of count operations with filter conditions to the data source.
  • Extend count Modes: Support modes like count(column) by converting them into pushdowns with an IS NOT NULL filter.
  • Extend to Other Data Sources: Generalize this optimization to other data sources that support efficient counting, such as Iceberg and Hudi.
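The planned count-mode extension relies on the identity that count(col) equals count(*) over the rows where col IS NOT NULL. A minimal demonstration in plain Python, with lists of dicts standing in for table rows:

```python
def count_star(rows: list[dict]) -> int:
    """CountMode::All -- count every row."""
    return len(rows)


def count_valid(rows: list[dict], col: str) -> int:
    """CountMode::Valid -- count non-null values of a column."""
    return sum(1 for r in rows if r.get(col) is not None)


def count_valid_via_filter(rows: list[dict], col: str) -> int:
    """Equivalent rewrite: apply an IS NOT NULL filter, then count(*)."""
    filtered = [r for r in rows if r.get(col) is not None]
    return count_star(filtered)


rows = [{"x": 1}, {"x": None}, {"x": 3}]
assert count_valid(rows, "x") == count_valid_via_filter(rows, "x") == 2
```

This is why extending count modes depends on first supporting pushdown with filters: once a filtered count can be pushed down, count(col) becomes a filtered count(*).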

5. Performance Test Comparison

Data volume: 5,000 rows. Test case reference: #4874

Before Optimization

------------------------------------------------------------------------------------------------------------- benchmark: 4 tests ------------------------------------------------------------------------------------------------------------
Name (time in us)                                          Min                    Max                   Mean                StdDev                 Median                   IQR            Outliers         OPS            Rounds  Iterations
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
test_lance_native_read[count-COUNT(1)]                248.4840 (1.0)       1,158.2470 (1.0)         294.7342 (1.0)         60.6290 (1.0)         287.6330 (1.0)         33.7613 (1.0)         13;13  3,392.8873 (1.0)         369           1
test_duckdb_read_lance[count]                       2,875.4600 (11.57)     3,465.7710 (2.99)      3,078.6922 (10.45)      238.5957 (3.94)      2,959.2820 (10.29)      298.4882 (8.84)          1;0    324.8132 (0.10)          5           1
test_daft_dataframe_read_lance[count-COUNT(1)]     70,618.5060 (284.20)   75,446.5320 (65.14)    72,187.9156 (244.93)   1,905.4424 (31.43)    71,878.5300 (249.90)   1,872.3900 (55.46)         1;0     13.8527 (0.00)          5           1
test_daft_sql_read_lance[count-COUNT(1)]           70,713.5820 (284.58)   72,332.8790 (62.45)    71,581.7112 (242.87)     731.3439 (12.06)    71,935.9190 (250.10)   1,271.6818 (37.67)         2;0     13.9700 (0.00)          5           1
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

After Optimization

--------------------------------------------------------------------------------------------------------- benchmark: 4 tests --------------------------------------------------------------------------------------------------------
Name (time in us)                                         Min                   Max                  Mean              StdDev                Median                 IQR            Outliers         OPS            Rounds  Iterations
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
test_lance_native_read[count-COUNT(1)]               243.4360 (1.0)      2,890.7980 (1.0)        288.3970 (1.0)       99.7140 (1.0)        282.7590 (1.0)       30.4350 (1.0)         11;25  3,467.4419 (1.0)        1572           1
test_duckdb_read_lance[count]                      3,053.7880 (12.54)    3,503.1090 (1.21)     3,275.2938 (11.36)    175.0964 (1.76)     3,300.6170 (11.67)    264.3005 (8.68)          2;0    305.3161 (0.09)          5           1
test_daft_sql_read_lance[count-COUNT(1)]           6,075.9390 (24.96)    6,461.6360 (2.24)     6,240.9604 (21.64)    146.2203 (1.47)     6,259.2170 (22.14)    182.1147 (5.98)          2;0    160.2317 (0.05)          5           1
test_daft_dataframe_read_lance[count-COUNT(1)]     6,362.0170 (26.13)    8,084.4090 (2.80)     6,890.7954 (23.89)    705.4063 (7.07)     6,671.3280 (23.59)    825.6568 (27.13)         1;0    145.1211 (0.04)          5           1
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

Related Issues

Checklist

  • Documented in API Docs (if applicable)
  • Documented in User Guide (if applicable)
  • If adding a new documentation page, doc is added to docs/mkdocs.yml navigation
  • Documentation builds and is formatted properly (tag @/ccmao1130 for docs review)

huleilei (Contributor, Author) commented Aug 13, 2025

@rchowell @universalmind303 @desmondcheongzx @Jay-ju please help review when you have a chance. Thanks!

greptile-apps bot (Contributor) left a comment

Greptile Summary

This PR implements a Lance count() pushdown optimization that significantly improves performance for count operations on LanceDB data sources. The optimization pushes count(*) aggregations down to the data source level, allowing LanceDB to use its native metadata or efficient counting APIs instead of transferring and processing entire datasets.

The implementation centers around a new PushDownAggregation optimization rule that identifies eligible count operations and transforms them into scan operations with aggregation pushdowns. The rule applies when specific conditions are met: no groupby clause, single count(*) aggregation, physical data source, scan operator support, and no existing filters. The LanceDBScanOperator has been enhanced with count pushdown capabilities, implementing supports_count_pushdown() and supported_count_modes() methods, and modified to detect count pushdowns in to_scan_tasks() to create specialized count tasks using LanceDB's count_rows() API.

Supporting infrastructure changes include extending the Pushdowns struct with an aggregation field, updating the ScanOperator trait with count pushdown methods, and adding comprehensive Python bindings. The optimization integrates into Daft's existing pushdown framework and maintains backward compatibility through fallback mechanisms. Performance benchmarks show dramatic improvements - 10-12x speedup for Daft operations, reducing execution times from ~70ms to ~6ms for count operations on test datasets.

The feature is currently limited to CountMode::All (count all rows) without filters, with plans to extend support for filtered counts and other count modes in future iterations. The implementation follows established patterns in the Daft codebase for optimization rules and maintains separation between logical optimization and physical execution.

Confidence score: 4/5

  • This PR implements a well-designed optimization with significant performance benefits and proper fallback mechanisms
  • Score reflects comprehensive implementation across multiple layers with good test coverage, but complexity of cross-language integration and new optimization rule merits careful attention
  • Pay close attention to src/daft-logical-plan/src/optimization/rules/push_down_aggregation.rs and daft/io/lance/lance_scan.py for the core optimization logic

21 files reviewed, 4 comments


huleilei and others added 3 commits August 13, 2025 22:22

codecov bot commented Aug 13, 2025

Codecov Report

❌ Patch coverage is 90.98837% with 31 lines in your changes missing coverage. Please review.
✅ Project coverage is 77.21%. Comparing base (072858e) to head (af0aff7).
⚠️ Report is 4 commits behind head on main.

Files with missing lines Patch % Lines
daft/io/lance/lance_scan.py 55.88% 15 Missing ⚠️
src/common/scan-info/src/scan_operator.rs 0.00% 6 Missing ⚠️
...rc/daft-logical-plan/src/optimization/optimizer.rs 93.47% 3 Missing ⚠️
src/daft-scan/src/anonymous.rs 0.00% 3 Missing ⚠️
src/common/scan-info/src/python.rs 90.47% 2 Missing ⚠️
...an/src/optimization/rules/push_down_aggregation.rs 99.42% 1 Missing ⚠️
src/daft-scan/src/python.rs 90.00% 1 Missing ⚠️
Additional details and impacted files

Impacted file tree graph

@@            Coverage Diff             @@
##             main    #4969      +/-   ##
==========================================
+ Coverage   76.29%   77.21%   +0.91%     
==========================================
  Files         918      919       +1     
  Lines      128703   127236    -1467     
==========================================
+ Hits        98195    98246      +51     
+ Misses      30508    28990    -1518     
Files with missing lines Coverage Δ
daft/io/scan.py 74.28% <100.00%> (+1.55%) ⬆️
src/common/scan-info/src/pushdowns.rs 95.59% <100.00%> (+19.47%) ⬆️
src/common/scan-info/src/test/mod.rs 65.25% <100.00%> (+65.25%) ⬆️
...lan/src/optimization/rules/push_down_projection.rs 95.40% <100.00%> (+<0.01%) ⬆️
...logical-plan/src/optimization/rules/shard_scans.rs 94.44% <100.00%> (+0.10%) ⬆️
...-logical-plan/src/optimization/rules/split_udfs.rs 93.20% <ø> (ø)
src/daft-logical-plan/src/test/mod.rs 100.00% <100.00%> (ø)
src/daft-micropartition/src/micropartition.rs 83.60% <100.00%> (+0.01%) ⬆️
src/daft-physical-plan/src/test/mod.rs 100.00% <100.00%> (ø)
src/daft-scan/src/glob.rs 90.00% <100.00%> (+0.08%) ⬆️
... and 7 more

... and 38 files with indirect coverage changes


universalmind303 (Contributor) left a comment

This looks pretty sound to me. Thanks @huleilei

My biggest question is whether we can later extend this functionality to Parquet as well. Right now Parquet count() does a full scan, as seen in #2298. This PR seems to lay a lot of the groundwork for what's needed to support this in Parquet too.

huleilei (Contributor, Author) replied:


Yes. Other data sources also need to consider optimizing count_rows. My next PR will apply this feature to Parquet files. Thanks!

@srilman changed the title from "feat(optimizer): Add Lance count() pushdown optimizatio" to "feat(optimizer): Add Lance count() pushdown optimization" on Aug 14, 2025
@srilman srilman merged commit 0d4ba08 into Eventual-Inc:main Aug 15, 2025
52 checks passed