From e0d943a21d5de7870538e823e08933438489c5ea Mon Sep 17 00:00:00 2001
From: Gengliang Wang
Date: Fri, 23 May 2025 11:04:57 -0700
Subject: [PATCH 1/7] skip check constraint on delete

---
 .../catalyst/analysis/ResolveTableConstraints.scala |  6 +++++-
 .../sql/connector/DeleteFromTableSuiteBase.scala    | 11 ++++++++++-
 2 files changed, 15 insertions(+), 2 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableConstraints.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableConstraints.scala
index 3b86b9580ae19..0b7cfb0591197 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableConstraints.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableConstraints.scala
@@ -19,17 +19,21 @@ package org.apache.spark.sql.catalyst.analysis
 
 import scala.collection.mutable
 
 import org.apache.spark.sql.catalyst.expressions.{And, CheckInvariant, Expression, V2ExpressionUtils}
-import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, V2WriteCommand}
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, RowLevelWrite, V2WriteCommand}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.trees.TreePattern.COMMAND
 import org.apache.spark.sql.connector.catalog.CatalogManager
 import org.apache.spark.sql.connector.catalog.constraints.Check
+import org.apache.spark.sql.connector.write.RowLevelOperation
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 
 class ResolveTableConstraints(val catalogManager: CatalogManager) extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsWithPruning(
     _.containsPattern(COMMAND), ruleId) {
+    case r: RowLevelWrite if r.operation.command() == RowLevelOperation.Command.DELETE =>
+      r
+
     case v2Write: V2WriteCommand
         if v2Write.table.resolved && v2Write.query.resolved &&
           !containsCheckInvariant(v2Write.query) && v2Write.outputResolved =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DeleteFromTableSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DeleteFromTableSuiteBase.scala
index 33c17c648046a..2dc4098e4f3e4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DeleteFromTableSuiteBase.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DeleteFromTableSuiteBase.scala
@@ -18,6 +18,8 @@
 package org.apache.spark.sql.connector
 
 import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.expressions.CheckInvariant
+import org.apache.spark.sql.catalyst.plans.logical.Filter
 import org.apache.spark.sql.execution.datasources.v2.{DeleteFromTableExec, ReplaceDataExec, WriteDeltaExec}
 
 abstract class DeleteFromTableSuiteBase extends RowLevelOperationSuiteBase {
@@ -78,7 +80,14 @@ abstract class DeleteFromTableSuiteBase extends RowLevelOperationSuiteBase {
       |{ "pk": 2, "id": 4, "dep": "eng" }
       |{ "pk": 3, "id": 6, "dep": "eng" }
       |""".stripMargin)
-    sql(s"DELETE FROM $tableNameAsString WHERE pk < 2")
+    val df = sql(s"DELETE FROM $tableNameAsString WHERE pk < 2")
+    val checkInvariant = df.queryExecution.analyzed.collectFirst {
+      case f: Filter =>
+        f.condition.collectFirst {
+          case c: CheckInvariant => c
+        }
+    }.flatten
+    assert(checkInvariant.isEmpty, "Check invariant should not be present in the delete plan")
     checkAnswer(
       sql(s"SELECT * FROM $tableNameAsString"),
       Seq(Row(2, 4, "eng"), Row(3, 6, "eng")))
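
A note on why the DELETE case can be skipped outright: a CHECK constraint only constrains rows being written, and a row-level DELETE writes no new rows, so ResolveTableConstraints can return the RowLevelWrite unchanged instead of wrapping its query in a CheckInvariant filter. The test above verifies exactly that; below is a rough spark-shell sketch of the same check (the catalog and table names are placeholders, and the table is assumed to declare a CHECK constraint):

    import org.apache.spark.sql.catalyst.expressions.CheckInvariant
    import org.apache.spark.sql.catalyst.plans.logical.Filter

    // Analyze a DELETE against a DSv2 table with a CHECK constraint,
    // then confirm the rule injected no CheckInvariant into the plan.
    val analyzed = spark.sql("DELETE FROM testcat.tbl WHERE pk < 2").queryExecution.analyzed
    val invariants = analyzed.collect {
      case f: Filter => f.condition.collect { case c: CheckInvariant => c }
    }.flatten
    assert(invariants.isEmpty)
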
From 959ea4365095010c3870b6b5e0eae40b52a759c1 Mon Sep 17 00:00:00 2001
From: Gengliang Wang
Date: Fri, 23 May 2025 13:11:06 -0700
Subject: [PATCH 2/7] add one case

---
 .../command/v2/CheckConstraintSuite.scala | 20 +++++++++++++++++++
 1 file changed, 20 insertions(+)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/CheckConstraintSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/CheckConstraintSuite.scala
index a734f8507dac8..9d6939642e8cd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/CheckConstraintSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/CheckConstraintSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.command.v2
 
 import org.apache.spark.SparkRuntimeException
 import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
+import org.apache.spark.sql.catalyst.plans.logical.Filter
 import org.apache.spark.sql.connector.catalog.Table
 import org.apache.spark.sql.connector.catalog.constraints.Check
 import org.apache.spark.sql.execution.command.DDLCommandTestUtils
@@ -908,4 +909,23 @@ class CheckConstraintSuite extends QueryTest with CommandSuiteBase with DDLComma
       }
     }
   }
+
+  test("Check constraint with constant expression should be optimized out") {
+    Seq(
+      "1 > 0",
+      "null",
+      "current_date() > DATE'2023-01-01'"
+    ).foreach { constant =>
+      withNamespaceAndTable("ns", "tbl", nonPartitionCatalog) { t =>
+        sql(s"CREATE TABLE $t (id INT, value INT," +
+          s" CONSTRAINT positive_id CHECK ($constant)) $defaultUsing")
+        val optimizedPlan =
+          sql(s"INSERT INTO $t VALUES (1, 10), (2, 20)").queryExecution.optimizedPlan
+        val filter = optimizedPlan.collectFirst {
+          case f: Filter => f
+        }
+        assert(filter.isEmpty)
+      }
+    }
+  }
 }
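
The new test leans on standard SQL CHECK semantics: a row is rejected only when the constraint predicate evaluates to FALSE, while TRUE and NULL (unknown) both pass. Constant predicates such as 1 > 0 and null therefore fold to trivially satisfied invariants, and the optimizer removes the enforcement Filter entirely; the null case is also what patches 3 and 4 below wire through the DSv2 expression API. A minimal sketch of the three-valued rule (hypothetical helper, not Spark code):

    // A CHECK constraint is violated only on a definite FALSE;
    // None models SQL NULL (unknown), which never rejects a row.
    def violates(checkResult: Option[Boolean]): Boolean = checkResult.contains(false)

    violates(Some(true))   // false -> row accepted
    violates(None)         // false -> CHECK (null) is vacuously satisfied
    violates(Some(false))  // true  -> constraint violation
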
From 5f3be91b7b9c7e5820414deb97c4066b6d1d4b3d Mon Sep 17 00:00:00 2001
From: Gengliang Wang
Date: Mon, 2 Jun 2025 10:03:57 -0700
Subject: [PATCH 3/7] save for now

---
 .../expressions/filter/AlwaysNull.java       | 32 +++++++++++++++++++
 .../expressions/V2ExpressionUtils.scala      |  6 ++--
 .../sql/catalyst/expressions/literals.scala  |  2 ++
 3 files changed, 37 insertions(+), 3 deletions(-)
 create mode 100644 sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/filter/AlwaysNull.java

diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/filter/AlwaysNull.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/filter/AlwaysNull.java
new file mode 100644
index 0000000000000..783a96b7994eb
--- /dev/null
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/filter/AlwaysNull.java
@@ -0,0 +1,32 @@
+package org.apache.spark.sql.connector.expressions.filter;
+
+import org.apache.spark.annotation.Evolving;
+import org.apache.spark.sql.connector.expressions.Literal;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DataTypes;
+
+/**
+ * A predicate that always evaluates to {@code null}.
+ *
+ * @since 3.3.0
+ */
+@Evolving
+public final class AlwaysNull extends Predicate implements Literal<Boolean> {
+
+  public AlwaysNull() {
+      super("ALWAYS_NULL", new Predicate[]{});
+  }
+
+  @Override
+  public Boolean value() {
+      return null;
+  }
+
+  @Override
+  public DataType dataType() {
+      return DataTypes.BooleanType;
+  }
+
+  @Override
+  public String toString() { return "NULL"; }
+}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/V2ExpressionUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/V2ExpressionUtils.scala
index 7cc03f3ac3fa6..f6d38de981b5e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/V2ExpressionUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/V2ExpressionUtils.scala
@@ -18,7 +18,6 @@
 package org.apache.spark.sql.catalyst.expressions
 
 import java.lang.reflect.{Method, Modifier}
-
 import org.apache.spark.internal.{Logging, MDC}
 import org.apache.spark.internal.LogKeys.{FUNCTION_NAME, FUNCTION_PARAM}
 import org.apache.spark.sql.AnalysisException
@@ -30,8 +29,8 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.connector.catalog.{FunctionCatalog, Identifier}
 import org.apache.spark.sql.connector.catalog.functions._
 import org.apache.spark.sql.connector.catalog.functions.ScalarFunction.MAGIC_METHOD_NAME
-import org.apache.spark.sql.connector.expressions.{BucketTransform, Cast => V2Cast, Expression => V2Expression, FieldReference, GeneralScalarExpression, IdentityTransform, Literal => V2Literal, NamedReference, NamedTransform, NullOrdering => V2NullOrdering, SortDirection => V2SortDirection, SortOrder => V2SortOrder, SortValue, Transform}
-import org.apache.spark.sql.connector.expressions.filter.{AlwaysFalse, AlwaysTrue}
+import org.apache.spark.sql.connector.expressions.{BucketTransform, FieldReference, GeneralScalarExpression, IdentityTransform, NamedReference, NamedTransform, SortValue, Transform, Cast => V2Cast, Expression => V2Expression, Literal => V2Literal, NullOrdering => V2NullOrdering, SortDirection => V2SortDirection, SortOrder => V2SortOrder}
+import org.apache.spark.sql.connector.expressions.filter.{AlwaysFalse, AlwaysNull, AlwaysTrue}
 import org.apache.spark.sql.errors.DataTypeErrors.toSQLId
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.types._
@@ -210,6 +209,7 @@ object V2ExpressionUtils extends SQLConfHelper with Logging {
   def toCatalyst(expr: V2Expression): Option[Expression] = expr match {
     case _: AlwaysTrue => Some(Literal.TrueLiteral)
     case _: AlwaysFalse => Some(Literal.FalseLiteral)
+    case _: AlwaysNull => Some(Literal.NullPredicateLiteral)
     case l: V2Literal[_] => Some(Literal(l.value, l.dataType))
     case r: NamedReference => Some(UnresolvedAttribute(r.fieldNames.toImmutableArraySeq))
     case c: V2Cast => toCatalyst(c.expression).map(Cast(_, c.dataType, ansiEnabled = true))
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
index e3ed2c4a0b0b8..a90f47491b60f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
@@ -65,6 +65,8 @@ object Literal {
 
   val FalseLiteral: Literal = Literal(false, BooleanType)
 
+  val NullPredicateLiteral: Literal = Literal(null, NullType)
+
   def apply(v: Any): Literal = v match {
     case i: Int => Literal(i, IntegerType)
     case l: Long => Literal(l, LongType)
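
The centerpiece of this work-in-progress patch is AlwaysNull, a third sibling to the existing AlwaysTrue and AlwaysFalse filters, so that a constant-null predicate can survive the trip into the DSv2 expression API. Its contract, restated from the Java file above as a small Scala check:

    import org.apache.spark.sql.connector.expressions.filter.AlwaysNull
    import org.apache.spark.sql.types.DataTypes

    val p = new AlwaysNull()
    assert(p.value() == null)                      // a literal SQL NULL
    assert(p.dataType() == DataTypes.BooleanType)  // typed BOOLEAN, not a null type
    assert(p.toString == "NULL")

Note the @since tag still reads 3.3.0 here; patch 4 corrects it to 4.1.0, and patch 5 adds the missing license header.
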
From f8dbe3ba1b9225ef990173bbdbed95373e5dce8f Mon Sep 17 00:00:00 2001
From: Gengliang Wang
Date: Tue, 10 Jun 2025 13:56:23 -0700
Subject: [PATCH 4/7] handle null

---
 .../expressions/filter/AlwaysNull.java       |  2 +-
 .../expressions/V2ExpressionUtils.scala      |  3 ++-
 .../catalyst/expressions/constraints.scala   |  7 +++--
 .../sql/catalyst/expressions/literals.scala  |  2 +-
 .../catalyst/util/V2ExpressionBuilder.scala  |  5 ++--
 .../command/v2/CheckConstraintSuite.scala    | 26 ++++++++++++++++++-
 6 files changed, 37 insertions(+), 8 deletions(-)

diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/filter/AlwaysNull.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/filter/AlwaysNull.java
index 783a96b7994eb..101c8254cc0d5 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/filter/AlwaysNull.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/filter/AlwaysNull.java
@@ -8,7 +8,7 @@
 /**
  * A predicate that always evaluates to {@code null}.
  *
- * @since 3.3.0
+ * @since 4.1.0
  */
 @Evolving
 public final class AlwaysNull extends Predicate implements Literal<Boolean> {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/V2ExpressionUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/V2ExpressionUtils.scala
index f6d38de981b5e..bf25eaa716830 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/V2ExpressionUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/V2ExpressionUtils.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.catalyst.expressions
 
 import java.lang.reflect.{Method, Modifier}
+
 import org.apache.spark.internal.{Logging, MDC}
 import org.apache.spark.internal.LogKeys.{FUNCTION_NAME, FUNCTION_PARAM}
 import org.apache.spark.sql.AnalysisException
@@ -29,7 +30,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.connector.catalog.{FunctionCatalog, Identifier}
 import org.apache.spark.sql.connector.catalog.functions._
 import org.apache.spark.sql.connector.catalog.functions.ScalarFunction.MAGIC_METHOD_NAME
-import org.apache.spark.sql.connector.expressions.{BucketTransform, FieldReference, GeneralScalarExpression, IdentityTransform, NamedReference, NamedTransform, SortValue, Transform, Cast => V2Cast, Expression => V2Expression, Literal => V2Literal, NullOrdering => V2NullOrdering, SortDirection => V2SortDirection, SortOrder => V2SortOrder}
+import org.apache.spark.sql.connector.expressions.{BucketTransform, Cast => V2Cast, Expression => V2Expression, FieldReference, GeneralScalarExpression, IdentityTransform, Literal => V2Literal, NamedReference, NamedTransform, NullOrdering => V2NullOrdering, SortDirection => V2SortDirection, SortOrder => V2SortOrder, SortValue, Transform}
 import org.apache.spark.sql.connector.expressions.filter.{AlwaysFalse, AlwaysNull, AlwaysTrue}
 import org.apache.spark.sql.errors.DataTypeErrors.toSQLId
 import org.apache.spark.sql.errors.QueryCompilationErrors
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/constraints.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/constraints.scala
index a27460e2be1cd..1a7e3b03c0e6a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/constraints.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/constraints.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.util.V2ExpressionBuilder
 import org.apache.spark.sql.connector.catalog.constraints.Constraint
 import org.apache.spark.sql.connector.expressions.FieldReference
 import org.apache.spark.sql.errors.QueryExecutionErrors
-import org.apache.spark.sql.types.{BooleanType, DataType}
+import org.apache.spark.sql.types.{AbstractDataType, BooleanType, DataType}
 
 trait TableConstraint extends Expression with Unevaluable {
   /** Convert to a data source v2 constraint */
@@ -122,9 +122,12 @@ case class CheckConstraint(
     override val tableName: String = null,
     override val userProvidedCharacteristic: ConstraintCharacteristic = ConstraintCharacteristic.empty)
   extends UnaryExpression
-  with TableConstraint {
+  with TableConstraint
+  with ImplicitCastInputTypes {
   // scalastyle:on line.size.limit
 
+  override def inputTypes: Seq[AbstractDataType] = Seq(BooleanType)
+
   def toV2Constraint: Constraint = {
     val predicate = new V2ExpressionBuilder(child, true).buildPredicate().orNull
     val enforced = userProvidedCharacteristic.enforced.getOrElse(true)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
index a90f47491b60f..f0c3d74edf443 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
@@ -65,7 +65,7 @@ object Literal {
 
   val FalseLiteral: Literal = Literal(false, BooleanType)
 
-  val NullPredicateLiteral: Literal = Literal(null, NullType)
+  val NullPredicateLiteral: Literal = Literal(null, BooleanType)
 
   def apply(v: Any): Literal = v match {
     case i: Int => Literal(i, IntegerType)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/V2ExpressionBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/V2ExpressionBuilder.scala
index fad73a6d81464..e3ebf1822041e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/V2ExpressionBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/V2ExpressionBuilder.scala
@@ -25,9 +25,9 @@ import org.apache.spark.sql.catalyst.expressions.objects.{Invoke, StaticInvoke}
 import org.apache.spark.sql.connector.catalog.functions.ScalarFunction
 import org.apache.spark.sql.connector.expressions.{Cast => V2Cast, Expression => V2Expression, Extract => V2Extract, FieldReference, GeneralScalarExpression, LiteralValue, NullOrdering, SortDirection, SortValue, UserDefinedScalarFunc}
 import org.apache.spark.sql.connector.expressions.aggregate.{AggregateFunc, Avg, Count, CountStar, GeneralAggregateFunc, Max, Min, Sum, UserDefinedAggregateFunc}
-import org.apache.spark.sql.connector.expressions.filter.{AlwaysFalse, AlwaysTrue, And => V2And, Not => V2Not, Or => V2Or, Predicate => V2Predicate}
+import org.apache.spark.sql.connector.expressions.filter.{AlwaysFalse, AlwaysNull, AlwaysTrue, And => V2And, Not => V2Not, Or => V2Or, Predicate => V2Predicate}
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.{BooleanType, DataType, IntegerType, StringType}
+import org.apache.spark.sql.types.{BooleanType, DataType, IntegerType, NullType, StringType}
 
 /**
  * The builder to generate V2 expressions from catalyst expressions.
@@ -78,6 +78,7 @@ class V2ExpressionBuilder(e: Expression, isPredicate: Boolean = false) extends L
     expr: Expression, isPredicate: Boolean = false): Option[V2Expression] = expr match {
     case Literal(true, BooleanType) => Some(new AlwaysTrue())
     case Literal(false, BooleanType) => Some(new AlwaysFalse())
+    case Cast(Literal(null, NullType), BooleanType, _, _) if isPredicate => Some(new AlwaysNull())
     case Literal(value, dataType) => Some(LiteralValue(value, dataType))
     case col @ ColumnOrField(nameParts) =>
       val ref = FieldReference(nameParts)
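
With this builder case in place, the translation added in patch 3 becomes a round trip. A rough sketch using the internal APIs from these diffs (the two-argument V2ExpressionBuilder constructor is the same one used by constraints.scala above):

    import org.apache.spark.sql.catalyst.expressions.{Cast, Literal, V2ExpressionUtils}
    import org.apache.spark.sql.catalyst.util.V2ExpressionBuilder
    import org.apache.spark.sql.connector.expressions.filter.AlwaysNull
    import org.apache.spark.sql.types.{BooleanType, NullType}

    // Catalyst -> V2: a null literal cast to BOOLEAN in predicate position
    // (the cast comes from the new ImplicitCastInputTypes mix-in on
    // CheckConstraint) becomes the AlwaysNull marker.
    val v2 = new V2ExpressionBuilder(
      Cast(Literal(null, NullType), BooleanType), true).buildPredicate()
    // v2 now contains an AlwaysNull predicate

    // V2 -> Catalyst: AlwaysNull maps back to Literal.NullPredicateLiteral,
    // which this patch retypes from NullType to BooleanType.
    val back = V2ExpressionUtils.toCatalyst(new AlwaysNull())
    // back == Some(Literal(null, BooleanType))
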
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/CheckConstraintSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/CheckConstraintSuite.scala
index 9d6939642e8cd..90f92aa06e752 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/CheckConstraintSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/CheckConstraintSuite.scala
@@ -910,9 +910,10 @@ class CheckConstraintSuite extends QueryTest with CommandSuiteBase with DDLComma
     }
   }
 
-  test("Check constraint with constant expression should be optimized out") {
+  test("Check constraint with constant valid expression should be optimized out") {
     Seq(
       "1 > 0",
+      "abs(-99) < 100",
       "null",
       "current_date() > DATE'2023-01-01'"
     ).foreach { constant =>
@@ -928,4 +929,27 @@ class CheckConstraintSuite extends QueryTest with CommandSuiteBase with DDLComma
       }
     }
   }
+
+  test("Check constraint with constant invalid expression should throw error") {
+    Seq(
+      "1 < 0",
+      "abs(-99) > 100",
+      "current_date() < DATE'2023-01-01'"
+    ).foreach { constant =>
+      withNamespaceAndTable("ns", "tbl", nonPartitionCatalog) { t =>
+        sql(s"CREATE TABLE $t (id INT, value INT," +
+          s" CONSTRAINT positive_id CHECK ($constant)) $defaultUsing")
+        val error = intercept[SparkRuntimeException] {
+          sql(s"INSERT INTO $t VALUES (1, 10), (2, 20)")
+        }
+        checkError(
+          exception = error,
+          condition = "CHECK_CONSTRAINT_VIOLATION",
+          sqlState = "23001",
+          parameters = Map("constraintName" -> "positive_id", "expression" -> constant,
+            "values" -> "")
+        )
+      }
+    }
+  }
 }
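
Behaviorally, the two tests split constant constraints into two buckets: constants that fold to TRUE or NULL cost nothing at write time, while constants that fold to FALSE fail every INSERT with a proper error. A hypothetical spark-shell session illustrating both (the catalog name and USING clause are placeholders):

    // Folds to TRUE: the optimized INSERT plan carries no enforcement Filter.
    spark.sql("CREATE TABLE testcat.t1 (id INT, CONSTRAINT c CHECK (1 > 0)) USING foo")
    spark.sql("INSERT INTO testcat.t1 VALUES (1)")   // succeeds

    // Folds to FALSE: every INSERT fails fast.
    spark.sql("CREATE TABLE testcat.t2 (id INT, CONSTRAINT c CHECK (1 < 0)) USING foo")
    spark.sql("INSERT INTO testcat.t2 VALUES (1)")
    // => SparkRuntimeException: [CHECK_CONSTRAINT_VIOLATION] (SQLSTATE 23001)
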
From bc388fdb692fecc7165f36906dc0fda70a6984a8 Mon Sep 17 00:00:00 2001
From: Gengliang Wang
Date: Wed, 18 Jun 2025 16:50:02 -0700
Subject: [PATCH 5/7] add header

---
 .../expressions/filter/AlwaysNull.java | 17 +++++++++++++++++
 1 file changed, 17 insertions(+)

diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/filter/AlwaysNull.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/filter/AlwaysNull.java
index 101c8254cc0d5..3825672cd78a3 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/filter/AlwaysNull.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/filter/AlwaysNull.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.spark.sql.connector.expressions.filter;
 
 import org.apache.spark.annotation.Evolving;

From 766916dcdbf9a25f03305832c0eeffb0ba7682c4 Mon Sep 17 00:00:00 2001
From: Gengliang Wang
Date: Mon, 23 Jun 2025 13:55:48 -0700
Subject: [PATCH 6/7] revert delete related code

---
 .../catalyst/analysis/ResolveTableConstraints.scala |  6 +-----
 .../sql/connector/DeleteFromTableSuiteBase.scala    | 11 +----------
 2 files changed, 2 insertions(+), 15 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableConstraints.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableConstraints.scala
index 0b7cfb0591197..3b86b9580ae19 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableConstraints.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableConstraints.scala
@@ -19,21 +19,17 @@ package org.apache.spark.sql.catalyst.analysis
 
 import scala.collection.mutable
 
 import org.apache.spark.sql.catalyst.expressions.{And, CheckInvariant, Expression, V2ExpressionUtils}
-import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, RowLevelWrite, V2WriteCommand}
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, V2WriteCommand}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.trees.TreePattern.COMMAND
 import org.apache.spark.sql.connector.catalog.CatalogManager
 import org.apache.spark.sql.connector.catalog.constraints.Check
-import org.apache.spark.sql.connector.write.RowLevelOperation
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 
 class ResolveTableConstraints(val catalogManager: CatalogManager) extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsWithPruning(
     _.containsPattern(COMMAND), ruleId) {
-    case r: RowLevelWrite if r.operation.command() == RowLevelOperation.Command.DELETE =>
-      r
-
     case v2Write: V2WriteCommand
         if v2Write.table.resolved && v2Write.query.resolved &&
           !containsCheckInvariant(v2Write.query) && v2Write.outputResolved =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DeleteFromTableSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DeleteFromTableSuiteBase.scala
index 2dc4098e4f3e4..33c17c648046a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DeleteFromTableSuiteBase.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DeleteFromTableSuiteBase.scala
@@ -18,8 +18,6 @@
 package org.apache.spark.sql.connector
 
 import org.apache.spark.sql.Row
-import org.apache.spark.sql.catalyst.expressions.CheckInvariant
-import org.apache.spark.sql.catalyst.plans.logical.Filter
 import org.apache.spark.sql.execution.datasources.v2.{DeleteFromTableExec, ReplaceDataExec, WriteDeltaExec}
 
 abstract class DeleteFromTableSuiteBase extends RowLevelOperationSuiteBase {
@@ -80,14 +78,7 @@ abstract class DeleteFromTableSuiteBase extends RowLevelOperationSuiteBase {
       |{ "pk": 2, "id": 4, "dep": "eng" }
       |{ "pk": 3, "id": 6, "dep": "eng" }
       |""".stripMargin)
-    val df = sql(s"DELETE FROM $tableNameAsString WHERE pk < 2")
-    val checkInvariant = df.queryExecution.analyzed.collectFirst {
-      case f: Filter =>
-        f.condition.collectFirst {
-          case c: CheckInvariant => c
-        }
-    }.flatten
-    assert(checkInvariant.isEmpty, "Check invariant should not be present in the delete plan")
+    sql(s"DELETE FROM $tableNameAsString WHERE pk < 2")
     checkAnswer(
       sql(s"SELECT * FROM $tableNameAsString"),
       Seq(Row(2, 4, "eng"), Row(3, 6, "eng")))
"dep": "eng" } |""".stripMargin) - val df = sql(s"DELETE FROM $tableNameAsString WHERE pk < 2") - val checkInvariant = df.queryExecution.analyzed.collectFirst { - case f: Filter => - f.condition.collectFirst { - case c: CheckInvariant => c - } - }.flatten - assert(checkInvariant.isEmpty, "Check invariant should not be present in the delete plan") + sql(s"DELETE FROM $tableNameAsString WHERE pk < 2") checkAnswer( sql(s"SELECT * FROM $tableNameAsString"), Seq(Row(2, 4, "eng"), Row(3, 6, "eng"))) From 809859bf1f772700cfb9b5e108683707b1dd734a Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Mon, 23 Jun 2025 13:57:49 -0700 Subject: [PATCH 7/7] fix indent --- .../spark/sql/connector/expressions/filter/AlwaysNull.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/filter/AlwaysNull.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/filter/AlwaysNull.java index 3825672cd78a3..6abd036cb8952 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/filter/AlwaysNull.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/filter/AlwaysNull.java @@ -31,17 +31,17 @@ public final class AlwaysNull extends Predicate implements Literal { public AlwaysNull() { - super("ALWAYS_NULL", new Predicate[]{}); + super("ALWAYS_NULL", new Predicate[]{}); } @Override public Boolean value() { - return null; + return null; } @Override public DataType dataType() { - return DataTypes.BooleanType; + return DataTypes.BooleanType; } @Override