[SDP] Validate streaming-ness of DFs returned by SDP table and standalone flow definitions #51208

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
Wants to merge 1 commit into base: master
27 changes: 27 additions & 0 deletions common/utils/src/main/resources/error/error-conditions.json
@@ -2719,6 +2719,33 @@
],
"sqlState" : "42000"
},
"INVALID_FLOW_RELATION_TYPE" : {
"message" : [
"Flow <flowIdentifier> returns an invalid relation type."
],
"subClass" : {
"FOR_MATERIALIZED_VIEW" : {
"message" : [
"Materialized views may only be defined by a batch relation, but the flow <flowIdentifier> attempts to write a streaming relation to the materialized view <tableIdentifier>."
]
},
"FOR_ONCE_FLOW" : {
"message" : [
"<flowIdentifier> is an append once-flow that is defined by a streaming relation. Append once-flows may only be defined by or return a batch relation."
]
},
"FOR_PERSISTED_VIEW" : {
"message" : [
"Persisted views may only be defined by a batch relation, but the flow <flowIdentifier> attempts to write a streaming relation to the persisted view <viewIdentifier>."
]
},
"FOR_STREAMING_TABLE" : {
"message" : [
"Streaming tables may only be defined by streaming relations, but the flow <flowIdentifier> attempts to write a batch relation to the streaming table <tableIdentifier>. Consider using the STREAM operator in Spark-SQL to convert the batch relation into a streaming relation, or populating the streaming table with an append once-flow instead."
]
}
}
},
"INVALID_FORMAT" : {
"message" : [
"The format is invalid: <format>."
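Editor's note: the new error conditions above encode a simple rule: streaming tables must be written by streaming relations (or by an append once-flow), while materialized views and persisted views must be written by batch relations. Below is a minimal sketch of definitions that pass the new validation. It is not part of this change; it assumes the TestGraphRegistrationContext helpers (registerTable, registerMaterializedView, registerFlow, dfFlowFunc), the MemoryStream type, and the imports used by ConnectInvalidPipelineSuite later in this diff, and the dataset names are illustrative.

// Sketch only: relies on the test helpers exercised in ConnectInvalidPipelineSuite.
val session = spark
import session.implicits._

val validGraph = new TestGraphRegistrationContext(spark) {
  // Streaming table fed by a streaming relation: satisfies the FOR_STREAMING_TABLE rule.
  registerTable("events", query = Option(dfFlowFunc(MemoryStream[Int].toDF())))

  // Materialized view fed by a batch relation: satisfies the FOR_MATERIALIZED_VIEW rule.
  registerMaterializedView("events_summary", query = dfFlowFunc(Seq(1, 2, 3).toDF()))

  // Streaming table populated only by an append once-flow over a batch relation:
  // satisfies the FOR_ONCE_FLOW rule.
  registerTable("backfilled")
  registerFlow(
    destinationName = "backfilled",
    name = "initial_load",
    query = dfFlowFunc(Seq(4, 5, 6).toDF()),
    once = true)
}.resolveToDataflowGraph()

validGraph.validate() // no INVALID_FLOW_RELATION_TYPE error is thrown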
@@ -161,7 +161,7 @@ private[connect] object PipelinesHandler extends Logging {
language = Option(Python())),
format = Option.when(dataset.hasFormat)(dataset.getFormat),
normalizedPath = None,
isStreamingTableOpt = None))
isStreamingTable = dataset.getDatasetType == proto.DatasetType.TABLE))
case proto.DatasetType.TEMPORARY_VIEW =>
val viewIdentifier =
GraphIdentifierManager.parseTableIdentifier(dataset.getDatasetName, sparkSession)
@@ -90,10 +90,11 @@ class PythonPipelineSuite
}

test("basic") {
val graph = buildGraph("""
val graph = buildGraph(
"""
|@sdp.table
|def table1():
| return spark.range(10)
| return spark.readStream.format("rate").load()
|""".stripMargin)
.resolve()
.validate()
@@ -112,11 +113,11 @@
|def c():
| return spark.readStream.table("a")
|
|@sdp.table()
|@sdp.materialized_view()
|def d():
| return spark.read.table("a")
|
|@sdp.table()
|@sdp.materialized_view()
|def a():
| return spark.range(5)
|""".stripMargin)
@@ -177,11 +178,11 @@
test("referencing external datasets") {
sql("CREATE TABLE spark_catalog.default.src AS SELECT * FROM RANGE(5)")
val graph = buildGraph("""
|@sdp.table
|@sdp.materialized_view
|def a():
| return spark.read.table("spark_catalog.default.src")
|
|@sdp.table
|@sdp.materialized_view
|def b():
| return spark.table("spark_catalog.default.src")
|
@@ -230,11 +231,11 @@
|def a():
| return spark.read.table("spark_catalog.default.src")
|
|@sdp.table
|@sdp.materialized_view
|def b():
| return spark.table("spark_catalog.default.src")
|
|@sdp.table
|@sdp.materialized_view
|def c():
| return spark.readStream.table("spark_catalog.default.src")
|""".stripMargin).resolve()
@@ -206,7 +206,7 @@ class SparkDeclarativePipelinesServerSuite
sql = Some("SELECT * FROM STREAM tableA"))
createTable(
name = "tableC",
datasetType = DatasetType.TABLE,
datasetType = DatasetType.MATERIALIZED_VIEW,
sql = Some("SELECT * FROM tableB"))
}

@@ -238,7 +238,7 @@ class SparkDeclarativePipelinesServerSuite
createView(name = "viewC", sql = "SELECT * FROM curr.tableB")
createTable(
name = "other.tableD",
datasetType = proto.DatasetType.TABLE,
datasetType = proto.DatasetType.MATERIALIZED_VIEW,
sql = Some("SELECT * FROM viewC"))
}

@@ -80,12 +80,6 @@ class CoreDataflowNodeProcessor(rawGraph: DataflowGraph) {
val resolvedFlowsToTable = flowsToTable.map { flow =>
resolvedFlowNodesMap.get(flow.identifier)
}

// Assign isStreamingTable (MV or ST) to the table based on the resolvedFlowsToTable
val tableWithType = table.copy(
isStreamingTableOpt = Option(resolvedFlowsToTable.exists(f => f.df.isStreaming))
)

// We mark all tables as virtual to ensure resolution uses incoming flows
// rather than previously materialized tables.
val virtualTableInput = VirtualTableInput(
Expand All @@ -95,7 +89,7 @@ class CoreDataflowNodeProcessor(rawGraph: DataflowGraph) {
availableFlows = resolvedFlowsToTable
)
resolvedInputs.put(table.identifier, virtualTableInput)
Seq(tableWithType)
Seq(table)
case view: View =>
// For view, add the flow to resolvedInputs and return empty.
require(upstreamNodes.size == 1, "Found multiple flows to view")
@@ -191,6 +191,7 @@ case class DataflowGraph(flows: Seq[Flow], tables: Seq[Table], views: Seq[View])
validatePersistedViewSources()
validateEveryDatasetHasFlow()
validateTablesAreResettable()
validateFlowStreamingness()
inferredSchema
}.failed

@@ -178,15 +178,15 @@ object DatasetManager extends Logging {
}

// Wipe the data if we need to
if ((isFullRefresh || !table.isStreamingTableOpt.get) && existingTableOpt.isDefined) {
if ((isFullRefresh || !table.isStreamingTable) && existingTableOpt.isDefined) {
context.spark.sql(s"TRUNCATE TABLE ${table.identifier.quotedString}")
}

// Alter the table if we need to
if (existingTableOpt.isDefined) {
val existingSchema = existingTableOpt.get.schema()

val targetSchema = if (table.isStreamingTableOpt.get && !isFullRefresh) {
val targetSchema = if (table.isStreamingTable && !isFullRefresh) {
SchemaMergingUtils.mergeSchemas(existingSchema, outputSchema)
} else {
outputSchema
@@ -55,6 +55,75 @@ trait GraphValidations extends Logging {
multiQueryTables
}

protected[graph] def validateFlowStreamingness(): Unit = {
flowsTo.foreach { case (destTableIdentifier, flows) =>
val destTableOpt = table.get(destTableIdentifier)

// If the destination identifier does not correspond to a table, it must be a view.
val destViewOpt = destTableOpt.fold(view.get(destTableIdentifier))(_ => None)

flows.foreach {
case resolvedFlow: ResolvedFlow =>
// A flow must be successfully analyzed, thus resolved, in order to determine if it is
// streaming or not. Unresolved flows will throw an exception anyway via
// [[validateSuccessfulFlowAnalysis]], so don't check them here.
if (resolvedFlow.once) {
// Once flows by definition should be batch flows, not streaming.
if (resolvedFlow.df.isStreaming) {
throw new AnalysisException(
errorClass = "INVALID_FLOW_RELATION_TYPE.FOR_ONCE_FLOW",
messageParameters = Map(
"flowIdentifier" -> resolvedFlow.identifier.quotedString
)
)
}
} else {
destTableOpt.foreach { destTable =>
if (destTable.isStreamingTable) {
if (!resolvedFlow.df.isStreaming) {
throw new AnalysisException(
errorClass = "INVALID_FLOW_RELATION_TYPE.FOR_STREAMING_TABLE",
messageParameters = Map(
"flowIdentifier" -> resolvedFlow.identifier.quotedString,
"tableIdentifier" -> destTableIdentifier.quotedString
)
)
}
} else {
if (resolvedFlow.df.isStreaming) {
// This check intentionally does NOT prevent materialized views from reading from
// a streaming table using a _batch_ read, which is still considered valid.
throw new AnalysisException(
errorClass = "INVALID_FLOW_RELATION_TYPE.FOR_MATERIALIZED_VIEW",
messageParameters = Map(
"flowIdentifier" -> resolvedFlow.identifier.quotedString,
"tableIdentifier" -> destTableIdentifier.quotedString
)
)
}
}
}

destViewOpt.foreach {
case _: PersistedView =>
if (resolvedFlow.df.isStreaming) {
throw new AnalysisException(
errorClass = "INVALID_FLOW_RELATION_TYPE.FOR_PERSISTED_VIEW",
messageParameters = Map(
"flowIdentifier" -> resolvedFlow.identifier.quotedString,
"viewIdentifier" -> destTableIdentifier.quotedString
)
)
}
case _: TemporaryView =>
// Temporary views' flows are allowed to be either streaming or batch, so no
// validation needs to be done for them
}
}
}
}
}

/** Throws an exception if the flows in this graph are not topologically sorted. */
protected[graph] def validateGraphIsTopologicallySorted(): Unit = {
val visitedNodes = mutable.Set.empty[TableIdentifier] // Set of visited nodes
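Editor's note: one subtlety called out in the comment inside validateFlowStreamingness above is that the FOR_MATERIALIZED_VIEW check only rejects streaming relations, so a materialized view may still consume a streaming table through a batch read. A rough sketch of that allowed shape, assuming the buildGraph helper and sdp decorators from PythonPipelineSuite earlier in this diff; dataset names are illustrative and this is not part of the change.

// Sketch only: mirrors the PythonPipelineSuite style of graph construction.
val graph = buildGraph("""
  |@sdp.table
  |def events():
  |    return spark.readStream.format("rate").load()
  |
  |@sdp.materialized_view
  |def events_snapshot():
  |    # Batch read of the streaming table above; allowed by design.
  |    return spark.read.table("events")
  |""".stripMargin)
  .resolve()
  .validate()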
@@ -199,7 +199,7 @@ class SqlGraphRegistrationContext(
),
format = cst.tableSpec.provider,
normalizedPath = None,
isStreamingTableOpt = None
isStreamingTable = true
)
)
}
@@ -230,7 +230,7 @@ class SqlGraphRegistrationContext(
),
format = cst.tableSpec.provider,
normalizedPath = None,
isStreamingTableOpt = None
isStreamingTable = true
)
)

@@ -281,7 +281,7 @@ class SqlGraphRegistrationContext(
),
format = cmv.tableSpec.provider,
normalizedPath = None,
isStreamingTableOpt = None
isStreamingTable = false
)
)

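Editor's note: with this change the flag is fixed at registration time rather than inferred later: the streaming-table CREATE statements handled above set isStreamingTable = true and the materialized-view statement sets it to false, and validateFlowStreamingness then enforces that the backing query matches. A sketch of a rejected and an accepted registration, reusing the createTable helper shown in SparkDeclarativePipelinesServerSuite above (enclosing test scaffolding omitted; table names are illustrative).

// Sketch only: helper usage follows SparkDeclarativePipelinesServerSuite.
// Rejected: DatasetType.TABLE declares a streaming table, but the plain SELECT
// analyzes to a batch relation, so graph validation fails with
// INVALID_FLOW_RELATION_TYPE.FOR_STREAMING_TABLE.
createTable(
  name = "tableB",
  datasetType = proto.DatasetType.TABLE,
  sql = Some("SELECT * FROM tableA"))

// Accepted: STREAM makes the source a streaming relation, matching the
// streaming-table destination.
createTable(
  name = "tableB",
  datasetType = proto.DatasetType.TABLE,
  sql = Some("SELECT * FROM STREAM tableA"))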
@@ -114,8 +114,7 @@ sealed trait TableInput extends Input {
* path (if not defined, we will normalize a managed storage path for it).
* @param properties Table Properties to set in table metadata.
* @param comment User-specified comment that can be placed on the table.
* @param isStreamingTableOpt if the table is a streaming table, will be None until we have resolved
* flows into table
* @param isStreamingTable if the table is a streaming table, as defined by the source code.
*/
case class Table(
identifier: TableIdentifier,
@@ -125,7 +124,7 @@ case class Table(
properties: Map[String, String] = Map.empty,
comment: Option[String],
baseOrigin: QueryOrigin,
isStreamingTableOpt: Option[Boolean],
isStreamingTable: Boolean,
format: Option[String]
) extends TableInput
with Output {
@@ -163,17 +162,6 @@ case class Table(
normalizedPath.get
}

/**
* Tell if a table is a streaming table or not. This property is not set until we have resolved
* the flows into the table. The exception reminds engineers that they cant call at random time.
*/
def isStreamingTable: Boolean = isStreamingTableOpt.getOrElse {
throw new IllegalStateException(
"Cannot identify whether the table is streaming table or not. You may need to resolve the " +
"flows into table."
)
}

/**
* Get the DatasetType of the table
*/
@@ -429,6 +429,77 @@ class ConnectInvalidPipelineSuite extends PipelineTest {
)
}

test("Streaming table backed by batch relation fails validation") {
val session = spark
import session.implicits._

val graph = new TestGraphRegistrationContext(spark) {
registerTable("a", query = Option(dfFlowFunc(Seq(1, 2).toDF())))
}.resolveToDataflowGraph()

val ex = intercept[AnalysisException] {
graph.validate()
}

checkError(
exception = ex,
condition = "INVALID_FLOW_RELATION_TYPE.FOR_STREAMING_TABLE",
parameters = Map(
"flowIdentifier" -> fullyQualifiedIdentifier("a").quotedString,
"tableIdentifier" -> fullyQualifiedIdentifier("a").quotedString
)
)
}

test("Materialized view backed by streaming relation fails validation") {
val session = spark
import session.implicits._

val graph = new TestGraphRegistrationContext(spark) {
registerMaterializedView("a", query = dfFlowFunc(MemoryStream[Int].toDF()))
}.resolveToDataflowGraph()

val ex = intercept[AnalysisException] {
graph.validate()
}

checkError(
exception = ex,
condition = "INVALID_FLOW_RELATION_TYPE.FOR_MATERIALIZED_VIEW",
parameters = Map(
"flowIdentifier" -> fullyQualifiedIdentifier("a").quotedString,
"tableIdentifier" -> fullyQualifiedIdentifier("a").quotedString
)
)
}

test("Once flow backed by streaming relation fails validation") {
val session = spark
import session.implicits._

val graph = new TestGraphRegistrationContext(spark) {
registerTable("a")
registerFlow(
destinationName = "a",
name = "once_flow",
query = dfFlowFunc(MemoryStream[Int].toDF()),
once = true
)
}.resolveToDataflowGraph()

val ex = intercept[AnalysisException] {
graph.validate()
}

checkError(
exception = ex,
condition = "INVALID_FLOW_RELATION_TYPE.FOR_ONCE_FLOW",
parameters = Map(
"flowIdentifier" -> fullyQualifiedIdentifier("once_flow").quotedString
)
)
}

test("Inferred schema that isn't a subset of user-specified schema") {
val session = spark
import session.implicits._