Skip to content

Commit e17ff64

Browse files
szehon-hogengliangwang
authored andcommitted
[SPARK-52417][SQL] Simplify Table properties handling in View Schema Evolution Mode
### What changes were proposed in this pull request? When a view is created with user-specified columns, e.g. `CREATE VIEW v (a1 INT, a2 STRING) AS SELECT c1, c2 FROM t`, it needs to save both sets of columns (the user-specified view schema and the query output). 1. The user-specified view schema is saved as the View Schema (a1 INT, a2 STRING). 2. The view query output is saved as Table properties with an index: (c1, 0), (c2, 1). In the new Schema Evolution mode, we never allow a user-specified view schema, so view schema == view query output schema. Every time we detect that the view query's output schema has changed, we sync the view's schema with the view query schema, keeping the invariant. So we can simplify the logic for views in Schema Evolution mode to not update the Table Properties, and instead rely on the View Schema all the time. ### Why are the changes needed? View Schema Evolution is a useful mode. However, it requires a lot of permissions for the user querying the view, because that user needs to update the View definition. It will simplify auth if we do not have to update table properties too, and reduce the update to only the schema. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Existing unit tests, plus a new test in `SQLViewTestSuite`. ### Was this patch authored or co-authored using generative AI tooling? No Closes #51107 from szehon-ho/SPARK-524167. Authored-by: Szehon Ho <[email protected]> Signed-off-by: Gengliang Wang <[email protected]>
1 parent 722d02c commit e17ff64

File tree

4 files changed

+46
-18
lines changed

4 files changed

