feat: Propagate morsel size top-down in swordfish #4894
CodSpeed Performance Report
Merging #4894 will improve performance by 19.04%.
Codecov Report
❌ Patch coverage is
Additional details and impacted files
    @@            Coverage Diff             @@
    ##             main    #4894      +/-   ##
    ==========================================
    - Coverage   79.20%   77.03%   -2.17%
    ==========================================
      Files         917      918       +1
      Lines      127924   126965     -959
    ==========================================
    - Hits       101316    97806    -3510
    - Misses      26608    29159    +2551
Greptile Summary
This PR implements a top-down morsel size propagation system in Daft's "swordfish" execution engine. The change allows operators with specific data chunk size requirements (like UDFs with @daft.udf(batch_size=N)) to communicate these requirements upstream through the execution pipeline, enabling more efficient data processing.
Core Changes:
- New Morsel Size System: Introduces the MorselSizeRequirement enum with Strict and Flexible variants, plus combination logic that respects strict requirements while minimizing flexible ones
- Top-Down Propagation: Adds propagate_morsel_size_requirement() methods to all pipeline node types, allowing requirements to flow from downstream operators to upstream sources
- Trait Refactoring: Replaces runtime-dependent morsel_size_range() methods with compile-time morsel_size_requirement() methods across all operators
- Pipeline Integration: Updates the pipeline translation function to automatically propagate morsel size requirements after building the execution tree
- Source Layer Updates: Extends the Source trait with a chunk_size parameter so data sources can respect downstream chunking requirements
- Dispatcher Updates: Refactors dispatchers to use MorselSizeRequirement instead of separate bound parameters
How It Works:
When a UDF specifies batch_size=1000, this creates a Strict(1000) requirement that propagates up through intermediate operators (which may add their own flexible requirements) until it reaches scan operations, ensuring data is read in appropriately sized chunks from the source.
Integration with Codebase:
The change fits naturally into Daft's existing pipeline architecture by extending the trait system with morsel size awareness. Operations that fundamentally change data chunking (like aggregations and joins) act as natural barriers to propagation, while pass-through operations properly forward requirements. The system maintains backward compatibility by making morsel size requirements optional throughout.
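The combination and propagation behavior summarized above can be sketched roughly as follows. This is a minimal illustration and not the PR's actual code: the `Node` enum, the `combine` and `propagate` functions, and the tie-breaking choices (an operator's own strict requirement wins over a downstream strict one, and a barrier forwards no requirement at all) are assumptions made for the example.

```rust
use std::cmp::min;

/// Simplified stand-in for the PR's `MorselSizeRequirement`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MorselSizeRequirement {
    /// Morsels must contain exactly this many rows.
    Strict(usize),
    /// Morsels may contain up to this many rows.
    Flexible(usize),
}

/// Hypothetical operator kinds, used only for this sketch.
#[derive(Debug, Clone, Copy)]
pub enum Node {
    /// Forwards requirements upstream, optionally adding its own.
    PassThrough(Option<MorselSizeRequirement>),
    /// Re-chunks data (e.g. an aggregation or join), so propagation stops here.
    Barrier,
}

/// Combine an operator's own requirement with the one arriving from downstream.
/// ASSUMPTION: strict beats flexible, two flexible bounds take the minimum,
/// and the operator's own strict size wins over a downstream strict size.
pub fn combine(
    own: Option<MorselSizeRequirement>,
    downstream: Option<MorselSizeRequirement>,
) -> Option<MorselSizeRequirement> {
    use MorselSizeRequirement::{Flexible, Strict};
    match (own, downstream) {
        (None, other) | (other, None) => other,
        (Some(Strict(n)), Some(_)) => Some(Strict(n)),
        (Some(Flexible(_)), Some(Strict(n))) => Some(Strict(n)),
        (Some(Flexible(a)), Some(Flexible(b))) => Some(Flexible(min(a, b))),
    }
}

/// Walk a linear pipeline from the most-downstream node to the source and
/// return the requirement each node ends up with, in the same order.
pub fn propagate(nodes_downstream_first: &[Node]) -> Vec<Option<MorselSizeRequirement>> {
    let mut incoming = None;
    let mut effective = Vec::with_capacity(nodes_downstream_first.len());
    for node in nodes_downstream_first {
        let req = match node {
            Node::PassThrough(own) => combine(*own, incoming),
            // ASSUMPTION: a barrier neither takes nor forwards a requirement.
            Node::Barrier => None,
        };
        effective.push(req);
        incoming = req;
    }
    effective
}
```

Under these assumptions, a UDF with `Strict(1000)` followed upstream by a projection with `Flexible(131072)` and a scan with no preference leaves all three nodes at `Strict(1000)`, while placing a barrier between the UDF and the projection leaves the projection at its own `Flexible(131072)`.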
Confidence score: 4/5
- This PR contains significant architectural changes that affect the entire execution pipeline, requiring careful validation of the propagation logic
- Score reflects the comprehensive nature of the changes and potential for subtle issues in the requirement combination logic or missing implementations
- Pay close attention to the test file and any pipeline nodes that may have been missed in the refactoring
24 files reviewed, 4 comments
    let chunk_size = match self.morsel_size_requirement {
        MorselSizeRequirement::Strict(size) => Some(size),
        MorselSizeRequirement::Flexible(size) => Some(size),
    };
style: Both Strict and Flexible morsel size requirements are converted to Some(size). Consider if Flexible(0) should map to None instead of Some(0) to indicate no preference
No, just do the other trick I mentioned. BTW, can you explain this part?
Are you asking me or Greptile?
        ..
    }) => {
        let parquet_chunk_size = chunk_size_from_config.or(chunk_size);
logic: Logic reversed - should be chunk_size.or(chunk_size_from_config) to prioritize propagated chunk_size over config
Suggested change:
    - let parquet_chunk_size = chunk_size_from_config.or(chunk_size);
    + let parquet_chunk_size = chunk_size.or(chunk_size_from_config);
If user provided a chunk size we should follow it
Just curious, how is this chunk size parameter provided? Is it something that is really hard and the user knows exactly what they are doing? If so, I agree, but if it's kind of common then maybe we should think about it.
In read parquet / csv / json there's a _chunk_size: Optional[int] = None which gets propagated to here.
It's currently 'hidden', idk if it's a common thing that users do though since we don't 'expose' it.
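The precedence being discussed in this thread comes down to how `Option::or` picks between two optional values. A small sketch, assuming a hypothetical helper named `resolve_chunk_size` (not a function from the PR), with the user-supplied `_chunk_size` taking priority as the author argues:

```rust
/// Resolve the chunk size handed to a reader.
/// `from_config` stands for the user-supplied `_chunk_size`;
/// `propagated` stands for the size derived from the downstream requirement.
/// Both parameter names are illustrative.
pub fn resolve_chunk_size(
    from_config: Option<usize>,
    propagated: Option<usize>,
) -> Option<usize> {
    // `Option::or` returns `self` when it is `Some`, otherwise the argument,
    // so the receiver is the value that takes priority.
    from_config.or(propagated)
}
```

Swapping the receiver and the argument, as the bot suggested, would make the propagated size win whenever both are set; the two orderings only differ in that one case.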
Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com>
Overall LGTM, just a couple of clarifying points / questions
tests/dataframe/test_morsels.py
Outdated
    | Schema = a#Int64
    | Size bytes = 40
    | Stats = { Approx num rows = 5, Approx size bytes = 40 B, Accumulated selectivity = 1.00 }
    | Morsel Size = Flexible(131072)
Rather than printing Flexible and Strict, which requires some context, can we do the range syntax? For example, Strict(X) would be Morsel Size == X and Flexible(X) would be 0 <= Morsel Size <= X.
How about Batch Size = X for strict, and Batch Size = Range(0, x] for flexible?
also, thoughts on using 'batch' instead of 'morsel'? Thinking we should be more consistent with our terminology (both user facing and internal)
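The display format proposed in this reply could look roughly like the `Display` implementation below. It is only a sketch of the suggestion as worded in the thread; the enum is a simplified stand-in, and the format the PR finally adopted is not shown in this conversation.

```rust
use std::fmt;

/// Simplified stand-in for the PR's `MorselSizeRequirement`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MorselSizeRequirement {
    Strict(usize),
    Flexible(usize),
}

impl fmt::Display for MorselSizeRequirement {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            // Strict: exactly `n` rows per batch.
            Self::Strict(n) => write!(f, "Batch Size = {n}"),
            // Flexible: anywhere in the half-open range (0, n].
            Self::Flexible(n) => write!(f, "Batch Size = Range(0, {n}]"),
        }
    }
}
```

With this in place, the explain line from the test output above would read `Batch Size = Range(0, 131072]` instead of `Morsel Size = Flexible(131072)`.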
    fn make_state(&self) -> DaftResult<Self::State> {
        Ok(())
    }
    fn morsel_size_requirement(&self) -> Option<MorselSizeRequirement> {
        Some(MorselSizeRequirement::Strict(self.batch_size))
Correct me if I'm wrong, but I think our final thought was that this should be flexible, not strict?
Yeap das right
Wait, what do you think about flexible being range(batch_size * 0.8, batch_size)?
Really the only potentially weird thing about setting it as flexible (0 to batch size) is something like this:
    import daft

    df = daft.range(10, partitions=10)
    df = df.into_batches(10)
    for batch in df.to_arrow_iter():
        print(batch)
Intuitively I would think that if I do into_batches(10) on input with 10 rows (regardless of how many partitions), I should get back a single batch of 10 rows.
Or am I overthinking this?
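To make the concern concrete, here is a toy model of how a buffer might coalesce incoming chunks under a lower and upper bound. The `coalesce` function and its eager "emit as soon as the lower bound is met" policy are assumptions for illustration only and are not taken from swordfish; they just show how the three options in this thread would differ on ten one-row partitions.

```rust
/// Coalesce incoming chunk sizes (row counts) into output batch sizes that
/// fall within [lower, upper], flushing any remainder at end of input.
/// ASSUMPTION: a batch is emitted as soon as the buffer reaches `lower`.
pub fn coalesce(chunks: &[usize], lower: usize, upper: usize) -> Vec<usize> {
    assert!(upper > 0 && lower <= upper);
    let mut out = Vec::new();
    let mut buffered = 0usize;
    for &rows in chunks {
        buffered += rows;
        // Emit full batches while the buffer holds at least `upper` rows.
        while buffered >= upper {
            out.push(upper);
            buffered -= upper;
        }
        // Emit a smaller batch once the lower bound is met.
        if buffered > 0 && buffered >= lower {
            out.push(buffered);
            buffered = 0;
        }
    }
    // Flush whatever is left as a final partial batch.
    if buffered > 0 {
        out.push(buffered);
    }
    out
}
```

In this model, bounds of (1, 10) pass each one-row chunk straight through, bounds of (8, 10) produce a batch of 8 followed by a batch of 2, and only bounds of (10, 10), i.e. a strict requirement, give the single batch of 10 rows that the comment expects.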
Changes Made
Propagate morsel sizes down the pipeline nodes.
This PR introduces MorselSizeRequirement, which can be either strict (a morsel must contain exactly the given number of rows) or flexible (a morsel may contain up to the given number of rows). This optimizes for cases where batch sizes are known, e.g. UDFs, and can improve backpressure down the pipeline based on this batch size.
MorselSizeRequirements are propagated top-down with a couple of rules.
Related Issues
Checklist