feat: Flotilla pre-shuffle merge #4873
Conversation
Codecov Report
Additional details and impacted files
@@ Coverage Diff @@
## main #4873 +/- ##
==========================================
+ Coverage 78.53% 79.67% +1.13%
==========================================
Files 913 915 +2
Lines 129042 126325 -2717
==========================================
- Hits 101349 100655 -694
+ Misses 27693 25670 -2023
- Remove `ShuffleExchangeNode` indirection and create separate `PreShuffleMergeNode` and `MapReduceNode` pipeline nodes with unique node IDs
- Move shuffle strategy determination logic from `ShuffleExchangeNode` to the translator's `create_shuffle_nodes` method
- Each pipeline node now has its own task context for better tracking and debugging
- Update `hash_join.rs` to use the new `create_shuffle_nodes` API
- Add `Clone` derives to `PipelineNodeConfig` and `PipelineNodeContext` to support node cloning
- Move the `transpose_materialized_outputs` method to `MapReduceNode` where it belongs
- Remove the unused `NaiveFullyMaterializingMapReduce` struct
- All tests pass with the simplified architecture

🤖 Generated with [Claude Code](https://claude.ai/code)
Co-Authored-By: Claude <[email protected]>
Greptile Summary
This PR implements the `pre_shuffle_merge` strategy for Daft's Flotilla distributed execution engine, bringing parity with the legacy Ray runner's shuffle optimization capabilities. The change introduces a new shuffle algorithm that coalesces small partitions before repartitioning to reduce the intermediate object count from M*N to a much smaller number.
Key architectural changes:
- Module reorganization: Shuffle operations are now organized under a dedicated `src/daft-distributed/src/pipeline_node/shuffles/` module containing `gather.rs`, `pre_shuffle_merge.rs`, `repartition.rs`, and `translate_shuffle.rs`
- New PreShuffleMergeNode: The core implementation groups materialized outputs by worker ID for data locality, accumulates them until reaching a configurable threshold (default 1GB), then creates merge tasks. This reduces shuffle complexity from M*N intermediate partitions to approximately (M / threshold_factor) * N
- Strategy selection: The system now supports four shuffle algorithms: "map_reduce", "pre_shuffle_merge", "flight_shuffle" (not yet implemented), and "auto". The auto mode uses a geometric-mean heuristic with a 200-partition threshold, matching the legacy Ray runner behavior (see the sketch after this list)
- Integration across pipeline nodes: Hash joins, aggregations, distinct operations, and window functions all use the new `gen_shuffle_node` method, which abstracts shuffle strategy selection
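A minimal sketch of how the "auto" selection described in the last bullet could look, assuming only the geometric-mean rule and 200-partition threshold stated above; the enum and function names are illustrative, and the actual selection logic lives in `translate_shuffle.rs`:

```rust
/// Illustrative strategy enum; "flight_shuffle" is omitted since it is not
/// yet implemented in Flotilla.
#[derive(Debug, PartialEq)]
enum ShuffleStrategy {
    MapReduce,
    PreShuffleMerge,
}

/// "auto" mode: take the geometric mean of the input and output partition
/// counts and switch to pre-shuffle merge at the 200-partition threshold,
/// matching the legacy Ray runner behavior.
fn auto_shuffle_strategy(num_inputs: usize, num_outputs: usize) -> ShuffleStrategy {
    const PARTITION_THRESHOLD: f64 = 200.0;
    let geometric_mean = ((num_inputs as f64) * (num_outputs as f64)).sqrt();
    if geometric_mean >= PARTITION_THRESHOLD {
        ShuffleStrategy::PreShuffleMerge
    } else {
        ShuffleStrategy::MapReduce
    }
}
```

Under this rule, a 50x50 shuffle stays on map_reduce, while a 300x300 shuffle switches to pre_shuffle_merge.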
How it works: As input partitions materialize, they're bucketed by worker ID for data locality. When buckets reach the size threshold, tasks are emitted to merge the partitions. The merged outputs then flow to a standard RepartitionNode. For example, a 10x10 shuffle with 100MB input partitions would create 100 intermediate objects with map_reduce, but only 10 with pre_shuffle_merge (after coalescing 10 inputs into 1 merged partition).
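A simplified sketch of that bucketing step, with hypothetical types standing in for Daft's materialized outputs (the real accumulation logic is in `pre_shuffle_merge.rs` and will differ in detail):

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for a materialized partition reference.
struct PartitionRef {
    worker_id: String,
    size_bytes: usize,
}

/// Push a newly materialized partition into its worker's bucket; once the
/// bucket crosses the size threshold (default 1GB in this PR), drain it and
/// return its contents as the input of a merge task.
fn accumulate(
    buckets: &mut HashMap<String, Vec<PartitionRef>>,
    input: PartitionRef,
    threshold_bytes: usize,
) -> Option<Vec<PartitionRef>> {
    let bucket = buckets.entry(input.worker_id.clone()).or_default();
    bucket.push(input);
    let bucket_bytes: usize = bucket.iter().map(|p| p.size_bytes).sum();
    if bucket_bytes >= threshold_bytes {
        // Emit a merge task for this worker and start a fresh bucket.
        Some(std::mem::take(bucket))
    } else {
        None
    }
}
```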
The implementation includes comprehensive test coverage with scenarios for small partitions (below threshold), large partitions (above threshold), and randomly sized partitions to validate the merge behavior across different data characteristics.
Confidence score: 4/5
- This PR appears safe to merge with moderate risk due to the complexity of distributed shuffle operations
- Score reflects the substantial architectural changes in critical shuffle logic, though the implementation follows established patterns and includes good test coverage
- Pay close attention to `src/daft-distributed/src/pipeline_node/shuffles/pre_shuffle_merge.rs` and `translate_shuffle.rs` for the core merge logic and strategy selection
10 files reviewed, 1 comment
use daft_physical_plan::extract_agg_expr;

use crate::{
    pipeline_node::{
        concat::ConcatNode, distinct::DistinctNode, explode::ExplodeNode, filter::FilterNode,
-       gather::GatherNode, in_memory_source::InMemorySourceNode, limit::LimitNode,
+       in_memory_source::InMemorySourceNode, limit::LimitNode,
logic: The `gather::GatherNode` import was removed but `gen_gather_node` is still being called on lines 388 and 425
I moved GatherNode to the shuffles dir
Ran tpch sf 1000 (minus the sorts) for fun. (4 x i7i.4xlarge)
Changes Made
This PR adds a `pre_shuffle_merge` strategy for our shuffles.

For some context, the Ray runner supports three strategies: "map_reduce", "pre_shuffle_merge", and "flight_shuffle". The legacy Ray runner defaults to map_reduce for shuffles with fewer than 200 partitions and uses pre_shuffle_merge for shuffles with 200 or more.
In Flotilla, we currently don't support changing the strategy; all shuffles default to the map_reduce behavior. This PR adds the ability to opt into `pre_shuffle_merge`.

What does it do
In a nutshell, pre_shuffle_merge tries to group small partitions into large partitions before executing the repartition operation. In a normal map reduce, a shuffle introduces M*N intermediate objects, but with pre_shuffle_merge we can reduce M to a smaller value by coalescing partitions. The default threshold is 1GB. For instance, in a 10x10 shuffle with 10 x 100MB input partitions, map_reduce would produce 100 intermediate objects, while pre_shuffle_merge first coalesces the 10 inputs into a single ~1GB merged partition and produces only 10.
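A back-of-the-envelope check of that example (a hypothetical helper, not code from this PR):

```rust
/// Intermediate shuffle objects for an M-input, N-output shuffle.
fn intermediate_objects(map_partitions: usize, reduce_partitions: usize) -> usize {
    map_partitions * reduce_partitions
}

fn main() {
    // Plain map_reduce on a 10x10 shuffle: 10 * 10 = 100 intermediate objects.
    assert_eq!(intermediate_objects(10, 10), 100);
    // pre_shuffle_merge coalesces the ten 100MB inputs into one ~1GB merged
    // partition first, so the repartition produces only 1 * 10 = 10 objects.
    assert_eq!(intermediate_objects(1, 10), 10);
}
```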
How it works
As inputs are materialized, we group them into buckets keyed by worker ID, i.e. where those partitions are located. When a bucket hits the threshold, we emit a task with its partitions as input. The repartition node then receives these merged outputs and adds the repartition instruction to them. Grouping is done by worker ID for data locality reasons.
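A rough sketch of that handoff under the same assumptions (hypothetical types; the actual wiring between the merge node and the repartition node lives in `pre_shuffle_merge.rs` and `repartition.rs`):

```rust
/// Hypothetical merged bucket produced by the pre-shuffle merge step.
struct MergedBucket {
    worker_id: String,
    size_bytes: usize,
}

/// Hypothetical repartition task: one per merged bucket, splitting its rows
/// into `num_output_partitions` hash partitions.
struct RepartitionTask {
    input: MergedBucket,
    num_output_partitions: usize,
}

/// The repartition node wraps each merged bucket in a repartition task, so
/// the map side sees a handful of large merged inputs instead of many small
/// original partitions.
fn to_repartition_tasks(
    merged: Vec<MergedBucket>,
    num_output_partitions: usize,
) -> Vec<RepartitionTask> {
    merged
        .into_iter()
        .map(|input| RepartitionTask { input, num_output_partitions })
        .collect()
}
```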
Related Issues
Checklist