feat: Flotilla into partitions #4963
Codecov Report
❌ Patch coverage is
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #4963      +/-   ##
==========================================
+ Coverage   76.29%   77.00%   +0.70%
==========================================
  Files         918      924       +6
  Lines      128703   127683    -1020
==========================================
+ Hits        98195    98320     +125
+ Misses      30508    29363    -1145
Greptile Summary
This PR implements the into_partitions operation for Daft's Flotilla distributed execution engine. The feature allows users to redistribute data into a specific number of partitions, providing precise control over partitioning regardless of the input data distribution.
The implementation consists of several key components:
- Stage Builder Support: Modified stage_builder.rs to allow IntoPartitions operations to proceed through the distributed stage planning process, removing the previous restriction that forced these operations to fail.
- Distributed Pipeline Node: Added a new IntoPartitionsNode in the distributed pipeline system that handles three scenarios:
  - Pass-through: When input partition count matches target
  - Coalescing: When there are too many input partitions, combines multiple tasks into fewer partitions with proper remainder distribution
  - Splitting: When there are too few input partitions, divides tasks using the local execution engine
- Local Execution Support: Implemented IntoPartitionsSink as a blocking sink that collects all input data, concatenates it, and redistributes it evenly across the target number of partitions using row-based slicing.
- Physical Plan Integration: Added IntoPartitions as a new physical plan node with proper schema handling, statistics state management, and integration with the existing plan infrastructure.
- Comprehensive Testing: Added Ray-specific test coverage for splitting, coalescing, chaining operations, and edge cases with empty partitions.
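To make the coalescing case above concrete, here is a minimal Python sketch of one way to group input tasks into fewer output partitions. It is an illustration only, not the PR's Rust code: the function name `coalesce_groups` is hypothetical, and the choice to give the extra tasks to the first few outputs is an assumption, since the summary only says the remainder is distributed.

```python
def coalesce_groups(num_inputs: int, num_outputs: int) -> list[list[int]]:
    """Group input task indices into `num_outputs` contiguous groups.

    Hypothetical helper for illustration. Assumption: when the inputs do
    not divide evenly, the first `remainder` groups take one extra task.
    """
    if num_outputs <= 0:
        raise ValueError("num_outputs must be positive")
    if num_inputs < num_outputs:
        raise ValueError("coalescing needs at least as many inputs as outputs")

    base, remainder = divmod(num_inputs, num_outputs)
    groups: list[list[int]] = []
    start = 0
    for i in range(num_outputs):
        # The first `remainder` groups are one task larger than the rest.
        size = base + (1 if i < remainder else 0)
        groups.append(list(range(start, start + size)))
        start += size
    return groups


if __name__ == "__main__":
    # 10 input tasks coalesced into 3 output partitions.
    for group in coalesce_groups(10, 3):
        print(group)
```

Every input index appears in exactly one group, and group sizes differ by at most one, which is the property a "proper remainder distribution" would be expected to provide.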
The implementation follows established patterns in the codebase, using the blocking sink pattern for local execution and the distributed pipeline node pattern for flotilla execution. The operation is designed to work specifically with the Ray runner, as evidenced by the test suite being conditionally skipped for non-Ray environments.
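The row-based slicing that the summary attributes to IntoPartitionsSink can be sketched in plain Python as follows. This is a hedged illustration of the idea (collect everything, then cut even slices), not the actual sink: `split_rows` is a hypothetical name, a Python list stands in for the concatenated data, and placing the leftover rows in the first slices is an assumption.

```python
def split_rows(rows: list, num_partitions: int) -> list[list]:
    """Slice already-concatenated rows into `num_partitions` even pieces.

    Hypothetical stand-in for a blocking sink's final step. Assumption:
    the first `remainder` partitions receive one extra row each.
    """
    if num_partitions <= 0:
        raise ValueError("num_partitions must be positive")

    base, remainder = divmod(len(rows), num_partitions)
    partitions: list[list] = []
    start = 0
    for i in range(num_partitions):
        size = base + (1 if i < remainder else 0)
        # Slicing past the end of an empty input yields empty partitions,
        # so the requested partition count is always honoured.
        partitions.append(rows[start:start + size])
        start += size
    return partitions


if __name__ == "__main__":
    for part in split_rows(list(range(10)), 4):
        print(part)
```

Because all rows are held in one list before slicing, the sketch also shows why the review flags memory use: the whole input has to be resident at once before any partition is emitted.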
Confidence score: 4/5
- This PR appears safe to merge with thorough implementation across multiple components
- Score reflects well-structured code following existing patterns but potential memory concerns with large datasets
- Pay close attention to the blocking sink implementation which concatenates all data in memory
13 files reviewed, no comments
overall looks good to me! implementation is easy to follow.
added some minor comments about the tests
def test_into_partitions_split_and_coalesce(make_df) -> None:
    data = {"foo": list(range(100))}
    df = make_df(data).into_partitions(20).into_partitions(1).collect()
    assert df.to_pydict() == data
does it make sense to implement a drop_into_partitions optimization rule?
this is already implemented by the drop_repartition rule.
I know it's weird, but into_partitions uses the LogicalPlan::Repartition plan, and so drop_repartition works for it
Changes Made
Implements into_partitions for flotilla. The operator first collects all input tasks, then either coalesces or splits them so that exactly num_partitions partitions are emitted.
Related Issues
Checklist
docs/mkdocs.yml
navigation