Skip to content

Commit c4fb4a4

Browse files
yuexing authored and
LuciferYang committed
[SPARK-42322][SQL] Assign name to _LEGACY_ERROR_TEMP_2235
### What changes were proposed in this pull request?
See https://issues.apache.org/jira/browse/SPARK-42322. Assign a meaningful name to _LEGACY_ERROR_TEMP_2235, and also make the error message more helpful.

### Why are the changes needed?
See https://issues.apache.org/jira/browse/SPARK-42322.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
UT

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #51125 from yuexing/SPARK-42322-stage-materialization-error.

Lead-authored-by: xingyue <[email protected]>
Co-authored-by: Yue <[email protected]>
Signed-off-by: yangjie01 <[email protected]>
1 parent abecd4a commit c4fb4a4

File tree

4 files changed

+73
-10
lines changed

4 files changed

+73
-10
lines changed

common/utils/src/main/resources/error/error-conditions.json

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4978,6 +4978,12 @@
49784978
],
49794979
"sqlState" : "07501"
49804980
},
4981+
"STAGE_MATERIALIZATION_MULTIPLE_FAILURES" : {
4982+
"message" : [
4983+
"Multiple failures (<failureCount>) in stage materialization: <failureDetails>"
4984+
],
4985+
"sqlState" : "XX000"
4986+
},
49814987
"STAR_GROUP_BY_POS" : {
49824988
"message" : [
49834989
"Star (*) is not allowed in a select list when GROUP BY an ordinal position is used."
@@ -8565,11 +8571,6 @@
85658571
"Failed to set original ACL <aclEntries> back to the created path: <path>. Exception: <message>"
85668572
]
85678573
},
8568-
"_LEGACY_ERROR_TEMP_2235" : {
8569-
"message" : [
8570-
"Multiple failures in stage materialization."
8571-
]
8572-
},
85738574
"_LEGACY_ERROR_TEMP_2236" : {
85748575
"message" : [
85758576
"Unrecognized compression scheme type ID: <typeId>."

sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1970,11 +1970,18 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase with ExecutionE
19701970
"message" -> e.getMessage))
19711971
}
19721972

1973-
/**
 * Builds the exception reported when several failures occur during stage materialization.
 *
 * Each failure is rendered as a numbered `ClassName: message` entry and passed as the
 * `failureDetails` message parameter; the number of failures is passed as `failureCount`.
 * The first failure, when there is one, becomes the cause of the returned exception.
 *
 * @param errors the failures to report, in the order they should be listed
 * @return a [[SparkException]] with error class `STAGE_MATERIALIZATION_MULTIPLE_FAILURES`
 */
def multiFailuresInStageMaterializationError(errors: Seq[Throwable]): Throwable = {
  val failureDetails = errors.zipWithIndex.map { case (error, index) =>
    // getSimpleName is empty for anonymous classes; fall back to the full class name so
    // the entry never has a blank label.
    val simpleName = error.getClass.getSimpleName
    val className = if (simpleName.nonEmpty) simpleName else error.getClass.getName
    s"\n ${index + 1}. $className: ${error.getMessage}"
  }.mkString

  new SparkException(
    errorClass = "STAGE_MATERIALIZATION_MULTIPLE_FAILURES",
    messageParameters = Map(
      "failureCount" -> errors.size.toString,
      "failureDetails" -> failureDetails
    ),
    // headOption keeps an empty `errors` from failing with NoSuchElementException while
    // building the error; the exception then simply has no cause.
    cause = errors.headOption.orNull)
}
19791986

19801987
def unrecognizedCompressionSchemaTypeIDError(typeId: Int): SparkUnsupportedOperationException = {

sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -897,7 +897,7 @@ case class AdaptiveSparkPlanExec(
897897
val e = if (originalErrors.size == 1) {
898898
originalErrors.head
899899
} else {
900-
val se = QueryExecutionErrors.multiFailuresInStageMaterializationError(originalErrors.head)
900+
val se = QueryExecutionErrors.multiFailuresInStageMaterializationError(originalErrors)
901901
originalErrors.tail.foreach(se.addSuppressed)
902902
se
903903
}

sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3136,6 +3136,61 @@ class AdaptiveQueryExecSuite
31363136
}
31373137
}
31383138
}
3139+
3140+
test("SPARK-42322: STAGE_MATERIALIZATION_MULTIPLE_FAILURES error class validation") {
  withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {
    withTempView("test_table1", "test_table2") {
      import java.lang.reflect.InvocationTargetException

      // Register the two temp views that the join below reads from.
      spark.range(100).selectExpr("id", "id % 10 as group_col")
        .createOrReplaceTempView("test_table1")
      spark.range(100).selectExpr("id", "id % 5 as group_col")
        .createOrReplaceTempView("test_table2")

      // The query is only needed as a source of an AdaptiveSparkPlanExec instance; no real
      // stage failure is triggered.
      val joinedDf = spark.sql("""
        SELECT t1.group_col, COUNT(*) as cnt
        FROM test_table1 t1
        JOIN test_table2 t2 ON t1.group_col = t2.group_col
        GROUP BY t1.group_col
      """)
      val aqePlan = joinedDf.queryExecution.executedPlan.asInstanceOf[AdaptiveSparkPlanExec]

      // Two synthetic failures handed straight to the error-reporting path.
      val stageFailures = Seq(
        new RuntimeException("Stage 1 materialization failed"),
        new RuntimeException("Stage 2 materialization failed")
      )

      // cleanUpAndThrowException is looked up reflectively so it can be invoked directly.
      val throwMethod = classOf[AdaptiveSparkPlanExec].getDeclaredMethod(
        "cleanUpAndThrowException", classOf[Seq[Throwable]], classOf[Option[Int]])
      throwMethod.setAccessible(true)

      // Reflection wraps whatever the method throws in an InvocationTargetException.
      val wrapped = intercept[InvocationTargetException] {
        throwMethod.invoke(aqePlan, stageFailures, None)
      }

      // The wrapped exception must carry the new error class ...
      val thrown = wrapped.getCause.asInstanceOf[SparkException]
      assert(thrown.getCondition == "STAGE_MATERIALIZATION_MULTIPLE_FAILURES",
        s"Expected STAGE_MATERIALIZATION_MULTIPLE_FAILURES, got: ${thrown.getCondition}")

      // ... and its message must list the failure count and each numbered failure.
      val msg = thrown.getMessage
      assert(msg.contains("Multiple failures (2) in stage materialization:"),
        s"Error message should contain failure count, got: $msg")
      assert(msg.contains("1. RuntimeException: Stage 1 materialization failed"),
        s"Error message should contain first error details, got: $msg")
      assert(msg.contains("2. RuntimeException: Stage 2 materialization failed"),
        s"Error message should contain second error details, got: $msg")
    }
  }
}
31393194
}
31403195

31413196
/**

0 commit comments

Comments
 (0)