perf: URL Operators #4871


Draft · wants to merge 39 commits into main from slade/url-ops

Conversation

srilman (Contributor) commented Jul 30, 2025

Changes Made

Custom Swordfish operators for URL downloads and uploads that consume input in a streaming-sink fashion. Rather than blocking while an entire block of files downloads, we start connections for a larger block of files, return the first max_connections files that complete, and continue through the pipeline while the remaining transfers finish.
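A minimal sketch of the batching idea, assuming tokio and reqwest; `UrlDownloadState`, `start`, and `next_batch` are illustrative names, not the PR's actual operator API:

```rust
use tokio::task::JoinSet;

/// Sketch only: keep all downloads in flight and hand results back in
/// batches as they complete, instead of blocking on the whole block of files.
struct UrlDownloadState {
    tasks: JoinSet<Result<Vec<u8>, reqwest::Error>>,
}

impl UrlDownloadState {
    fn start(urls: Vec<String>) -> Self {
        let mut tasks = JoinSet::new();
        for url in urls {
            // Start every connection eagerly; completion order is arbitrary.
            tasks.spawn(async move {
                let resp = reqwest::get(url).await?;
                Ok(resp.bytes().await?.to_vec())
            });
        }
        Self { tasks }
    }

    /// Yield the first `max_connections` downloads that finish, so the
    /// pipeline can make progress while the rest continue in the background.
    async fn next_batch(
        &mut self,
        max_connections: usize,
    ) -> Vec<Result<Vec<u8>, reqwest::Error>> {
        let mut batch = Vec::new();
        while batch.len() < max_connections {
            match self.tasks.join_next().await {
                Some(Ok(result)) => batch.push(result),
                Some(Err(_panic)) => continue, // task panicked; skipped in this sketch
                None => break,                 // all downloads drained
            }
        }
        batch
    }
}
```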

Checklist

  • Documented in API Docs (if applicable)
  • Documented in User Guide (if applicable)
  • If adding a new documentation page, doc is added to docs/mkdocs.yml navigation
  • Documentation builds and is formatted properly (tag @/ccmao1130 for docs review)

```diff
@@ -160,6 +160,12 @@ impl ToFromProto for ir::rel::LogicalPlan {
                 let udf_project = udf_project.to_proto()?.into();
                 proto::RelVariant::UdfProject(udf_project)
             }
+            Self::UrlDownload(url_download) => {
```
srilman (Author) commented:

Couldn't figure out how to serialize the IOConfig, so leaving it as a follow-up. Other ops use the same macro.

Jay-ju (Contributor) commented Aug 1, 2025

I'm particularly curious about what prompted the performance optimization for URL handling here. Could you elaborate on the core motivation behind this design? @srilman

srilman (Author) commented Aug 1, 2025

@Jay-ju this is actually an old PR that I wanted to get through to unblock some newer features. Right now, URL downloads and uploads are done somewhat in a blocking way. The idea behind this PR is to get rid of those blocking operations by having dedicated operators that execute in a fully async manner.

In addition (and this is what I'm working on right now), a dedicated operator lets us collect more and better statistics, such as the number of failed requests, wait time, etc.
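A dedicated operator gives those counters a natural home. A minimal sketch; the struct and field names are illustrative, not Daft's actual stats API:

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Illustrative counters a dedicated URL operator could expose,
/// e.g. number of failed requests and cumulative wait time.
#[derive(Default)]
struct UrlOpStats {
    failed_requests: AtomicU64,
    total_wait_ms: AtomicU64,
}

impl UrlOpStats {
    fn record_failure(&self) {
        self.failed_requests.fetch_add(1, Ordering::Relaxed);
    }
    fn record_wait(&self, millis: u64) {
        self.total_wait_ms.fetch_add(millis, Ordering::Relaxed);
    }
}
```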

srilman changed the title from [wip] perf: URL Operators to perf: URL Operators on Aug 9, 2025
github-actions bot added the perf label on Aug 9, 2025
srilman requested a review from colin-ho on August 11, 2025 16:49
srilman marked this pull request as ready for review on August 11, 2025 16:49
greptile-apps bot left a comment

Greptile Summary

This pull request introduces custom Swordfish operators for URL downloads and uploads that implement streaming sink functionality, representing a significant performance optimization over the previous blocking approach. The core innovation allows starting connections for larger blocks of files and returning the first max_connections files that complete, enabling the pipeline to continue processing while additional downloads/uploads complete in the background.

The implementation adds new logical plan operators (UrlDownload and UrlUpload) that integrate with Daft's execution framework through streaming sinks. These operators support configurable connection limits, error handling modes, and passthrough columns for column pruning optimization. The streaming architecture uses async templates with JoinSet for concurrent task management, proper backpressure control through input size buffering, and sophisticated memory management.

Key architectural changes include:

  • New StreamingSinkFinalizeOutput enum that supports multi-stage finalization with HasMoreOutput and Finished variants (see the sketch after this summary)
  • Async operation template framework in streaming_sink/async_ops/ with reusable patterns for concurrent I/O operations
  • Updated physical and logical plan translation layers to handle URL operations
  • Integration with the distributed pipeline system for scalable execution
  • API simplifications across the codebase, particularly removing error handling from size_bytes() methods that are now infallible

The changes also introduce a new configuration parameter url_ops_bytes_buffer (default 256MB) for memory management during streaming operations and update test files to handle non-deterministic result ordering using sort_pydict utilities. The implementation follows established patterns in the codebase while providing substantial performance improvements for URL-based data operations.
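Based on the first bullet above, the finalize signal could look roughly like this. A sketch only; the payload type `T` is assumed, not taken from the PR:

```rust
/// Sketch of the multi-stage finalize signal: `finalize` can be called
/// repeatedly, emitting intermediate output until the sink reports that
/// no in-flight work remains.
enum StreamingSinkFinalizeOutput<T> {
    /// Some output is ready, but active tasks remain; call finalize again.
    HasMoreOutput(Vec<T>),
    /// All tasks drained; this is the last output.
    Finished(Vec<T>),
}
```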

Confidence score: 3/5

  • This PR introduces complex streaming functionality with potential edge cases in async operation management and memory handling that may cause issues under high load or error conditions
  • Score reflects the significant architectural changes involving async operations, complex state management, and distributed execution that increase the risk of subtle bugs
  • Pay close attention to streaming sink implementations, async operation templates, and error handling in URL download/upload operations

89 files reviewed, 14 comments


Comment on lines +28 to +31:

```rust
let columns = columns
    .iter()
    .map(|col| self.columns[col.index].clone())
    .collect::<Vec<_>>();
```

style: Variable name collision: the parameter columns is shadowed by the local variable on line 28. Consider renaming the local variable to selected_columns for clarity.

Suggested change:

```diff
-let columns = columns
+let selected_columns = columns
     .iter()
     .map(|col| self.columns[col.index].clone())
     .collect::<Vec<_>>();
```

Comment on lines +201 to +213:

```rust
pub fn select_columns(&self, columns: &[BoundColumn]) -> crate::Result<Self> {
    let new_schema = Arc::new(Schema::new(
        columns.iter().map(|col| self.schema[col.index].clone()),
    ));
    let columns = columns
        .iter()
        .map(|col| self.columns[col.index].clone())
        .collect::<Vec<_>>();
    Ok(Self {
        columns,
        schema: new_schema,
    })
}
```

style: Variable name collision: the parameter columns shadows the struct field columns. Consider renaming the parameter to selected_columns or column_refs for clarity.

Suggested change:

```diff
-pub fn select_columns(&self, columns: &[BoundColumn]) -> crate::Result<Self> {
+pub fn select_columns(&self, selected_columns: &[BoundColumn]) -> crate::Result<Self> {
     let new_schema = Arc::new(Schema::new(
-        columns.iter().map(|col| self.schema[col.index].clone()),
+        selected_columns.iter().map(|col| self.schema[col.index].clone()),
     ));
-    let columns = columns
+    let columns = selected_columns
         .iter()
         .map(|col| self.columns[col.index].clone())
         .collect::<Vec<_>>();
     Ok(Self {
         columns,
         schema: new_schema,
     })
 }
```

```rust
//!
//! On StreamingSink::finalize
//! 1. Poll for finished tasks up to finalize_batch_size()
//! 2. If num_active_tasks() >= 0, return HasMoreOutput to loop again
```

logic: Logic error: condition should be > 0 not >= 0 since we want to continue when there are active tasks

Suggested change:

```diff
-//! 2. If num_active_tasks() >= 0, return HasMoreOutput to loop again
+//! 2. If num_active_tasks() > 0, return HasMoreOutput to loop again
```
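Putting the two sketches above together, one finalize round under the corrected condition might look like this (again illustrative, reusing the hypothetical `UrlDownloadState` and `StreamingSinkFinalizeOutput` from earlier):

```rust
// Sketch of one finalize round: drain up to a batch of finished tasks,
// then report whether another round is needed.
async fn finalize_step(
    state: &mut UrlDownloadState,
    batch_size: usize,
) -> StreamingSinkFinalizeOutput<Result<Vec<u8>, reqwest::Error>> {
    let batch = state.next_batch(batch_size).await;
    if !state.tasks.is_empty() {
        // Active tasks remain (> 0), so the caller should finalize again.
        StreamingSinkFinalizeOutput::HasMoreOutput(batch)
    } else {
        StreamingSinkFinalizeOutput::Finished(batch)
    }
}
```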

Comment on lines +47 to +49:

```rust
let size = pickle_dumps(c)
    .expect("Failed to pickle Python array for size estimation")
    .len();
```

logic: Using expect() will cause a panic if pickle serialization fails. Consider if this is the desired behavior - it changes from graceful error handling to a potential crash.
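One graceful alternative is falling back to a conservative estimate. A sketch; `DEFAULT_PY_OBJECT_SIZE` is a hypothetical constant, not from the PR:

```rust
// Fall back to a conservative estimate instead of panicking when an
// object cannot be pickled for size estimation.
const DEFAULT_PY_OBJECT_SIZE: usize = 1024; // hypothetical fallback

let size = pickle_dumps(c)
    .map(|bytes| bytes.len())
    .unwrap_or(DEFAULT_PY_OBJECT_SIZE);
```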

```diff
@@ -7,6 +7,7 @@ common-scan-info = {path = "../common/scan-info", default-features = false}
 common-treenode = {path = "../common/treenode", default-features = false}
 daft-core = {path = "../daft-core", default-features = false}
 daft-dsl = {path = "../daft-dsl", default-features = false}
+daft-functions-uri = {path = "../daft-functions-uri", default-features = false}
```

logic: Missing python feature propagation for daft-functions-uri dependency
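The fix the comment points at would forward the crate's `python` feature to the new dependency. A Cargo.toml sketch; the exact feature list here is assumed, not taken from the PR:

```toml
# Hypothetical sketch: propagate this crate's `python` feature to the new
# dependency so its Python-gated code paths compile consistently.
[features]
python = ["dep:pyo3", "daft-functions-uri/python"]
```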

Comment on lines +86 to +88:

```rust
{
    panic!("Python feature is required for io_config");
}
```

logic: panic! will crash the program. Consider returning a proper error instead
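A sketch of the error-returning alternative, assuming a `DaftError::InternalError` variant is available in this context:

```rust
// Surface a recoverable error instead of aborting the process when the
// Python feature is unavailable.
return Err(common_error::DaftError::InternalError(
    "Python feature is required for io_config".to_string(),
));
```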

Comment on lines +136 to +174:

```rust
if let Expr::ScalarFn(ScalarFn::Builtin(BuiltinScalarFn { udf, inputs })) =
    child.as_ref()
    && udf.as_ref().type_id() == TypeId::of::<UrlDownload>()
{
    changed = true;

    // Split and save child expression
    // Child may not have an alias, so we need to generate a new one
    // TODO: Remove with ordinals
    let child_name = format!("id-{}", uuid::Uuid::new_v4());

    let args: UrlDownloadArgs<ExprRef> = inputs.clone().try_into()?;
    split_exprs.push(SplitExpr::UrlDownload {
        child_name: child_name.clone(),
        args,
    });

    new_children[idx] = resolved_col(child_name);
} else if let Expr::ScalarFn(ScalarFn::Builtin(BuiltinScalarFn {
    udf,
    inputs,
})) = child.as_ref()
    && udf.as_ref().type_id() == TypeId::of::<UrlUpload>()
{
    changed = true;

    // Split and save child expression
    // Child may not have an alias, so we need to generate a new one
    // TODO: Remove with ordinals
    let child_name = format!("id-{}", uuid::Uuid::new_v4());
    let child = child.alias(child_name.clone());
    split_exprs.push(child);

    let args: UrlUploadArgs<ExprRef> = inputs.clone().try_into()?;
    split_exprs.push(SplitExpr::UrlUpload {
        child_name: child_name.clone(),
        args,
    });

    new_children[idx] = resolved_col(child_name);
}
```

style: The pattern matching logic here is quite complex with nested conditionals. Consider extracting this into a helper method to improve readability.
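One way to flatten the nested conditionals: classify the child expression once with a small helper, so both branches collapse into a single match. A sketch; `UrlFnKind` and `classify_url_fn` are illustrative names, not from the PR:

```rust
// Hypothetical helper: identify which URL scalar function (if any) the
// expression is, so the two near-identical `if let` branches above
// reduce to one pattern match plus a `match` on the kind.
enum UrlFnKind {
    Download,
    Upload,
}

fn classify_url_fn(expr: &Expr) -> Option<UrlFnKind> {
    if let Expr::ScalarFn(ScalarFn::Builtin(BuiltinScalarFn { udf, .. })) = expr {
        if udf.as_ref().type_id() == TypeId::of::<UrlDownload>() {
            return Some(UrlFnKind::Download);
        }
        if udf.as_ref().type_id() == TypeId::of::<UrlUpload>() {
            return Some(UrlFnKind::Upload);
        }
    }
    None
}
```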

Comment on lines +238 to +267:

```rust
SplitExpr::UrlDownload { child_name, args } => {
    out_names.insert(child_name.clone());
    out_exprs.push(resolved_col(child_name.clone()));
    let passthrough_columns = last_child
        .schema()
        .field_names()
        .map(name_to_col)
        .collect::<Vec<_>>();
    last_child = Arc::new(LogicalPlan::UrlDownload(UrlDownloadOp::new(
        last_child,
        args,
        child_name,
        passthrough_columns,
    )));
}
SplitExpr::UrlUpload { child_name, args } => {
    out_names.insert(child_name.clone());
    out_exprs.push(resolved_col(child_name.clone()));
    let passthrough_columns = last_child
        .schema()
        .field_names()
        .map(name_to_col)
        .collect::<Vec<_>>();
    last_child = Arc::new(LogicalPlan::UrlUpload(UrlUploadOp::new(
        last_child,
        args,
        child_name,
        passthrough_columns,
    )));
}
```

style: Both UrlDownload and UrlUpload branches have nearly identical logic for creating passthrough columns and plan nodes. Consider extracting a common helper function to reduce duplication.
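The shared passthrough-column computation could be factored out along these lines. A sketch; the return type is assumed to match what `UrlDownloadOp::new` / `UrlUploadOp::new` expect:

```rust
// Both match arms compute the same thing: pass through every column of
// the current child plan. A shared helper removes the duplication.
fn passthrough_columns_for(plan: &Arc<LogicalPlan>) -> Vec<BoundColumn> {
    plan.schema().field_names().map(name_to_col).collect()
}
```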

Comment on lines +72 to +73:

```python
actual.explain(show_all=True)
actual = actual.to_pydict()
```

style: Debug statement left in test code. Should be removed before merging.

Suggested change:

```diff
-actual.explain(show_all=True)
 actual = actual.to_pydict()
```

```rust
) -> SchemaRef {
    let mut fields = passthrough_columns
        .iter()
        .map(|c| input_schema.get_field(&c.name()).unwrap())
```

logic: using .unwrap() here could panic if column doesn't exist in schema - consider using proper error handling
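A sketch of the fallible version the comment suggests, assuming `get_field` returns a `Result` as elsewhere in the codebase; note this would also require the enclosing function to return a `Result` rather than a bare `SchemaRef`:

```rust
// Propagate a missing-column error instead of panicking.
let mut fields = passthrough_columns
    .iter()
    .map(|c| input_schema.get_field(&c.name()))
    .collect::<crate::Result<Vec<_>>>()?;
```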

codspeed-hq bot commented Aug 11, 2025

CodSpeed Performance Report

Merging #4871 will degrade performance by 93.79%

Comparing slade/url-ops (913fa64) with main (00386c9)

Summary

❌ 1 regression
✅ 23 untouched benchmarks

⚠️ Please fix the performance issues or acknowledge them on CodSpeed.

Benchmarks breakdown

| Benchmark | BASE | HEAD | Change |
|---|---|---|---|
| test_iter_rows_first_row[1 Small File] | 125.1 ms | 2,015.3 ms | -93.79% |

srilman marked this pull request as draft on August 11, 2025 18:24