feat: Flotilla into partitions #4963

Merged
merged 17 commits into main from colin/flotilla-into-partitions
Aug 16, 2025

Contributor

@colin-ho colin-ho commented Aug 12, 2025

Changes Made

Implements into_partitions for Flotilla.

The operator first collects all input tasks, then either coalesces or splits them so that it emits exactly num_partitions partitions.
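
A rough sketch of that decision, for readers who want to see the arithmetic. This is not the code from the PR (the operator itself lives in Rust in `into_partitions.rs`); the function names `spread` and `plan_into_partitions` are hypothetical, and the policy of giving the remainder to the first groups is an assumption made for illustration.

```python
def spread(total: int, buckets: int) -> list[int]:
    """Divide `total` items across `buckets` as evenly as possible."""
    base, extra = divmod(total, buckets)
    # Assumption: the first `extra` buckets each take one additional item.
    return [base + 1 if i < extra else base for i in range(buckets)]


def plan_into_partitions(num_inputs: int, num_partitions: int) -> tuple[str, list[int]]:
    """Pick a strategy so that exactly `num_partitions` outputs are emitted.

    Returns the strategy name plus a list of counts:
    - "coalesce": how many input tasks are merged into each output partition
    - "split": how many output partitions each input task is split into
    - "pass_through": one output per input
    """
    if num_inputs < 1 or num_partitions < 1:
        raise ValueError("both counts must be at least 1 in this sketch")
    if num_inputs == num_partitions:
        return "pass_through", [1] * num_inputs
    if num_inputs > num_partitions:
        return "coalesce", spread(num_inputs, num_partitions)
    return "split", spread(num_partitions, num_inputs)


print(plan_into_partitions(10, 3))  # coalesce 10 inputs into 3 outputs
print(plan_into_partitions(3, 10))  # split 3 inputs into 10 outputs
```

In both non-trivial cases the counts sum to the larger of the two numbers, which is what guarantees that the number of emitted partitions is exactly num_partitions.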

Related Issues

Checklist

  • Documented in API Docs (if applicable)
  • Documented in User Guide (if applicable)
  • If adding a new documentation page, doc is added to docs/mkdocs.yml navigation
  • Documentation builds and is formatted properly (tag @/ccmao1130 for docs review)

@github-actions github-actions bot added the feat label Aug 12, 2025

codecov bot commented Aug 13, 2025

Codecov Report

❌ Patch coverage is 31.25000% with 253 lines in your changes missing coverage. Please review.
✅ Project coverage is 77.00%. Comparing base (072858e) to head (9cb1682).
⚠️ Report is 5 commits behind head on main.

| Files with missing lines | Patch % | Lines |
|---|---|---|
| ...t-distributed/src/pipeline_node/into_partitions.rs | 0.00% | 210 Missing ⚠️ |
| src/daft-local-plan/src/plan.rs | 0.00% | 18 Missing ⚠️ |
| ...rc/daft-distributed/src/pipeline_node/translate.rs | 52.17% | 11 Missing ⚠️ |
| .../daft-local-execution/src/sinks/into_partitions.rs | 90.69% | 8 Missing ⚠️ |
| daft/runners/flotilla.py | 57.14% | 6 Missing ⚠️ |
Additional details and impacted files

@@            Coverage Diff             @@
##             main    #4963      +/-   ##
==========================================
+ Coverage   76.29%   77.00%   +0.70%     
==========================================
  Files         918      924       +6     
  Lines      128703   127683    -1020     
==========================================
+ Hits        98195    98320     +125     
+ Misses      30508    29363    -1145     
| Files with missing lines | Coverage Δ |
|---|---|
| src/daft-distributed/src/pipeline_node/mod.rs | 27.63% <ø> (ø) |
| src/daft-distributed/src/stage/stage_builder.rs | 95.45% <100.00%> (-0.06%) ⬇️ |
| src/daft-local-execution/src/pipeline.rs | 88.13% <100.00%> (+4.63%) ⬆️ |
| src/daft-writers/src/partition.rs | 82.11% <100.00%> (+0.44%) ⬆️ |
| daft/runners/flotilla.py | 54.81% <57.14%> (+10.18%) ⬆️ |
| .../daft-local-execution/src/sinks/into_partitions.rs | 90.69% <90.69%> (ø) |
| ...rc/daft-distributed/src/pipeline_node/translate.rs | 25.11% <52.17%> (-1.15%) ⬇️ |
| src/daft-local-plan/src/plan.rs | 69.11% <0.00%> (-1.68%) ⬇️ |
| ...t-distributed/src/pipeline_node/into_partitions.rs | 0.00% <0.00%> (ø) |

... and 78 files with indirect coverage changes

@colin-ho colin-ho marked this pull request as ready for review August 14, 2025 18:42
Contributor

@greptile-apps greptile-apps bot left a comment

Greptile Summary

This PR implements the into_partitions operation for Daft's Flotilla distributed execution engine. The feature allows users to redistribute data into a specific number of partitions, providing precise control over partitioning regardless of the input data distribution.

The implementation consists of several key components:

  1. Stage Builder Support: Modified stage_builder.rs to allow IntoPartitions operations to proceed through the distributed stage planning process, removing the previous restriction that forced these operations to fail.

  2. Distributed Pipeline Node: Added a new IntoPartitionsNode in the distributed pipeline system that handles three scenarios:

    • Pass-through: When input partition count matches target
    • Coalescing: When there are too many input partitions, combines multiple tasks into fewer partitions with proper remainder distribution
    • Splitting: When there are too few input partitions, divides tasks using the local execution engine
  3. Local Execution Support: Implemented IntoPartitionsSink as a blocking sink that collects all input data, concatenates it, and redistributes it evenly across the target number of partitions using row-based slicing.

  4. Physical Plan Integration: Added IntoPartitions as a new physical plan node with proper schema handling, statistics state management, and integration with the existing plan infrastructure.

  5. Comprehensive Testing: Added Ray-specific test coverage for splitting, coalescing, chaining operations, and edge cases with empty partitions.
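
The row-based slicing described in item 3 can be illustrated with plain Python lists standing in for record batches. This is only a sketch under that assumption: `slice_evenly` and `repartition` are hypothetical names, and the real IntoPartitionsSink operates on Daft's own in-memory batches in Rust.

```python
def slice_evenly(rows: list, num_partitions: int) -> list[list]:
    """Cut `rows` into `num_partitions` contiguous slices of near-equal length."""
    base, extra = divmod(len(rows), num_partitions)
    partitions = []
    start = 0
    for i in range(num_partitions):
        # Assumption: the first `extra` slices carry one additional row.
        size = base + (1 if i < extra else 0)
        partitions.append(rows[start:start + size])
        start += size
    return partitions


def repartition(batches: list[list], num_partitions: int) -> list[list]:
    """Blocking-sink style: gather every input batch, concatenate, then slice."""
    all_rows = [row for batch in batches for row in batch]
    return slice_evenly(all_rows, num_partitions)


# Three uneven input batches redistributed into three balanced partitions.
print(repartition([[0, 1], [2], [3, 4, 5, 6, 7, 8, 9]], 3))
# Fewer rows than partitions yields trailing empty partitions.
print(slice_evenly([0, 1], 4))
```

Because the slices are contiguous, concatenating the outputs reproduces the input order. The sketch also makes the memory concern visible: every row is held in `all_rows` before any output is produced.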

The implementation follows established patterns in the codebase, using the blocking sink pattern for local execution and the distributed pipeline node pattern for flotilla execution. The operation is designed to work specifically with the Ray runner, as evidenced by the test suite being conditionally skipped for non-Ray environments.

Confidence score: 4/5

  • This PR appears safe to merge with thorough implementation across multiple components
  • Score reflects well-structured code following existing patterns but potential memory concerns with large datasets
  • Pay close attention to the blocking sink implementation which concatenates all data in memory

13 files reviewed, no comments

@colin-ho colin-ho requested review from ohbh and srilman August 14, 2025 18:45
Contributor

@ohbh ohbh left a comment

overall looks good to me! implementation is easy to follow.

added some minor comments about the tests

Comment on lines +43 to +46
def test_into_partitions_split_and_coalesce(make_df) -> None:
    data = {"foo": list(range(100))}
    df = make_df(data).into_partitions(20).into_partitions(1).collect()
    assert df.to_pydict() == data
Contributor

does it make sense to implement a drop_into_partitions optimization rule?

Contributor Author

this is already implemented by the drop_repartition rule.

I know it's weird, but into_partitions uses the LogicalPlan::Repartition plan, and so drop_repartition works for it
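
For context, a toy model of what such a rule does to the chained test case above. This is not Daft's optimizer (which is written in Rust); the `Source` and `Repartition` classes and the function `drop_redundant_repartitions` are hypothetical, and the sketch assumes that a repartition sitting directly on top of another repartition makes the lower one redundant.

```python
from dataclasses import dataclass
from typing import Union


@dataclass
class Source:
    name: str


@dataclass
class Repartition:
    child: "Plan"
    num_partitions: int


Plan = Union[Source, Repartition]


def drop_redundant_repartitions(plan: Plan) -> Plan:
    """Remove any Repartition whose output is immediately repartitioned again."""
    if isinstance(plan, Repartition):
        child = drop_redundant_repartitions(plan.child)
        # Assumption: only the outermost repartition determines the final
        # layout, so repartitions directly beneath it can be skipped.
        while isinstance(child, Repartition):
            child = child.child
        return Repartition(child, plan.num_partitions)
    return plan


# Models make_df(data).into_partitions(20).into_partitions(1)
plan = Repartition(Repartition(Source("df"), 20), 1)
print(drop_redundant_repartitions(plan))
```

Under this model the chained call collapses to a single repartition into 1 partition, so the intermediate 20-way split would never be executed.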

@colin-ho colin-ho enabled auto-merge (squash) August 16, 2025 02:26
@colin-ho colin-ho merged commit 10369cb into main Aug 16, 2025
98 of 100 checks passed
@colin-ho colin-ho deleted the colin/flotilla-into-partitions branch August 16, 2025 02:40