[SPARK-50903][CONNECT] Cache logical plans after analysis #49584
@@ -32,15 +32,19 @@ import org.apache.spark.{SparkEnv, SparkException, SparkSQLException}
 import org.apache.spark.api.python.PythonFunction.PythonAccumulator
 import org.apache.spark.connect.proto
 import org.apache.spark.internal.{Logging, LogKeys, MDC}
-import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.{DataFrame, Row}
+import org.apache.spark.sql.catalyst.QueryPlanningTracker
+import org.apache.spark.sql.catalyst.encoders.RowEncoder
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.classic.SparkSession
+import org.apache.spark.sql.classic.{Dataset, SparkSession}
 import org.apache.spark.sql.connect.common.InvalidPlanInput
 import org.apache.spark.sql.connect.config.Connect
 import org.apache.spark.sql.connect.ml.MLCache
 import org.apache.spark.sql.connect.planner.PythonStreamingQueryListener
+import org.apache.spark.sql.connect.planner.SparkConnectPlanner
 import org.apache.spark.sql.connect.planner.StreamingForeachBatchHelper
 import org.apache.spark.sql.connect.service.SessionHolder.{ERROR_CACHE_SIZE, ERROR_CACHE_TIMEOUT_SEC}
+import org.apache.spark.sql.execution.{CommandExecutionMode, ShuffleCleanupMode}
 import org.apache.spark.sql.streaming.StreamingQueryListener
 import org.apache.spark.util.{SystemClock, Utils}
@@ -440,46 +444,80 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSession)
    * `spark.connect.session.planCache.enabled` is true.
    * @param rel
    *   The relation to transform.
-   * @param cachePlan
-   *   Whether to cache the result logical plan.
    * @param transform
    *   Function to transform the relation into a logical plan.
    * @return
-   *   The logical plan.
+   *   The logical plan and a flag indicating that the plan cache was hit.
    */
-  private[connect] def usePlanCache(rel: proto.Relation, cachePlan: Boolean)(
-      transform: proto.Relation => LogicalPlan): LogicalPlan = {
-    val planCacheEnabled = Option(session)
-      .forall(_.sessionState.conf.getConf(Connect.CONNECT_SESSION_PLAN_CACHE_ENABLED, true))
-    // We only cache plans that have a plan ID.
-    val hasPlanId = rel.hasCommon && rel.getCommon.hasPlanId
-
-    def getPlanCache(rel: proto.Relation): Option[LogicalPlan] =
-      planCache match {
-        case Some(cache) if planCacheEnabled && hasPlanId =>
-          Option(cache.getIfPresent(rel)) match {
-            case Some(plan) =>
-              logDebug(s"Using cached plan for relation '$rel': $plan")
-              Some(plan)
-            case None => None
-          }
-        case _ => None
-      }
-    def putPlanCache(rel: proto.Relation, plan: LogicalPlan): Unit =
-      planCache match {
-        case Some(cache) if planCacheEnabled && hasPlanId =>
-          cache.put(rel, plan)
-        case _ =>
-      }
-
-    getPlanCache(rel)
-      .getOrElse({
-        val plan = transform(rel)
-        if (cachePlan) {
-          putPlanCache(rel, plan)
-        }
-        plan
-      })
+  private[connect] def usePlanCache(rel: proto.Relation)(
+      transform: proto.Relation => LogicalPlan): (LogicalPlan, Boolean) = {
+    planCache match {
+      case Some(cache) if canCachePlan(rel) =>
+        Option(cache.getIfPresent(rel)) match {
+          case Some(plan) =>
+            if (isPlanOutdated(plan)) {
+              // The plan is outdated, therefore remove it from the cache.
+              cache.invalidate(rel)
+            } else {
+              logDebug(s"Using cached plan for relation '$rel': $plan")
+              return (plan, true)
+            }
+          case None => ()
+        }
+      case _ => ()
+    }
+    (transform(rel), false)
   }
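The Boolean in the return value lets callers tell a cache hit apart from a freshly transformed plan, which is what allows `createDataFrame` below to cache the plan only after analysis. As a rough sketch (not code from this PR), the planner-side helper `transformRelationWithCache` referenced below might wrap `usePlanCache` like this; `sessionHolder` and `transformRelation` are assumed to be existing members of `SparkConnectPlanner`:

```scala
// Hypothetical sketch of the planner-side wrapper used by createDataFrame below.
def transformRelationWithCache(rel: proto.Relation): (LogicalPlan, Boolean) = {
  // Run the expensive proto-to-LogicalPlan transformation only on a cache miss;
  // the Boolean reports whether the cached plan was used.
  sessionHolder.usePlanCache(rel) { r => transformRelation(r) }
}
```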
+  /**
+   * Create a data frame from the supplied relation, and update the plan cache.
+   *
+   * @param rel
+   *   A proto.Relation to create a data frame.
+   * @param options
+   *   Options to pass to the data frame for plan execution. If None is provided, the command
+   *   execution mode is set to SKIP.
+   * @return
+   *   The created data frame.
+   */
+  private[connect] def createDataFrame(
+      rel: proto.Relation,
+      planner: SparkConnectPlanner,
+      options: Option[(QueryPlanningTracker, ShuffleCleanupMode)] = None): DataFrame = {
+    val (plan, cacheHit) = planner.transformRelationWithCache(rel)
+    val df = options match {
+      case Some((tracker, cleanupMode)) =>
+        Dataset.ofRows(session, plan, tracker, cleanupMode)
+      case None =>
+        val qe = session.sessionState.executePlan(plan, CommandExecutionMode.SKIP)
+        new Dataset[Row](qe, () => RowEncoder.encoderFor(qe.analyzed.schema))
+    }
+    if (!cacheHit && planCache.isDefined && canCachePlan(rel)) {
+      if (df.queryExecution.isLazyAnalysis) {
+        val plan = df.queryExecution.logical
+        logDebug(s"Cache a lazily analyzed logical plan for '$rel': $plan")
+        planCache.get.put(rel, plan)
+      } else {
+        val plan = df.queryExecution.analyzed
+        logDebug(s"Cache an analyzed logical plan for '$rel': $plan")
+        planCache.get.put(rel, plan)
+      }
+    }
+    df
+  }

Review thread on the line that caches the analyzed plan:

Reviewer: We may have to add some invalidation logic here. The problem is that some of the objects (tables/views/UDFs) used in the query can change; in that case we may want to validate that the objects used are the most recent ones.

Author: Agreed. I'll need to think about the interface.

Author: Added LogicalPlan.isOutdated and updated the usePlanCache method.
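To make the reviewers' concern concrete, here is an illustrative sketch (not part of the PR) of how a cached analyzed plan can go stale: analysis resolves objects such as views at analysis time, so a definition change between requests is invisible to a cache hit.

```scala
// Illustrative only: why cached analyzed plans may need invalidation.
spark.sql("CREATE OR REPLACE VIEW v AS SELECT 1 AS id")
val df = spark.sql("SELECT * FROM v")   // analyzed plan resolves v to SELECT 1 AS id
spark.sql("CREATE OR REPLACE VIEW v AS SELECT 2 AS id")
// A plan cached before the second statement still embeds the old view definition;
// isPlanOutdated below is the hook meant to detect such cases (it currently never fires).
```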
+  // Return true if the plan is outdated and should be removed from the cache.
+  private def isPlanOutdated(plan: LogicalPlan): Boolean = {
+    // Currently, nothing is checked.
+    false
+  }
+
+  // Return true if the plan cache is enabled for the session and the relation.
+  private def canCachePlan(rel: proto.Relation): Boolean = {
+    // We only cache plans that have a plan ID.
+    rel.hasCommon && rel.getCommon.hasPlanId &&
+      Option(session)
+        .forall(_.sessionState.conf.getConf(Connect.CONNECT_SESSION_PLAN_CACHE_ENABLED, true))
+  }
+
   // For testing. Expose the plan cache for testing purposes.
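The cache is gated per session by `spark.connect.session.planCache.enabled`, defaulting to true when unset. As a hedged sketch, assuming the flag behaves as an ordinary runtime SQL conf on the server-side session, both paths could be exercised like this:

```scala
// Sketch, assuming the conf can be toggled per session at runtime.
session.conf.set("spark.connect.session.planCache.enabled", "false")
// canCachePlan now returns false for every relation, so each request re-runs transform().
session.conf.set("spark.connect.session.planCache.enabled", "true")
// Relations that carry a plan ID become cacheable again.
```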