
Commit c4127d2

jackywang-db and JiaqiWang18 authored and committed
[SPARK-52759][SDP][SQL] Throw exception if pipeline has no tables or persisted views
### What changes were proposed in this pull request?

When a user runs a pipeline, throw a `RUN_EMPTY_PIPELINE` exception if the pipeline source directory does not contain any tables or persisted views.

* Add a check in `GraphRegistrationContext.toDataflowGraph` that throws the exception if the pipeline does not have any tables or persisted views.
* Modify test cases that previously registered only temporary views so they also register persisted datasets, since pipelines that include only temporary views are now invalid.
* Add test cases to ensure the exception is thrown correctly.

### Why are the changes needed?

In Spark Declarative Pipelines, users run a pipeline via the CLI tool from a configured pipeline root directory. This directory contains the pipeline spec and the source code files (Python, SQL) that define the pipeline's tables, flows, and views. A user may try to run a pipeline from a directory whose source files don't actually define any tables or views. Previously, such a pipeline simply ran to completion without emitting any information. An exception should be thrown instead, to tell the user to double-check that they are running the pipeline from the correct directory.

### Does this PR introduce _any_ user-facing change?

Yes, this is an additive, non-breaking behavior change. However, SDP has not been released, so no users are impacted by this change.

### How was this patch tested?

Added test cases that verify the exception is indeed thrown.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #51445 from JiaqiWang18/SPARK-52759-empty-pipeline-fails.

Lead-authored-by: Jacky Wang <[email protected]>
Co-authored-by: Jacky Wang <[email protected]>
Signed-off-by: Sandy Ryza <[email protected]>
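To make the new rule concrete, here is a minimal sketch of a pipeline source file. It assumes the `@sdp.table` / `@sdp.temporary_view` decorators and the implicit `spark` session that the test diffs below also rely on; it is an illustration, not code from this PR:

```python
# Minimal sketch, assuming the `sdp` decorators and the `spark` session
# provided by the Spark Declarative Pipelines runtime (as in the test
# diffs below).

# Counts toward the check: a table is a persisted (non-temporary) dataset.
@sdp.table
def mv_1():
    return spark.range(5)

# Does NOT count toward the check: a pipeline whose sources define only
# temporary views (or only flows) now fails with RUN_EMPTY_PIPELINE.
@sdp.temporary_view
def view_1():
    return spark.range(5)
```

Removing `mv_1` from this file would leave only a temporary view, and running the pipeline would now raise `RUN_EMPTY_PIPELINE` instead of silently completing.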
1 parent 417599d commit c4127d2

File tree

9 files changed: +297 −143 lines changed

common/utils/src/main/resources/error/error-conditions.json

Lines changed: 7 additions & 0 deletions
```diff
@@ -4930,6 +4930,13 @@
       ],
       "sqlState" : "22023"
     },
+    "RUN_EMPTY_PIPELINE" : {
+      "message" : [
+        "Pipelines are expected to have at least one non-temporary dataset defined (tables, persisted views) but no non-temporary datasets were found in your pipeline.",
+        "Please verify that you have included the expected source files, and that your source code includes table definitions (e.g., CREATE MATERIALIZED VIEW in SQL code, @sdp.table in python code)."
+      ],
+      "sqlState" : "42617"
+    },
     "SCALAR_FUNCTION_NOT_COMPATIBLE" : {
       "message" : [
         "ScalarFunction <scalarFunc> not overrides method 'produceResult(InternalRow)' with custom implementation."
```

sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/PythonPipelineSuite.scala

Lines changed: 44 additions & 3 deletions
```diff
@@ -339,8 +339,12 @@ class PythonPipelineSuite
         TableIdentifier("st", Some("some_schema"), Some("some_catalog"))))
   }
 
-  test("view works") {
+  test("temporary views works") {
+    // A table is defined since pipeline with only temporary views is invalid.
     val graph = buildGraph(s"""
+        |@sdp.table
+        |def mv_1():
+        |  return spark.range(5)
         |@sdp.temporary_view
         |def view_1():
         |  return spark.range(5)
@@ -354,9 +358,9 @@ class PythonPipelineSuite
         | return spark.read.table("view_1")
         |""".stripMargin).resolve()
     // views are temporary views, so they're not fully qualified.
-    assert(graph.tables.isEmpty)
     assert(
-      graph.flows.map(_.identifier.unquotedString).toSet == Set("view_1", "view_2", "view_3"))
+      Set("view_1", "view_2", "view_3").subsetOf(
+        graph.flows.map(_.identifier.unquotedString).toSet))
     // dependencies are correctly resolved view_2 reading from view_1
     assert(
       graph.resolvedFlow(TableIdentifier("view_2")).inputs.contains(TableIdentifier("view_1")))
@@ -416,6 +420,43 @@ class PythonPipelineSuite
         .map(_.identifier) == Seq(graphIdentifier("a"), graphIdentifier("something")))
   }
 
+  test("create pipeline without table will throw RUN_EMPTY_PIPELINE exception") {
+    checkError(
+      exception = intercept[AnalysisException] {
+        buildGraph(s"""
+            |spark.range(1)
+            |""".stripMargin)
+      },
+      condition = "RUN_EMPTY_PIPELINE",
+      parameters = Map.empty)
+  }
+
+  test("create pipeline with only temp view will throw RUN_EMPTY_PIPELINE exception") {
+    checkError(
+      exception = intercept[AnalysisException] {
+        buildGraph(s"""
+            |@sdp.temporary_view
+            |def view_1():
+            |  return spark.range(5)
+            |""".stripMargin)
+      },
+      condition = "RUN_EMPTY_PIPELINE",
+      parameters = Map.empty)
+  }
+
+  test("create pipeline with only flow will throw RUN_EMPTY_PIPELINE exception") {
+    checkError(
+      exception = intercept[AnalysisException] {
+        buildGraph(s"""
+            |@sdp.append_flow(target = "a")
+            |def flow():
+            |  return spark.range(5)
+            |""".stripMargin)
+      },
+      condition = "RUN_EMPTY_PIPELINE",
+      parameters = Map.empty)
+  }
+
   /**
    * Executes Python code in a separate process and returns the exit code.
    *
```

sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/GraphRegistrationContext.scala

Lines changed: 7 additions & 0 deletions
```diff
@@ -50,6 +50,13 @@ class GraphRegistrationContext(
   }
 
   def toDataflowGraph: DataflowGraph = {
+    if (tables.isEmpty && views.collect { case v: PersistedView =>
+          v
+        }.isEmpty) {
+      throw new AnalysisException(
+        errorClass = "RUN_EMPTY_PIPELINE",
+        messageParameters = Map.empty)
+    }
     val qualifiedTables = tables.toSeq.map { t =>
       t.copy(
         identifier = GraphIdentifierManager
```
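For quick reference, here is a rough Python analogue of the guard above. The types and function name are hypothetical stand-ins for illustration only; the authoritative logic is the Scala diff:

```python
from dataclasses import dataclass

# Hypothetical stand-ins for the Scala view classes, for illustration only.
@dataclass
class TemporaryView:
    name: str

@dataclass
class PersistedView:
    name: str

def ensure_pipeline_not_empty(tables: list, views: list) -> None:
    # Mirrors the Scala guard: fail unless at least one table or
    # persisted view was registered with the pipeline.
    persisted_views = [v for v in views if isinstance(v, PersistedView)]
    if not tables and not persisted_views:
        raise ValueError(
            "[RUN_EMPTY_PIPELINE] pipeline defines no tables or persisted views")

# Example: only a temporary view registered -> raises.
# ensure_pipeline_not_empty(tables=[], views=[TemporaryView("view_1")])
```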
