[SPARK-52759][SDP][SQL] Throw exception if pipeline has no tables or persisted views #51445
Conversation
It's close! Just small comments, and let's also undo any formatting changes to existing code so that:
- The diff is smaller, making it easier for PR reviewers to understand what exactly is changing
- We don't pollute the git blame
If you feel like we should reformat some of the code, let's open a separate PR to do that!
```
@@ -50,16 +50,21 @@ class GraphRegistrationContext(
  }

  def toDataflowGraph: DataflowGraph = {
    // throw exception if pipeline does not have table or persisted view
    if (tables.isEmpty && views.collect { case v: PersistedView =>
```
In theory it's possible for a user to define a standalone flow in their source code, but no table. Should we throw an exception in that case, or is a good exception already thrown elsewhere for that case?
Would be nice to write a test for this.
Added tests here. I think as long as there's no table defined, an exception will be thrown.
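The guard being discussed can be modeled in a few lines. The following is an illustrative, self-contained sketch of the idea, not the actual Spark classes or error-throwing helpers:

```scala
// Illustrative model of the empty-pipeline guard (not the actual Spark code).
sealed trait View
case class TemporaryView(name: String) extends View
case class PersistedView(name: String) extends View

class GraphRegistrationContext(tables: Seq[String], views: Seq[View]) {
  def validateNonEmpty(): Unit = {
    // A pipeline is "empty" if it defines no tables and no persisted views;
    // temporary views alone do not count.
    val persistedViews = views.collect { case v: PersistedView => v }
    if (tables.isEmpty && persistedViews.isEmpty) {
      throw new IllegalStateException(
        "Pipeline must define at least one table or persisted view")
    }
  }
}
```

Note that a standalone flow with no backing table would also trip this check, which matches the discussion above: as long as no table is defined, the exception fires.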
```
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.pipelines.utils

 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.{LocalTempView, UnresolvedRelation, ViewType}
+import org.apache.spark.sql.catalyst.analysis.{LocalTempView, PersistedView => PersistedViewType, UnresolvedRelation, ViewType}
```
nit: Let's just import it as `PersistedView`. I'd generally recommend using alias imports only when you need to import the same-named entity from multiple packages; otherwise it adds another layer of abstraction when reading the code.
Or, if we think the name is bad, we should just rename this class.
Actually, this is because we are importing two `PersistedView` classes from two different packages. The second import is here. I renamed it to `PersistedViewType` because I think this specific import points to the types file.
Ah, didn't see that! Makes sense
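For context on the alias-import pattern discussed above, here is a generic Scala illustration using standard-library classes (deliberately unrelated to the Spark imports in question):

```scala
// Two same-named classes from different packages; aliasing keeps both usable.
import scala.collection.immutable.{Map => ImmutableMap}
import scala.collection.mutable.Map

object AliasImportDemo extends App {
  val mutableCounts: Map[String, Int] = Map("a" -> 1)        // mutable.Map
  val frozenCounts: ImmutableMap[String, Int] = ImmutableMap("b" -> 2)
  mutableCounts("a") = 3                                     // allowed on mutable
  println(mutableCounts("a"))                                // prints 3
}
```

This is exactly the case the reviewer describes: the alias earns its keep only when two same-named entities must coexist in one file.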
```
@@ -4519,6 +4519,15 @@
   ],
   "sqlState" : "42S22"
 },
 "NO_TABLES_IN_PIPELINE" : {
```
How about we rename this to `NO_DATASET_IN_PIPELINE`, since persisted views are technically not tables but we're allowing a pipeline of just persisted views.
Makes sense.
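For readers unfamiliar with Spark's error-conditions file, an entry for such an error class might look roughly like this. Per the PR description the class ultimately merged was named `RUN_EMPTY_PIPELINE`; the message text and `sqlState` below are illustrative guesses, not the actual values:

```json
"RUN_EMPTY_PIPELINE" : {
  "message" : [
    "Cannot run a pipeline that defines no tables or persisted views."
  ],
  "sqlState" : "42000"
}
```

Each entry pairs an error class name with a message template and a SQLSTATE, which is what lets the exception message stay self-documenting.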
LGTM! Tagging @sryza for a second pass
```
@@ -50,6 +50,14 @@ class GraphRegistrationContext(
  }

  def toDataflowGraph: DataflowGraph = {
    // throw exception if pipeline does not have table or persisted view
```
super small nit: Let's just omit the comment; the exception message should be clear enough to be self-documenting. This comment is prone to becoming stale anyway, for example when we add sinks.
Just one comment on the error code – otherwise looks good! After that's addressed, I will approve and merge.
Co-authored-by: Sandy Ryza <[email protected]>
Nice
Merged to master
What changes were proposed in this pull request?
When a user runs a pipeline, throw a `RUN_EMPTY_PIPELINE` exception if the pipeline source directory does not contain any tables or persisted views. Modified `GraphRegistrationContext.toDataflowGraph` to throw the exception if the pipeline does not have any tables or persisted views.
Why are the changes needed?
In Spark Declarative Pipelines, users use the CLI tool to run a pipeline defined from the configured pipeline root directory. This directory contains information such as the pipeline spec and the source code files (Python, SQL) that define the pipeline's tables/flows/views.
It’s possible the user tries to run a pipeline defined from a pipeline directory whose source files don’t actually define any tables or views.
An exception should be thrown if the pipeline does not have any tables or views, to inform the user that they should double-check they are running the pipeline in the correct directory. The previous behavior was that the pipeline simply ran to completion without emitting any info.
Does this PR introduce any user-facing change?
Yes, this is an additive non-breaking behavior change. However, SDP has not been released, so no user should be impacted by this change.
How was this patch tested?
Created additional test case to verify that the exception is indeed thrown.
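A test for this behavior could be sketched as follows. This is a hand-rolled, self-contained illustration of the shape of such a check, not the actual suite code from the PR:

```scala
// Illustrative test-style check (not the actual Spark test suite).
object EmptyPipelineCheckDemo extends App {
  // Stand-in for GraphRegistrationContext.toDataflowGraph's new validation.
  def toDataflowGraph(tables: Seq[String], persistedViews: Seq[String]): Unit = {
    if (tables.isEmpty && persistedViews.isEmpty) {
      throw new IllegalStateException("RUN_EMPTY_PIPELINE")
    }
  }

  // An empty pipeline must throw RUN_EMPTY_PIPELINE.
  val caught =
    try { toDataflowGraph(Nil, Nil); false }
    catch { case e: IllegalStateException => e.getMessage == "RUN_EMPTY_PIPELINE" }
  assert(caught, "expected RUN_EMPTY_PIPELINE for an empty pipeline")

  // A pipeline with at least one table validates fine.
  toDataflowGraph(Seq("events"), Nil)
}
```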
Was this patch authored or co-authored using generative AI tooling?
No