Skip to content

[SPARK-52759][SDP][SQL] Throw exception if pipeline has no tables or persisted views #51445

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
7 changes: 7 additions & 0 deletions common/utils/src/main/resources/error/error-conditions.json
Original file line number Diff line number Diff line change
Expand Up @@ -4930,6 +4930,13 @@
],
"sqlState" : "22023"
},
"RUN_EMPTY_PIPELINE" : {
"message" : [
"Pipelines are expected to have at least one non-temporary dataset defined (tables, persisted views) but no non-temporary datasets were found in your pipeline.",
"Please verify that you have included the expected source files, and that your source code includes table definitions (e.g., CREATE MATERIALIZED VIEW in SQL code, @sdp.table in python code)."
],
"sqlState" : "42617"
},
"SCALAR_FUNCTION_NOT_COMPATIBLE" : {
"message" : [
"ScalarFunction <scalarFunc> not overrides method 'produceResult(InternalRow)' with custom implementation."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -339,8 +339,12 @@ class PythonPipelineSuite
TableIdentifier("st", Some("some_schema"), Some("some_catalog"))))
}

test("view works") {
test("temporary views works") {
// A table is defined since pipeline with only temporary views is invalid.
val graph = buildGraph(s"""
|@sdp.table
|def mv_1():
| return spark.range(5)
|@sdp.temporary_view
|def view_1():
| return spark.range(5)
Expand All @@ -354,9 +358,9 @@ class PythonPipelineSuite
| return spark.read.table("view_1")
|""".stripMargin).resolve()
// views are temporary views, so they're not fully qualified.
assert(graph.tables.isEmpty)
assert(
graph.flows.map(_.identifier.unquotedString).toSet == Set("view_1", "view_2", "view_3"))
Set("view_1", "view_2", "view_3").subsetOf(
graph.flows.map(_.identifier.unquotedString).toSet))
// dependencies are correctly resolved view_2 reading from view_1
assert(
graph.resolvedFlow(TableIdentifier("view_2")).inputs.contains(TableIdentifier("view_1")))
Expand Down Expand Up @@ -416,6 +420,43 @@ class PythonPipelineSuite
.map(_.identifier) == Seq(graphIdentifier("a"), graphIdentifier("something")))
}

test("create pipeline without table will throw RUN_EMPTY_PIPELINE exception") {
checkError(
exception = intercept[AnalysisException] {
buildGraph(s"""
|spark.range(1)
|""".stripMargin)
},
condition = "RUN_EMPTY_PIPELINE",
parameters = Map.empty)
}

test("create pipeline with only temp view will throw RUN_EMPTY_PIPELINE exception") {
checkError(
exception = intercept[AnalysisException] {
buildGraph(s"""
|@sdp.temporary_view
|def view_1():
| return spark.range(5)
|""".stripMargin)
},
condition = "RUN_EMPTY_PIPELINE",
parameters = Map.empty)
}

test("create pipeline with only flow will throw RUN_EMPTY_PIPELINE exception") {
checkError(
exception = intercept[AnalysisException] {
buildGraph(s"""
|@sdp.append_flow(target = "a")
|def flow():
| return spark.range(5)
|""".stripMargin)
},
condition = "RUN_EMPTY_PIPELINE",
parameters = Map.empty)
}

/**
* Executes Python code in a separate process and returns the exit code.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,13 @@ class GraphRegistrationContext(
}

def toDataflowGraph: DataflowGraph = {
if (tables.isEmpty && views.collect { case v: PersistedView =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In theory it's possible for a user to define a standalone flow in their source code, but no table. Should we throw an exception in that case, or is a good exception already thrown elsewhere for that case?

Would be nice to write a test for this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added tests here.
I think as long as there's no table defined, an exception will be thrown.

v
}.isEmpty) {
throw new AnalysisException(
errorClass = "RUN_EMPTY_PIPELINE",
messageParameters = Map.empty)
}
val qualifiedTables = tables.toSeq.map { t =>
t.copy(
identifier = GraphIdentifierManager
Expand Down
Loading