+46
-18
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -629,9 +629,15 @@ case class CatalogTable(
629629
if (lastAccessTime <= 0) JString("UNKNOWN")
630630
else JLong(lastAccessTime)
631631

632-
val viewQueryOutputColumns: JValue =
633-
if (viewQueryColumnNames.nonEmpty) JArray(viewQueryColumnNames.map(JString).toList)
634-
else JNull
632+
val viewQueryOutputColumns: JValue = {
633+
if (viewSchemaMode == SchemaEvolution) {
634+
JArray(schema.map(_.name).map(JString).toList)
635+
} else if (viewQueryColumnNames.nonEmpty) {
636+
JArray(viewQueryColumnNames.map(JString).toList)
637+
} else {
638+
JNull
639+
}
640+
}
635641

636642
val map = mutable.LinkedHashMap[String, JValue]()
637643

sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -586,13 +586,21 @@ object ViewHelper extends SQLConfHelper with Logging {
586586
// names.
587587
SchemaUtils.checkColumnNameDuplication(fieldNames.toImmutableArraySeq, conf.resolver)
588588

589+
val queryColumnNameProps = if (viewSchemaMode == SchemaEvolution) {
590+
// If the view schema mode is SCHEMA EVOLUTION, we can avoid generating the query output
591+
// column names as table properties, and always use the view schema, as the two are always the same
592+
Seq()
593+
} else {
594+
generateQueryColumnNames(queryOutput.toImmutableArraySeq)
595+
}
596+
589597
// Generate the view default catalog and namespace, as well as captured SQL configs.
590598
val manager = session.sessionState.catalogManager
591599
removeReferredTempNames(removeSQLConfigs(removeQueryColumnNames(properties))) ++
592600
catalogAndNamespaceToProps(
593601
manager.currentCatalog.name, manager.currentNamespace.toImmutableArraySeq) ++
594602
sqlConfigsToProps(conf) ++
595-
generateQueryColumnNames(queryOutput.toImmutableArraySeq) ++
603+
queryColumnNameProps ++
596604
referredTempNamesToProps(tempViewNames, tempFunctionNames, tempVariableNames) ++
597605
viewSchemaModeToProps(viewSchemaMode)
598606
}

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala

Lines changed: 1 addition & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@ import org.apache.spark.sql.classic.SparkSession
3535
import org.apache.spark.sql.connector.expressions.{FieldReference, RewritableTransform}
3636
import org.apache.spark.sql.errors.QueryCompilationErrors
3737
import org.apache.spark.sql.execution.command.DDLUtils
38-
import org.apache.spark.sql.execution.command.ViewHelper.generateViewProperties
3938
import org.apache.spark.sql.execution.datasources.{CreateTable => CreateTableV1}
4039
import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2
4140
import org.apache.spark.sql.internal.SQLConf
@@ -705,16 +704,6 @@ object ViewSyncSchemaToMetaStore extends (LogicalPlan => Unit) {
705704
}
706705

707706
if (redo) {
708-
val newProperties = if (viewSchemaMode == SchemaEvolution) {
709-
generateViewProperties(
710-
metaData.properties,
711-
session,
712-
fieldNames,
713-
fieldNames,
714-
metaData.viewSchemaMode)
715-
} else {
716-
metaData.properties
717-
}
718707
val newSchema = if (viewSchemaMode == SchemaTypeEvolution) {
719708
val newFields = viewQuery.schema.map {
720709
case StructField(name, dataType, nullable, _) =>
@@ -727,9 +716,7 @@ object ViewSyncSchemaToMetaStore extends (LogicalPlan => Unit) {
727716
}
728717
SchemaUtils.checkColumnNameDuplication(fieldNames.toImmutableArraySeq,
729718
session.sessionState.conf.resolver)
730-
val updatedViewMeta = metaData.copy(
731-
properties = newProperties,
732-
schema = newSchema)
719+
val updatedViewMeta = metaData.copy(schema = newSchema)
733720
session.sessionState.catalog.alterTable(updatedViewMeta)
734721
}
735722
case _ => // OK

sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewTestSuite.scala

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import scala.jdk.CollectionConverters._
2222
import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row}
2323
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
2424
import org.apache.spark.sql.catalyst.catalog.CatalogFunction
25+
import org.apache.spark.sql.catalyst.catalog.CatalogTable._
2526
import org.apache.spark.sql.catalyst.expressions.Expression
2627
import org.apache.spark.sql.catalyst.plans.logical.Repartition
2728
import org.apache.spark.sql.catalyst.util.DateTimeTestUtils.withDefaultTimeZone
@@ -827,6 +828,32 @@ class PersistedViewTestSuite extends SQLViewTestSuite with SharedSparkSession {
827828
)
828829
}
829830

831+
test("SPARK-52417: Simplify Table properties handling in View Schema Evolution Mode") {
832+
withTable("t") {
833+
withView("v") {
834+
sql("CREATE TABLE t (c1 int)")
835+
sql("CREATE VIEW v WITH SCHEMA EVOLUTION AS SELECT * from t")
836+
sql("INSERT INTO t VALUES (1), (2), (3)")
837+
checkAnswer(sql("SELECT * FROM v"), Seq(Row(1), Row(2), Row(3)))
838+
839+
val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("v"))
840+
assert(table.properties.get(VIEW_SCHEMA_MODE) === Some("EVOLUTION"))
841+
assert(!table.properties.exists(_._1.startsWith(VIEW_QUERY_OUTPUT_PREFIX)))
842+
assert(!table.properties.exists(_._1.startsWith(VIEW_QUERY_OUTPUT_NUM_COLUMNS)))
843+
844+
sql("DROP TABLE t")
845+
sql("CREATE TABLE t (s1 string, b1 boolean)")
846+
sql("INSERT INTO t VALUES ('a', true), ('b', false), ('c', true)")
847+
checkAnswer(sql("SELECT * FROM v"), Seq(Row("a", true), Row("b", false), Row("c", true)))
848+
849+
val updatedTable = spark.sessionState.catalog.getTableMetadata(TableIdentifier("v"))
850+
assert(updatedTable.properties.get(VIEW_SCHEMA_MODE) === Some("EVOLUTION"))
851+
assert(!updatedTable.properties.exists(_._1.startsWith(VIEW_QUERY_OUTPUT_PREFIX)))
852+
assert(!updatedTable.properties.exists(_._1.startsWith(VIEW_QUERY_OUTPUT_NUM_COLUMNS)))
853+
}
854+
}
855+
}
856+
830857
def getShowCreateDDL(view: String, serde: Boolean = false): String = {
831858
val result = if (serde) {
832859
sql(s"SHOW CREATE TABLE $view AS SERDE")

0 commit comments

Comments
 (0)