Skip to content

Commit 38d2d1d

Browse files
author
changgyoopark-db
committed
Impl
1 parent 6d66f26 commit 38d2d1d

File tree

2 files changed: +28 additions, -9 deletions

sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala

Lines changed: 26 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ import org.apache.spark.sql.connect.planner.PythonStreamingQueryListener
4242
import org.apache.spark.sql.connect.planner.StreamingForeachBatchHelper
4343
import org.apache.spark.sql.connect.service.ExecuteKey
4444
import org.apache.spark.sql.connect.service.SessionHolder.{ERROR_CACHE_SIZE, ERROR_CACHE_TIMEOUT_SEC}
45+
import org.apache.spark.sql.execution.QueryExecution
4546
import org.apache.spark.sql.streaming.StreamingQueryListener
4647
import org.apache.spark.util.{SystemClock, Utils}
4748

@@ -450,14 +451,14 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio
450451
*/
451452
private[connect] def usePlanCache(rel: proto.Relation, cachePlan: Boolean)(
452453
transform: proto.Relation => LogicalPlan): LogicalPlan = {
453-
val planCacheEnabled = Option(session)
454-
.forall(_.sessionState.conf.getConf(Connect.CONNECT_SESSION_PLAN_CACHE_ENABLED, true))
455454
// We only cache plans that have a plan ID.
456-
val hasPlanId = rel.hasCommon && rel.getCommon.hasPlanId
455+
val planCacheEnabled = rel.hasCommon && rel.getCommon.hasPlanId &&
456+
Option(session)
457+
.forall(_.sessionState.conf.getConf(Connect.CONNECT_SESSION_PLAN_CACHE_ENABLED, true))
457458

458459
def getPlanCache(rel: proto.Relation): Option[LogicalPlan] =
459460
planCache match {
460-
case Some(cache) if planCacheEnabled && hasPlanId =>
461+
case Some(cache) if planCacheEnabled =>
461462
Option(cache.getIfPresent(rel)) match {
462463
case Some(plan) =>
463464
logDebug(s"Using cached plan for relation '$rel': $plan")
@@ -466,20 +467,36 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio
466467
}
467468
case _ => None
468469
}
469-
def putPlanCache(rel: proto.Relation, plan: LogicalPlan): Unit =
470+
def putPlanCache(rel: proto.Relation, plan: LogicalPlan): LogicalPlan = {
470471
planCache match {
471-
case Some(cache) if planCacheEnabled && hasPlanId =>
472-
cache.put(rel, plan)
473-
case _ =>
472+
case Some(cache) if planCacheEnabled =>
473+
val analyzedPlan = if (plan.analyzed) {
474+
plan
475+
} else {
476+
val qe = new QueryExecution(session, plan)
477+
if (qe.isLazyAnalysis) {
478+
// The plan is intended to be lazily analyzed.
479+
plan
480+
} else {
481+
// Make sure that the plan is fully analyzed before being cached.
482+
qe.assertAnalyzed()
483+
qe.analyzed
484+
}
485+
}
486+
cache.put(rel, analyzedPlan)
487+
analyzedPlan
488+
case _ => plan
474489
}
490+
}
475491

476492
getPlanCache(rel)
477493
.getOrElse({
478494
val plan = transform(rel)
479495
if (cachePlan) {
480496
putPlanCache(rel, plan)
497+
} else {
498+
plan
481499
}
482-
plan
483500
})
484501
}
485502

sql/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectSessionHolderSuite.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -314,6 +314,8 @@ class SparkConnectSessionHolderSuite extends SharedSparkSession {
314314
case Some(expectedCachedRelations) =>
315315
val cachedRelations = sessionHolder.getPlanCache.get.asMap().keySet().asScala
316316
assert(cachedRelations.size == expectedCachedRelations.size)
317+
val cachedLogicalPlans = sessionHolder.getPlanCache.get.asMap().values().asScala
318+
cachedLogicalPlans.foreach(plan => assert(plan.analyzed))
317319
expectedCachedRelations.foreach(relation => assert(cachedRelations.contains(relation)))
318320
case None => assert(sessionHolder.getPlanCache.isEmpty)
319321
}

0 commit comments

Comments (0)