Skip to content

[SPARK-52759][SDP][SQL] Throw exception if pipeline has no tables or persisted views #51445

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed

Conversation

JiaqiWang18
Copy link
Contributor

@JiaqiWang18 JiaqiWang18 commented Jul 10, 2025

What changes were proposed in this pull request?

When user runs a pipeline, throw a RUN_EMPTY_PIPELINE exception if the pipeline source directory does not contain any tables or persisted views.

  • Add checks in GraphRegistrationContext. toDataflowGraph to throw the exception if the pipeline does not have any tables or persisted views
  • Modify test cases that currently register only temporary view to register persisted views instead since pipelines that only include temporary views are invalid now.
  • Add additional test cases to ensure the exception is thrown correctly.

Why are the changes needed?

In Spark Declarative Pipelines, using the CLI tool users run a pipeline that is defined from the configured pipeline root directory. This directory contains information such as the pipeline spec, and the source code files (Python, SQL) that define the pipeline tables/flows/views.

It’s possible the user tries to run a pipeline defined from a pipeline directory whose source files don’t actually define any tables or views.

An exception should be thrown if the pipeline does not have any tables or views, to inform the user they should double check that they are running the pipeline in the correct directory. The previous behavior is that the pipeline just run to completion without emitting any info.

Does this PR introduce any user-facing change?

Yes, this is an additive non-breaking behavior change. However, SDP has not been released, so no user should be impacted by this change.

How was this patch tested?

Created additional test case to verify that the exception is indeed thrown.

Was this patch authored or co-authored using generative AI tooling?

No

@JiaqiWang18 JiaqiWang18 changed the title [SPARK-52759] Throw exception if pipeline has no tables or persisted views [SPARK-52759][SDP][SQL] Throw exception if pipeline has no tables or persisted views Jul 10, 2025
@JiaqiWang18
Copy link
Contributor Author

@AnishMahto

Copy link
Contributor

@AnishMahto AnishMahto left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's close! Just small comments, and let's also undo any formatting changes to existing code so that:

  1. The diff is smaller, making it easier for PR reviewers to understand what exactly is changing
  2. We don't pollute the git blame

If you feel like we should reformat some of the code, let's open a separate PR to do that!

@@ -50,16 +50,21 @@ class GraphRegistrationContext(
}

def toDataflowGraph: DataflowGraph = {
// throw exception if pipeline does not have table or persisted view
if (tables.isEmpty && views.collect { case v: PersistedView =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In theory it's possible for a user to define a standalone flow in their source code, but no table. Should we throw an exception in that case, or is a good exception already thrown elsewhere for that case?

Would be nice to write a test for this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added tests here.
I think as long as there's no table defined, an exception will be thrown.

@@ -18,7 +18,7 @@
package org.apache.spark.sql.pipelines.utils

import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{LocalTempView, UnresolvedRelation, ViewType}
import org.apache.spark.sql.catalyst.analysis.{LocalTempView, PersistedView => PersistedViewType, UnresolvedRelation, ViewType}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Let's just import it as PersistedView. I generally would recommend to only use alias imports when you need to import the same named entity from multiple different packages. Otherwise it adds another layer of abstraction when reading code.

Or if we think the name is bad, we should just do a rename of this class.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, this is because we are importing two PersistedView from two different packages.
The second import is here
I renamed it to PersistedViewType because I think this specific import point the types file.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, didn't see that! Makes sense

@@ -4519,6 +4519,15 @@
],
"sqlState" : "42S22"
},
"NO_TABLES_IN_PIPELINE" : {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about we rename this to NO_DATASET_IN_PIPELINE, as persisted views are technically not tables but we're allowing a pipeline of just persisted views.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make sense

@JiaqiWang18 JiaqiWang18 requested a review from AnishMahto July 11, 2025 16:46
Copy link
Contributor

@AnishMahto AnishMahto left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM! Tagging @sryza for a second pass

@@ -50,6 +50,14 @@ class GraphRegistrationContext(
}

def toDataflowGraph: DataflowGraph = {
// throw exception if pipeline does not have table or persisted view
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

super small nit: Let's just omit the comment, the exception message should be clear enough that it's self documenting.

This comment is prone to becoming stale anyway, when we add sinks for example.

Copy link
Contributor

@sryza sryza left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just one comment on the error code – otherwise looks good! After that's addressed, I will approve and merge.

@JiaqiWang18 JiaqiWang18 requested a review from sryza July 14, 2025 17:22
Copy link
Contributor

@sryza sryza left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice

@sryza sryza closed this in c4127d2 Jul 15, 2025
@sryza
Copy link
Contributor

sryza commented Jul 15, 2025

Merged to master

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants