feat: Flotilla pre-shuffle merge #4873

Open · wants to merge 11 commits into main

Conversation

@colin-ho (Contributor) commented on Jul 30, 2025

Changes Made

This PR adds pre_shuffle_merge strategy for our shuffles.

For some context, the legacy Ray runner supports three shuffle strategies: "map_reduce", "pre_shuffle_merge", and "flight_shuffle". By default, it uses map_reduce for shuffles of fewer than 200 partitions and pre_shuffle_merge for shuffles of 200 or more.

In Flotilla, we currently don't support modifying the strategy; all shuffles default to the map_reduce behavior. This PR adds the ability to select pre_shuffle_merge.
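
As a rough illustration of the selection rule described above, here's a minimal sketch in Rust; the enum, function name, and signature are assumptions for illustration only, with just the 200-partition cutoff and the strategy names taken from the legacy Ray runner behavior.

```rust
/// Hypothetical sketch of the legacy-style strategy selection (not the actual Daft code):
/// small shuffles stay on map_reduce, larger ones switch to pre_shuffle_merge.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ShuffleStrategy {
    MapReduce,
    PreShuffleMerge,
}

/// Partition-count cutoff used by the legacy Ray runner's default behavior.
const PRE_SHUFFLE_MERGE_PARTITION_THRESHOLD: usize = 200;

fn choose_shuffle_strategy(num_partitions: usize) -> ShuffleStrategy {
    if num_partitions < PRE_SHUFFLE_MERGE_PARTITION_THRESHOLD {
        ShuffleStrategy::MapReduce
    } else {
        ShuffleStrategy::PreShuffleMerge
    }
}
```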

What does it do

In a nutshell, pre_shuffle_merge groups small partitions into larger ones before executing the repartition operation. In a normal map-reduce shuffle, M input partitions and N output partitions produce M*N intermediate objects. With pre_shuffle_merge, we reduce M to a smaller value by coalescing partitions; the default merge threshold is 1GB. For instance, for a 10x10 shuffle with 10 input partitions of 100MB each (see the sketch after this list):

  • Normal map_reduce produces 10*10 = 100 intermediate partitions.
  • pre_shuffle_merge coalesces the 10 input partitions into 1 merged partition of 1GB, then calls repartition, producing only 1 * 10 = 10 intermediate partitions.
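
To make the arithmetic concrete, here's a small sketch; the helper name and the greedy-coalescing assumption are illustrative, not the PR's actual code.

```rust
/// Rough count of intermediate objects for an M -> N shuffle, assuming inputs are
/// greedily coalesced into chunks of at most `threshold` bytes before repartitioning.
fn intermediate_objects(input_sizes_bytes: &[u64], num_outputs: u64, threshold: u64) -> u64 {
    let total: u64 = input_sizes_bytes.iter().sum();
    // Number of merged inputs after coalescing (at least one if there is any data at all).
    let merged_inputs = total.div_ceil(threshold).max(1);
    merged_inputs * num_outputs
}

fn main() {
    let inputs = vec![100 * 1024 * 1024u64; 10]; // 10 input partitions of 100MB each
    let one_gb = 1024 * 1024 * 1024;
    // Plain map_reduce: 10 inputs * 10 outputs = 100 intermediate objects.
    println!("map_reduce: {}", inputs.len() as u64 * 10);
    // pre_shuffle_merge with a 1GB threshold: 1 merged input * 10 outputs = 10.
    println!("pre_shuffle_merge: {}", intermediate_objects(&inputs, 10, one_gb));
}
```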

How it works

As inputs are materialized, we group them into buckets keyed by worker ID, i.e. where the partitions are located. When a bucket hits the size threshold, we emit a merge task with those partitions as input. The repartition node receives these tasks and appends the repartition instruction to them.

Grouping is done by worker ID for data locality.
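
Here's a minimal sketch of that bucketing loop, assuming a simple in-memory accumulator; the struct and method names are illustrative, not the actual PreShuffleMergeNode implementation.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for a materialized partition plus the worker that holds it.
struct MaterializedInput {
    worker_id: String,
    size_bytes: usize,
}

/// Illustrative accumulator: bucket inputs by worker ID and emit a merge task's worth of
/// partitions once a bucket crosses the size threshold (1GB by default in this PR).
struct MergeBuckets {
    threshold_bytes: usize,
    buckets: HashMap<String, Vec<MaterializedInput>>,
}

impl MergeBuckets {
    fn new(threshold_bytes: usize) -> Self {
        Self { threshold_bytes, buckets: HashMap::new() }
    }

    /// Add one materialized input; if its worker's bucket now reaches the threshold,
    /// drain the bucket and return it so the caller can emit a merge task.
    fn push(&mut self, input: MaterializedInput) -> Option<Vec<MaterializedInput>> {
        let bucket = self.buckets.entry(input.worker_id.clone()).or_default();
        bucket.push(input);
        let total: usize = bucket.iter().map(|i| i.size_bytes).sum();
        if total >= self.threshold_bytes {
            Some(std::mem::take(bucket))
        } else {
            None
        }
    }

    /// At end of input, flush whatever remains in every bucket as final merge tasks.
    fn finish(self) -> Vec<Vec<MaterializedInput>> {
        self.buckets.into_values().filter(|b| !b.is_empty()).collect()
    }
}
```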

Related Issues

Checklist

  • Documented in API Docs (if applicable)
  • Documented in User Guide (if applicable)
  • If adding a new documentation page, doc is added to docs/mkdocs.yml navigation
  • Documentation builds and is formatted properly (tag @/ccmao1130 for docs review)

github-actions bot added the feat label on Jul 30, 2025

codecov bot commented Jul 30, 2025

Codecov Report

❌ Patch coverage is 87.58170% with 38 lines in your changes missing coverage. Please review.
✅ Project coverage is 79.67%. Comparing base (ddbf50b) to head (3d88a73).

| Files with missing lines | Patch % | Lines |
|---|---|---|
| ...ed/src/pipeline_node/shuffles/pre_shuffle_merge.rs | 75.71% | 34 Missing ⚠️ |
| ...ed/src/pipeline_node/shuffles/translate_shuffle.rs | 95.29% | 4 Missing ⚠️ |
Additional details and impacted files


@@            Coverage Diff             @@
##             main    #4873      +/-   ##
==========================================
+ Coverage   78.53%   79.67%   +1.13%     
==========================================
  Files         913      915       +2     
  Lines      129042   126325    -2717     
==========================================
- Hits       101349   100655     -694     
+ Misses      27693    25670    -2023     
| Files with missing lines | Coverage Δ |
|---|---|
| ...rc/daft-distributed/src/pipeline_node/aggregate.rs | 96.93% <100.00%> (+0.18%) ⬆️ |
| ...stributed/src/pipeline_node/join/translate_join.rs | 96.06% <100.00%> (-0.19%) ⬇️ |
| src/daft-distributed/src/pipeline_node/mod.rs | 91.22% <ø> (+1.31%) ⬆️ |
| ...t-distributed/src/pipeline_node/shuffles/gather.rs | 91.00% <ø> (ø) |
| ...tributed/src/pipeline_node/shuffles/repartition.rs | 90.62% <100.00%> (ø) |
| ...rc/daft-distributed/src/pipeline_node/translate.rs | 96.12% <100.00%> (-0.45%) ⬇️ |
| ...ed/src/pipeline_node/shuffles/translate_shuffle.rs | 95.29% <95.29%> (ø) |
| ...ed/src/pipeline_node/shuffles/pre_shuffle_merge.rs | 75.71% <75.71%> (ø) |

... and 36 files with indirect coverage changes


colin-ho and others added 8 commits July 29, 2025 22:49
- Remove ShuffleExchangeNode indirection and create separate PreShuffleMergeNode and MapReduceNode pipeline nodes with unique node IDs
- Move shuffle strategy determination logic from ShuffleExchangeNode to translator's create_shuffle_nodes method
- Each pipeline node now has its own task context for better tracking and debugging
- Update hash_join.rs to use new create_shuffle_nodes API
- Add Clone derives to PipelineNodeConfig and PipelineNodeContext to support node cloning
- Move transpose_materialized_outputs method to MapReduceNode where it belongs
- Remove unused NaiveFullyMaterializingMapReduce struct
- All tests pass with the simplified architecture

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <[email protected]>
@colin-ho colin-ho requested review from ohbh and srilman August 12, 2025 01:56
@colin-ho colin-ho marked this pull request as ready for review August 12, 2025 01:57
@greptile-apps bot (Contributor) left a comment

Greptile Summary

This PR implements the pre_shuffle_merge strategy for Daft's Flotilla distributed execution engine, bringing parity with the legacy Ray runner's shuffle optimization capabilities. The change introduces a new shuffle algorithm that coalesces small partitions before repartitioning to reduce intermediate object count from M*N to a much smaller number.

Key architectural changes:

  • Module reorganization: Shuffle operations are now organized under a dedicated src/daft-distributed/src/pipeline_node/shuffles/ module containing gather.rs, pre_shuffle_merge.rs, repartition.rs, and translate_shuffle.rs

  • New PreShuffleMergeNode: The core implementation groups materialized outputs by worker ID for data locality, accumulates them until reaching a configurable threshold (default 1GB), then creates merge tasks. This reduces shuffle complexity from M*N intermediate partitions to approximately M/threshold_factor * N

  • Strategy selection: The system now supports four shuffle algorithms: "map_reduce", "pre_shuffle_merge", "flight_shuffle" (not yet implemented), and "auto". The auto mode uses geometric mean heuristics with a 200-partition threshold, matching the legacy Ray runner behavior

  • Integration across pipeline nodes: Hash joins, aggregations, distinct operations, and window functions all use the new gen_shuffle_node method which abstracts shuffle strategy selection

How it works: As input partitions materialize, they're bucketed by worker ID for data locality. When buckets reach the size threshold, tasks are emitted to merge the partitions. The merged outputs then flow to a standard RepartitionNode. For example, a 10x10 shuffle with 100MB input partitions would create 100 intermediate objects with map_reduce, but only 10 with pre_shuffle_merge (after coalescing 10 inputs into 1 merged partition).
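
For intuition, here is a hedged sketch of how that wiring could look in the translation step; the node variants and the gen_shuffle_node signature below are placeholders, with the real logic living in translate_shuffle.rs and pre_shuffle_merge.rs.

```rust
/// Hypothetical pipeline wiring: optionally insert a pre-shuffle merge stage in
/// front of the repartition, depending on the selected strategy.
enum ShuffleStrategy {
    MapReduce,
    PreShuffleMerge { threshold_bytes: usize },
}

enum PipelineNode {
    Child,
    PreShuffleMerge { input: Box<PipelineNode>, threshold_bytes: usize },
    Repartition { input: Box<PipelineNode>, num_partitions: usize },
}

fn gen_shuffle_node(child: PipelineNode, strategy: ShuffleStrategy, num_partitions: usize) -> PipelineNode {
    let input = match strategy {
        // Plain map_reduce: repartition the child directly (M*N intermediate objects).
        ShuffleStrategy::MapReduce => child,
        // pre_shuffle_merge: coalesce small partitions first, then repartition.
        ShuffleStrategy::PreShuffleMerge { threshold_bytes } => PipelineNode::PreShuffleMerge {
            input: Box::new(child),
            threshold_bytes,
        },
    };
    PipelineNode::Repartition { input: Box::new(input), num_partitions }
}
```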

The implementation includes comprehensive test coverage with scenarios for small partitions (below threshold), large partitions (above threshold), and randomly sized partitions to validate the merge behavior across different data characteristics.

Confidence score: 4/5

  • This PR appears safe to merge with moderate risk due to the complexity of distributed shuffle operations
  • Score reflects the substantial architectural changes in critical shuffle logic, though the implementation follows established patterns and includes good test coverage
  • Pay close attention to src/daft-distributed/src/pipeline_node/shuffles/pre_shuffle_merge.rs and translate_shuffle.rs for the core merge logic and strategy selection

10 files reviewed, 1 comment


use daft_physical_plan::extract_agg_expr;

use crate::{
    pipeline_node::{
        concat::ConcatNode, distinct::DistinctNode, explode::ExplodeNode, filter::FilterNode,
-       gather::GatherNode, in_memory_source::InMemorySourceNode, limit::LimitNode,
+       in_memory_source::InMemorySourceNode, limit::LimitNode,
@greptile-apps bot (Contributor) commented:

logic: The gather::GatherNode import was removed but gen_gather_node is still being called on lines 388 and 425

@colin-ho (Contributor, Author) replied:

I moved GatherNode to the shuffles dir

@colin-ho (Contributor, Author) commented on Aug 12, 2025

Ran TPC-H SF1000 (minus the sorts) for fun, on a 4 x i7i.4xlarge cluster.

| Question | Ray Runner (Main) | Flotilla (PR) | Flotilla (Main) |
|---|---|---|---|
| Question 1 | 34.92 | 25.79 | 42.65 |
| Question 2 | 33.39 | 42.05 | 508.79 |
| Question 3 | 41.84 | 72.18 | 274.75 |
| Question 4 | 27.60 | 45.12 | 116.64 |
| Question 5 | 114.84 | 148.15 | 323.26 |
| Question 6 | 11.39 | 8.51 | 8.94 |
| Question 7 | 53.20 | 82.66 | 329.53 |
| Question 8 | 223.79 | 130.71 | 382.23 |
| Question 9 | 272.63 | 253.70 | 573.43 |
| Question 10 | 47.42 | 91.53 | 346.14 |
| Question 11 | 31.07 | 28.66 | 191.70 |
| Question 12 | 33.35 | 50.65 | 130.09 |
| Question 13 | 32.74 | 38.56 | 112.24 |
| Question 14 | 17.49 | 22.75 | 81.76 |
| Question 15 | 31.16 | 30.74 | 54.57 |
| Question 16 | 19.23 | 23.85 | 220.99 |
| Question 17 | 143.96 | 185.79 | 289.71 |
| Question 18 | 92.18 | 128.36 | 349.87 |
| Question 19 | 32.50 | 32.88 | 95.93 |
| Question 20 | 55.52 | 47.07 | 271.88 |
| Question 21 | 170.89 | 263.08 | 534.81 |
| Question 22 | 11.48 | 18.53 | 86.78 |
