feat: Propagate morsel size top-down in swordfish #4894

Open · wants to merge 19 commits into main

Conversation

@colin-ho (Contributor) commented Aug 1, 2025

Changes Made

Propagate morsel sizes down the pipeline nodes.

This PR introduces MorselSizeRequirement, which can be either strict (the morsel must contain exactly num rows) or flexible (the morsel can contain up to num rows). This lets us optimize cases where batch sizes are known, e.g. UDFs, and improve backpressure down the pipeline based on that batch size.

MorselSizeRequirements are propagated top-down according to a few rules (a code sketch follows the list):

  1. A strict morsel size requirement overrides the downstream requirement.
  2. An operator with a flexible morsel size requirement takes as its upper bound the minimum of its own bound and the downstream bound.
  3. Blocking sinks always ignore the downstream requirement and propagate their own requirement.
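
To make the rules concrete, here is a minimal sketch of the requirement type and its combination logic. The enum name matches the PR, but the method name and the handling of a flexible operator beneath a strict downstream requirement are assumptions, not necessarily the merged implementation.

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum MorselSizeRequirement {
    // Morsels must contain exactly this many rows.
    Strict(usize),
    // Morsels may contain up to this many rows.
    Flexible(usize),
}

impl MorselSizeRequirement {
    // Combine this operator's requirement with the requirement propagated from downstream.
    pub fn combine(self, downstream: MorselSizeRequirement) -> MorselSizeRequirement {
        use MorselSizeRequirement::{Flexible, Strict};
        match (self, downstream) {
            // Rule 1: a strict requirement overrides whatever is downstream.
            (Strict(rows), _) => Strict(rows),
            // Rule 2: a flexible upper bound is the min of this operator's bound and the downstream bound.
            (Flexible(rows), Flexible(down)) => Flexible(rows.min(down)),
            // Assumption: a flexible operator beneath a strict downstream requirement
            // also caps its upper bound at the downstream size.
            (Flexible(rows), Strict(down)) => Flexible(rows.min(down)),
        }
    }
}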

Related Issues

Checklist

  • Documented in API Docs (if applicable)
  • Documented in User Guide (if applicable)
  • If adding a new documentation page, doc is added to docs/mkdocs.yml navigation
  • Documentation builds and is formatted properly (tag @/ccmao1130 for docs review)

@github-actions bot added the feat label Aug 1, 2025

codspeed-hq bot commented Aug 1, 2025

CodSpeed Performance Report

Merging #4894 will improve performance by 19.04%

Comparing colin/propagate-morsel-size (caf5052) with main (072858e)

Summary

⚡ 2 improvements
✅ 22 untouched benchmarks

Benchmarks breakdown

Benchmark BASE HEAD Change
test_tpch[1-in-memory-1] 480.8 ms 403.9 ms +19.04%
test_tpch_sql[1-in-memory-1] 479.6 ms 409.8 ms +17.03%

codecov bot commented Aug 7, 2025

Codecov Report

❌ Patch coverage is 95.97990% with 8 lines in your changes missing coverage. Please review.
✅ Project coverage is 77.03%. Comparing base (4f14d6a) to head (caf5052).
⚠️ Report is 3 commits behind head on main.

Files with missing lines Patch % Lines
...intermediate_ops/distributed_actor_pool_project.rs 0.00% 2 Missing ⚠️
src/daft-local-execution/src/run.rs 66.66% 2 Missing ⚠️
...rc/daft-local-execution/src/sinks/blocking_sink.rs 92.00% 2 Missing ⚠️
src/daft-local-execution/src/dispatcher.rs 91.66% 1 Missing ⚠️
...rc/daft-local-execution/src/streaming_sink/base.rs 96.77% 1 Missing ⚠️
Additional details and impacted files

@@            Coverage Diff             @@
##             main    #4894      +/-   ##
==========================================
- Coverage   79.20%   77.03%   -2.17%     
==========================================
  Files         917      918       +1     
  Lines      127924   126965     -959     
==========================================
- Hits       101316    97806    -3510     
- Misses      26608    29159    +2551     
Files with missing lines Coverage Δ
...-execution/src/intermediate_ops/intermediate_op.rs 91.54% <100.00%> (+0.93%) ⬆️
...cal-execution/src/intermediate_ops/into_batches.rs 97.50% <100.00%> (+7.50%) ⬆️
...ft-local-execution/src/intermediate_ops/project.rs 100.00% <100.00%> (ø)
...c/daft-local-execution/src/intermediate_ops/udf.rs 87.21% <100.00%> (+6.76%) ⬆️
src/daft-local-execution/src/lib.rs 98.66% <ø> (-0.05%) ⬇️
src/daft-local-execution/src/pipeline.rs 84.18% <100.00%> (+0.68%) ⬆️
src/daft-local-execution/src/runtime_stats/mod.rs 92.77% <ø> (ø)
...aft-local-execution/src/sinks/grouped_aggregate.rs 99.36% <100.00%> (+6.30%) ⬆️
.../daft-local-execution/src/sinks/hash_join_build.rs 93.57% <100.00%> (+9.74%) ⬆️
src/daft-local-execution/src/sinks/write.rs 92.48% <100.00%> (ø)
... and 14 more

... and 94 files with indirect coverage changes

@colin-ho marked this pull request as ready for review August 8, 2025 16:29

@greptile-apps bot left a comment

Greptile Summary

This PR implements a top-down morsel size propagation system in Daft's "swordfish" execution engine. The change allows operators with specific data chunk size requirements (like UDFs with @daft.udf(batch_size=N)) to communicate these requirements upstream through the execution pipeline, enabling more efficient data processing.

Core Changes:

  • New Morsel Size System: Introduces MorselSizeRequirement enum with Strict and Flexible variants, plus combination logic that respects strict requirements while minimizing flexible ones
  • Top-Down Propagation: Adds propagate_morsel_size_requirement() methods to all pipeline node types, allowing requirements to flow from downstream operators to upstream sources
  • Trait Refactoring: Replaces runtime-dependent morsel_size_range() methods with compile-time morsel_size_requirement() methods across all operators
  • Pipeline Integration: Updates the pipeline translation function to automatically propagate morsel size requirements after building the execution tree
  • Source Layer Updates: Extends the Source trait with a chunk_size parameter so data sources can respect downstream chunking requirements
  • Dispatcher Updates: Refactors dispatchers to use MorselSizeRequirement instead of separate bound parameters

How It Works:
When a UDF specifies batch_size=1000, this creates a Strict(1000) requirement that propagates up through intermediate operators (which may add their own flexible requirements) until it reaches scan operations, ensuring data is read in appropriately sized chunks from the source.
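
As a rough illustration of this top-down pass (reusing the MorselSizeRequirement sketch above; the trait and method names here are hypothetical, not the actual daft-local-execution APIs):

trait PipelineNode {
    // The requirement this operator itself imposes, if any (e.g. a UDF batch size).
    fn own_requirement(&self) -> Option<MorselSizeRequirement>;
    // Whether this node is a blocking sink that ignores downstream requirements (rule 3).
    fn is_blocking_sink(&self) -> bool;
    // Upstream children of this node in the execution tree.
    fn children_mut(&mut self) -> Vec<&mut dyn PipelineNode>;
    // Record the requirement this node's input morsels should satisfy.
    fn set_effective_requirement(&mut self, req: MorselSizeRequirement);
}

fn propagate(node: &mut dyn PipelineNode, downstream: MorselSizeRequirement) {
    let effective = if node.is_blocking_sink() {
        // Blocking sinks ignore the downstream requirement; if they have none of their
        // own, fall back to a default upper bound (131072 is the default seen in the
        // explain output quoted later in this thread).
        node.own_requirement()
            .unwrap_or(MorselSizeRequirement::Flexible(131072))
    } else {
        match node.own_requirement() {
            Some(own) => own.combine(downstream),
            None => downstream,
        }
    };
    node.set_effective_requirement(effective);
    // Recurse so that sources eventually learn what chunk size to read with.
    for child in node.children_mut() {
        propagate(child, effective);
    }
}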

Integration with Codebase:
The change fits naturally into Daft's existing pipeline architecture by extending the trait system with morsel size awareness. Operations that fundamentally change data chunking (like aggregations and joins) act as natural barriers to propagation, while pass-through operations properly forward requirements. The system maintains backward compatibility by making morsel size requirements optional throughout.

Confidence score: 4/5

  • This PR contains significant architectural changes that affect the entire execution pipeline, requiring careful validation of the propagation logic
  • Score reflects the comprehensive nature of the changes and potential for subtle issues in the requirement combination logic or missing implementations
  • Pay close attention to the test file and any pipeline nodes that may have been missed in the refactoring

24 files reviewed, 4 comments

Comment on lines +173 to +176
let chunk_size = match self.morsel_size_requirement {
MorselSizeRequirement::Strict(size) => Some(size),
MorselSizeRequirement::Flexible(size) => Some(size),
};

Contributor

style: Both Strict and Flexible morsel size requirements are converted to Some(size). Consider if Flexible(0) should map to None instead of Some(0) to indicate no preference

Contributor

No, just do the other trick I mentioned. BTW, can you explain this part?

Contributor Author

Are you asking me or greptile

..
}) => {
let parquet_chunk_size = chunk_size_from_config.or(chunk_size);

Contributor

logic: Logic reversed - should be chunk_size.or(chunk_size_from_config) to prioritize propagated chunk_size over config

Suggested change
let parquet_chunk_size = chunk_size_from_config.or(chunk_size);
let parquet_chunk_size = chunk_size.or(chunk_size_from_config);

Contributor Author

If the user provided a chunk size, we should follow it.

Contributor

Just curious, how is this chunk size parameter provided? Is it something that is really hard and the user knows exactly what they are doing? If so, I agree, but if it's kind of common then maybe we should think about it?

Contributor Author

In read_parquet / read_csv / read_json there's a _chunk_size: Optional[int] = None which gets propagated to here.

It's currently 'hidden'; I don't know whether it's a common thing for users to set, since we don't 'expose' it.
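
For reference, the precedence being debated boils down to Option::or ordering; a small sketch with hypothetical names (the real code lives in the parquet scan path):

// A user-supplied chunk size (e.g. the hidden _chunk_size argument on the read APIs)
// takes priority over the chunk size propagated down the pipeline.
fn resolve_parquet_chunk_size(
    chunk_size_from_config: Option<usize>,
    propagated_chunk_size: Option<usize>,
) -> Option<usize> {
    // Option::or keeps the first Some it sees, so the config value wins when present.
    chunk_size_from_config.or(propagated_chunk_size)
}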

colin-ho and others added 2 commits August 8, 2025 09:37
Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com>
@colin-ho requested a review from srilman August 8, 2025 16:44

@srilman (Contributor) left a comment

Overall LGTM, just a couple of clarifying points / questions

| Schema = a#Int64
| Size bytes = 40
| Stats = { Approx num rows = 5, Approx size bytes = 40 B, Accumulated selectivity = 1.00 }
| Morsel Size = Flexible(131072)

Contributor

Rather than printing Flexible and Strict, which requires some context, can we do the range syntax, like Strict(X) would be Morsel Size == X and Flexible(X) would be 0 <= Morsel Size <= X?

Contributor Author

How about Batch Size = X for strict, and Batch Size = Range(0, x] for flexible?

Also, thoughts on using 'batch' instead of 'morsel'? Thinking we should be more consistent with our terminology (both user-facing and internal).
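
A sketch of what that display could look like, reusing the MorselSizeRequirement enum sketched earlier (a formatting suggestion only, not the merged implementation):

use std::fmt;

impl fmt::Display for MorselSizeRequirement {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            // Strict: exactly this many rows per morsel.
            MorselSizeRequirement::Strict(rows) => write!(f, "Batch Size = {rows}"),
            // Flexible: anywhere up to this many rows per morsel.
            MorselSizeRequirement::Flexible(rows) => write!(f, "Batch Size = Range(0, {rows}]"),
        }
    }
}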

fn make_state(&self) -> DaftResult<Self::State> {
Ok(())
}
fn morsel_size_requirement(&self) -> Option<MorselSizeRequirement> {
Some(MorselSizeRequirement::Strict(self.batch_size))

Contributor

Correct me if I'm wrong, but I think our final thought was that this should be flexible, not strict?

Contributor Author

Yeap das right

Contributor Author

Wait, what do you think about flexible being range(batch_size * 0.8, batch_size)?

Really the only potentially weird thing about setting this as flexible (0 to batch size) is a case like this:

import daft

df = daft.range(10, partitions=10)
df = df.into_batches(10)
for batch in df.to_arrow_iter():
    print(batch)

Intuitively I would think that if I do into_batches(10) on an input with 10 rows (regardless of how many partitions), I should get back a single batch of 10 rows.

Or am I overthinking this?
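
If that direction were taken, one hypothetical shape would be to give the flexible variant an explicit row-count range instead of just an upper bound (names here are illustrative, not part of this PR):

// Hypothetical alternative: Flexible carries a (lower, upper) row-count range.
pub enum MorselSizeRequirementAlt {
    Strict(usize),
    Flexible { lower: usize, upper: usize },
}

fn flexible_for_batch_size(batch_size: usize) -> MorselSizeRequirementAlt {
    // e.g. accept morsels between 80% of the batch size and the batch size itself,
    // so small inputs are coalesced into (near-)full batches rather than split up.
    MorselSizeRequirementAlt::Flexible {
        lower: (batch_size as f64 * 0.8).floor() as usize,
        upper: batch_size,
    }
}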
