perf: URL Operators #4871
Conversation
@@ -160,6 +160,12 @@ impl ToFromProto for ir::rel::LogicalPlan {
            let udf_project = udf_project.to_proto()?.into();
            proto::RelVariant::UdfProject(udf_project)
        }
        Self::UrlDownload(url_download) => {
Couldn't figure out how to serialize the IOConfig, so leaving as a followup. Other opts use the same macro
I'm particularly curious about what prompted the performance optimization for URL handling here—could you elaborate on the core motivation behind this design? @srilman
@Jay-ju this is actually an old PR that I wanted to get through to unblock some newer features. Right now, URL downloads and uploads are done somewhat in a blocking way. The idea behind this PR is to get rid of those blocking operations by having dedicated operators that execute in a fully async manner. In addition (what I'm working on right now), we can collect more & better statistics by having a dedicated operator, such as # of failed requests, waiting time, etc.
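As a rough illustration of the statistics idea mentioned above, here is a hedged sketch of a per-operator metrics struct; the field and method names are illustrative and not from this PR:

```rust
use std::time::Duration;

/// Hedged sketch of dedicated per-operator URL statistics (illustrative names,
/// not the PR's actual metrics struct).
#[derive(Default, Debug)]
struct UrlOpStats {
    requests_started: u64,
    requests_failed: u64,
    total_wait_time: Duration,
}

impl UrlOpStats {
    fn record_completion(&mut self, waited: Duration, failed: bool) {
        self.requests_started += 1;
        if failed {
            self.requests_failed += 1;
        }
        self.total_wait_time += waited;
    }
}

fn main() {
    let mut stats = UrlOpStats::default();
    stats.record_completion(Duration::from_millis(120), false);
    stats.record_completion(Duration::from_millis(300), true);
    println!("{stats:?}");
}
```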
Greptile Summary
This pull request introduces custom Swordfish operators for URL downloads and uploads that implement streaming sink functionality, representing a significant performance optimization over the previous blocking approach. The core innovation allows starting connections for larger blocks of files and returning the first `max_connections` files that complete, enabling the pipeline to continue processing while additional downloads/uploads complete in the background.

The implementation adds new logical plan operators (`UrlDownload` and `UrlUpload`) that integrate with Daft's execution framework through streaming sinks. These operators support configurable connection limits, error handling modes, and passthrough columns for column pruning optimization. The streaming architecture uses async templates with `JoinSet` for concurrent task management, proper backpressure control through input size buffering, and sophisticated memory management.
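A minimal sketch of that streaming pattern, assuming tokio's `JoinSet`; the `UrlDownloadState` struct, `fetch` stub, and method names are illustrative stand-ins rather than the PR's actual Swordfish operator code:

```rust
use tokio::task::JoinSet;

/// Hypothetical operator state: the JoinSet outlives a single execute call, so
/// downloads that have not finished yet keep running in the background.
struct UrlDownloadState {
    tasks: JoinSet<Result<Vec<u8>, String>>,
    max_connections: usize,
}

impl UrlDownloadState {
    /// Spawn downloads for a new block of URLs without waiting on any of them.
    fn submit(&mut self, urls: Vec<String>) {
        for url in urls {
            // `fetch` stands in for the real async HTTP / object-store GET.
            self.tasks.spawn(async move { fetch(&url).await });
        }
    }

    /// Hand back up to `max_connections` completed downloads so the pipeline
    /// can keep streaming while the rest stay in flight.
    async fn drain_ready(&mut self) -> Vec<Result<Vec<u8>, String>> {
        let mut ready = Vec::new();
        while ready.len() < self.max_connections {
            match self.tasks.join_next().await {
                Some(joined) => ready.push(joined.unwrap_or_else(|e| Err(e.to_string()))),
                None => break, // nothing left in flight
            }
        }
        ready
    }
}

async fn fetch(url: &str) -> Result<Vec<u8>, String> {
    let _ = url; // placeholder for an actual request
    Ok(Vec::new())
}

#[tokio::main]
async fn main() {
    let mut state = UrlDownloadState { tasks: JoinSet::new(), max_connections: 2 };
    state.submit(vec!["https://example.com/a".into(), "https://example.com/b".into()]);
    let first_chunk = state.drain_ready().await;
    println!("got {} results while any remaining tasks keep running", first_chunk.len());
}
```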
Key architectural changes include:
- New `StreamingSinkFinalizeOutput` enum that supports multi-stage finalization with `HasMoreOutput` and `Finished` variants
- Async operation template framework in `streaming_sink/async_ops/` with reusable patterns for concurrent I/O operations
- Updated physical and logical plan translation layers to handle URL operations
- Integration with the distributed pipeline system for scalable execution
- API simplifications across the codebase, particularly removing error handling from `size_bytes()` methods that are now infallible
The changes also introduce a new configuration parameter `url_ops_bytes_buffer` (default 256MB) for memory management during streaming operations, and update test files to handle non-deterministic result ordering using `sort_pydict` utilities. The implementation follows established patterns in the codebase while providing substantial performance improvements for URL-based data operations.
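A hedged sketch of how a byte budget like `url_ops_bytes_buffer` could gate admissions; the accounting below is an assumption for illustration, not the PR's actual buffering logic:

```rust
/// Hypothetical backpressure sketch: track how many input bytes are currently
/// held by in-flight URL operations and stop admitting new morsels once the
/// configured budget (e.g. the 256MB default) would be exceeded.
struct ByteBudget {
    in_flight_bytes: usize,
    limit_bytes: usize, // e.g. url_ops_bytes_buffer
}

impl ByteBudget {
    fn new(limit_bytes: usize) -> Self {
        Self { in_flight_bytes: 0, limit_bytes }
    }

    /// Returns true if a new input of `size` bytes may be admitted now.
    fn try_admit(&mut self, size: usize) -> bool {
        if self.in_flight_bytes + size > self.limit_bytes {
            return false; // caller should drain some finished tasks first
        }
        self.in_flight_bytes += size;
        true
    }

    /// Release bytes when a download/upload task completes.
    fn release(&mut self, size: usize) {
        self.in_flight_bytes = self.in_flight_bytes.saturating_sub(size);
    }
}

fn main() {
    let mut budget = ByteBudget::new(256 * 1024 * 1024);
    assert!(budget.try_admit(64 * 1024 * 1024));
    budget.release(64 * 1024 * 1024);
}
```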
Confidence score: 3/5
- This PR introduces complex streaming functionality with potential edge cases in async operation management and memory handling that may cause issues under high load or error conditions
- Score reflects the significant architectural changes involving async operations, complex state management, and distributed execution that increase the risk of subtle bugs
- Pay close attention to streaming sink implementations, async operation templates, and error handling in URL download/upload operations
89 files reviewed, 14 comments
let columns = columns
    .iter()
    .map(|col| self.columns[col.index].clone())
    .collect::<Vec<_>>();
style: Variable name collision: the parameter `columns` is shadowed by the local variable on line 28. Consider renaming the local variable to `selected_columns` for clarity.
Suggested change:
- let columns = columns
+ let selected_columns = columns
      .iter()
      .map(|col| self.columns[col.index].clone())
      .collect::<Vec<_>>();
pub fn select_columns(&self, columns: &[BoundColumn]) -> crate::Result<Self> {
    let new_schema = Arc::new(Schema::new(
        columns.iter().map(|col| self.schema[col.index].clone()),
    ));
    let columns = columns
        .iter()
        .map(|col| self.columns[col.index].clone())
        .collect::<Vec<_>>();
    Ok(Self {
        columns,
        schema: new_schema,
    })
}
style: Variable name collision: the parameter `columns` shadows the struct field `columns`. Consider renaming the parameter to `selected_columns` or `column_refs` for clarity.
Suggested change:
- pub fn select_columns(&self, columns: &[BoundColumn]) -> crate::Result<Self> {
+ pub fn select_columns(&self, selected_columns: &[BoundColumn]) -> crate::Result<Self> {
      let new_schema = Arc::new(Schema::new(
-         columns.iter().map(|col| self.schema[col.index].clone()),
+         selected_columns.iter().map(|col| self.schema[col.index].clone()),
      ));
-     let columns = columns
+     let columns = selected_columns
          .iter()
          .map(|col| self.columns[col.index].clone())
          .collect::<Vec<_>>();
      Ok(Self {
          columns,
          schema: new_schema,
      })
  }
//!
//! On StreamingSink::finalize
//! 1. Poll for finished tasks up to finalize_batch_size()
//! 2. If num_active_tasks() >= 0, return HasMoreOutput to loop again
logic: Logic error: condition should be `> 0` not `>= 0`, since we only want to continue when there are active tasks.
Suggested change:
- //! 2. If num_active_tasks() >= 0, return HasMoreOutput to loop again
+ //! 2. If num_active_tasks() > 0, return HasMoreOutput to loop again
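A minimal sketch of the finalize protocol with the corrected check; the `StreamingSinkFinalizeOutput` variant names come from this PR's summary, while `FinalizeState`, `poll_finished`, and the `String` payloads are illustrative stand-ins:

```rust
/// Mirrors the multi-stage finalization described above: the sink keeps getting
/// polled while it still has work, and signals completion otherwise.
enum StreamingSinkFinalizeOutput {
    HasMoreOutput(Vec<String>), // partial results, call finalize again
    Finished(Vec<String>),      // last batch of results
}

struct FinalizeState {
    active_tasks: usize,
    finished: Vec<String>,
}

impl FinalizeState {
    fn num_active_tasks(&self) -> usize {
        self.active_tasks
    }

    /// Illustrative stand-in for "poll for finished tasks up to finalize_batch_size()".
    fn poll_finished(&mut self, batch_size: usize) -> Vec<String> {
        let n = batch_size.min(self.finished.len());
        self.finished.drain(..n).collect()
    }

    fn finalize(&mut self, batch_size: usize) -> StreamingSinkFinalizeOutput {
        let batch = self.poll_finished(batch_size);
        // Corrected condition: only loop again while tasks are still active.
        if self.num_active_tasks() > 0 {
            StreamingSinkFinalizeOutput::HasMoreOutput(batch)
        } else {
            StreamingSinkFinalizeOutput::Finished(batch)
        }
    }
}

fn main() {
    let mut state = FinalizeState { active_tasks: 1, finished: vec!["row-batch-0".to_string()] };
    assert!(matches!(state.finalize(8), StreamingSinkFinalizeOutput::HasMoreOutput(_)));
    state.active_tasks = 0;
    assert!(matches!(state.finalize(8), StreamingSinkFinalizeOutput::Finished(_)));
}
```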
let size = pickle_dumps(c)
    .expect("Failed to pickle Python array for size estimation")
    .len();
logic: Using `expect()` will cause a panic if pickle serialization fails. Consider if this is the desired behavior - it changes from graceful error handling to a potential crash.
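If graceful handling is preferred, one hedged option (which would make this size-estimation path fallible again) looks like the sketch below; `PyArrayStandIn`, the `String` error type, and the `pickle_dumps` stub are stand-ins for the real helpers:

```rust
// Hedged sketch: propagate the pickle failure instead of panicking, assuming
// the enclosing size-estimation function is allowed to return a Result.
fn estimated_pickled_size(c: &PyArrayStandIn) -> Result<usize, String> {
    let bytes = pickle_dumps(c)
        .map_err(|e| format!("Failed to pickle Python array for size estimation: {e}"))?;
    Ok(bytes.len())
}

struct PyArrayStandIn;

fn pickle_dumps(_c: &PyArrayStandIn) -> Result<Vec<u8>, String> {
    Ok(Vec::new()) // placeholder for the real pickle call
}

fn main() {
    assert!(estimated_pickled_size(&PyArrayStandIn).is_ok());
}
```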
@@ -7,6 +7,7 @@ common-scan-info = {path = "../common/scan-info", default-features = false}
common-treenode = {path = "../common/treenode", default-features = false}
daft-core = {path = "../daft-core", default-features = false}
daft-dsl = {path = "../daft-dsl", default-features = false}
daft-functions-uri = {path = "../daft-functions-uri", default-features = false}
logic: Missing python feature propagation for daft-functions-uri dependency
{
    panic!("Python feature is required for io_config");
}
logic: `panic!` will crash the program. Consider returning a proper error instead.
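A hedged sketch of the suggested alternative; the function name and `String` error type are illustrative, and the real code would presumably use Daft's own error type:

```rust
// Hedged sketch: return a descriptive error instead of panicking when io_config
// support is requested but the crate was built without the python feature.
fn require_python_feature(python_feature_enabled: bool) -> Result<(), String> {
    if !python_feature_enabled {
        return Err("Python feature is required for io_config".to_string());
    }
    Ok(())
}

fn main() {
    // Surfaces an Err the caller can handle, rather than aborting the process.
    assert!(require_python_feature(false).is_err());
}
```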
if let Expr::ScalarFn(ScalarFn::Builtin(BuiltinScalarFn { udf, inputs })) =
    child.as_ref()
    && udf.as_ref().type_id() == TypeId::of::<UrlDownload>()
{
    changed = true;

    // Split and save child expression
    // Child may not have an alias, so we need to generate a new one
    // TODO: Remove with ordinals
    let child_name = format!("id-{}", uuid::Uuid::new_v4());

    let args: UrlDownloadArgs<ExprRef> = inputs.clone().try_into()?;
    split_exprs.push(SplitExpr::UrlDownload {
        child_name: child_name.clone(),
        args,
    });

    new_children[idx] = resolved_col(child_name);
} else if let Expr::ScalarFn(ScalarFn::Builtin(BuiltinScalarFn {
    udf,
    inputs,
})) = child.as_ref()
    && udf.as_ref().type_id() == TypeId::of::<UrlUpload>()
{
    changed = true;

    // Split and save child expression
    // Child may not have an alias, so we need to generate a new one
    // TODO: Remove with ordinals
    let child_name = format!("id-{}", uuid::Uuid::new_v4());
    let child = child.alias(child_name.clone());
    split_exprs.push(child);

    let args: UrlUploadArgs<ExprRef> = inputs.clone().try_into()?;
    split_exprs.push(SplitExpr::UrlUpload {
        child_name: child_name.clone(),
        args,
    });

    new_children[idx] = resolved_col(child_name);
}
style: The pattern matching logic here is quite complex with nested conditionals. Consider extracting this into a helper method to improve readability.
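A hedged sketch of the extraction this comment suggests: a small helper that classifies the UDF by its concrete type, so the rewrite rule only needs one branch body per kind. The types below are simplified stand-ins for the real expression types:

```rust
use std::any::{Any, TypeId};

// Simplified stand-ins for the real UDF types in this diff.
struct UrlDownload;
struct UrlUpload;

enum UrlOpKind {
    Download,
    Upload,
}

/// Hedged helper: classify a scalar UDF by its concrete type so the rewrite
/// rule above can collapse its two nearly identical branches into one.
fn classify_url_op(udf: &dyn Any) -> Option<UrlOpKind> {
    let id = udf.type_id();
    if id == TypeId::of::<UrlDownload>() {
        Some(UrlOpKind::Download)
    } else if id == TypeId::of::<UrlUpload>() {
        Some(UrlOpKind::Upload)
    } else {
        None
    }
}

fn main() {
    let udf = UrlDownload;
    assert!(matches!(classify_url_op(&udf), Some(UrlOpKind::Download)));
}
```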
SplitExpr::UrlDownload { child_name, args } => {
    out_names.insert(child_name.clone());
    out_exprs.push(resolved_col(child_name.clone()));
    let passthrough_columns = last_child
        .schema()
        .field_names()
        .map(name_to_col)
        .collect::<Vec<_>>();
    last_child = Arc::new(LogicalPlan::UrlDownload(UrlDownloadOp::new(
        last_child,
        args,
        child_name,
        passthrough_columns,
    )));
}
SplitExpr::UrlUpload { child_name, args } => {
    out_names.insert(child_name.clone());
    out_exprs.push(resolved_col(child_name.clone()));
    let passthrough_columns = last_child
        .schema()
        .field_names()
        .map(name_to_col)
        .collect::<Vec<_>>();
    last_child = Arc::new(LogicalPlan::UrlUpload(UrlUploadOp::new(
        last_child,
        args,
        child_name,
        passthrough_columns,
    )));
}
style: Both UrlDownload and UrlUpload branches have nearly identical logic for creating passthrough columns and plan nodes. Consider extracting a common helper function to reduce duplication.
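A hedged sketch of the deduplication this comment suggests: the shared bookkeeping (registering the output name, collecting passthrough columns) lives in one helper, and a closure builds the variant-specific node. Types and names are simplified stand-ins:

```rust
use std::sync::Arc;

// Simplified stand-in for the real plan node types.
struct PlanNode {
    name: String,
    passthrough_columns: Vec<String>,
}

/// Hedged helper: the shared bookkeeping from both match arms above lives here;
/// only the node construction differs between download and upload.
fn push_url_op<F>(
    child_name: &str,
    child_schema_fields: &[String],
    out_names: &mut Vec<String>,
    make_node: F,
) -> Arc<PlanNode>
where
    F: FnOnce(Vec<String>) -> PlanNode,
{
    out_names.push(child_name.to_string());
    let passthrough_columns = child_schema_fields.to_vec();
    Arc::new(make_node(passthrough_columns))
}

fn main() {
    let mut out_names = Vec::new();
    let node = push_url_op("id-123", &["a".into(), "b".into()], &mut out_names, |cols| PlanNode {
        name: "url_download".to_string(),
        passthrough_columns: cols,
    });
    assert_eq!(node.passthrough_columns.len(), 2);
    assert_eq!(out_names, vec!["id-123".to_string()]);
    let _ = &node.name;
}
```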
actual.explain(show_all=True)
actual = actual.to_pydict()
style: Debug statement left in test code. Should be removed before merging.
Suggested change:
- actual.explain(show_all=True)
  actual = actual.to_pydict()
) -> SchemaRef {
    let mut fields = passthrough_columns
        .iter()
        .map(|c| input_schema.get_field(&c.name()).unwrap())
logic: using `.unwrap()` here could panic if the column doesn't exist in the schema - consider using proper error handling.
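A hedged sketch of the error propagation this comment asks for, using simplified stand-ins for the real `Schema`/field types:

```rust
use std::collections::HashMap;

// Simplified stand-ins for the real Schema/Field types.
struct Schema {
    fields: HashMap<String, String>, // name -> dtype, as a stand-in
}

impl Schema {
    fn get_field(&self, name: &str) -> Result<&String, String> {
        self.fields
            .get(name)
            .ok_or_else(|| format!("Column {name} not found in schema"))
    }
}

/// Hedged sketch: collect the passthrough fields, propagating a missing-column
/// error instead of panicking via `.unwrap()`.
fn passthrough_fields(schema: &Schema, passthrough_columns: &[String]) -> Result<Vec<String>, String> {
    passthrough_columns
        .iter()
        .map(|c| schema.get_field(c).map(|f| f.clone()))
        .collect()
}

fn main() {
    let schema = Schema { fields: HashMap::from([("a".to_string(), "Int64".to_string())]) };
    assert!(passthrough_fields(&schema, &["a".to_string()]).is_ok());
    assert!(passthrough_fields(&schema, &["missing".to_string()]).is_err());
}
```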
CodSpeed Performance Report
Merging #4871 will degrade performance by 93.79%.
Benchmarks breakdown
Changes Made

Custom Swordfish operator for URL downloads and uploads that can consume in a streaming sink fashion. Rather than blocking on downloading an entire block, we can start connections for a larger block of files and return the first `max_connections` files that we get back, continuing in the pipeline until we get more.

Checklist
- `docs/mkdocs.yml` navigation