[SPARK-52492][SQL] Make InMemoryRelation.convertToColumnarIfPossible customizable #51189

@@ -24,6 +24,8 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.expressions.{And, Attribute, AttributeReference, BindReferences, EqualNullSafe, EqualTo, Expression, GreaterThan, GreaterThanOrEqual, In, IsNotNull, IsNull, Length, LessThan, LessThanOrEqual, Literal, Or, Predicate, StartsWith}
+import org.apache.spark.sql.execution.{ColumnarToRowTransition, InputAdapter, SparkPlan, WholeStageCodegenExec}
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
 import org.apache.spark.sql.execution.columnar.{ColumnStatisticsSchema, PartitionStatistics}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{AtomicType, BinaryType, StructType}
@@ -58,6 +60,39 @@ trait CachedBatchSerializer extends Serializable {
    */
   def supportsColumnarInput(schema: Seq[Attribute]): Boolean
 
+  /**
+   * Attempt to convert a query plan to its columnar equivalent for columnar caching.
+   * Called on the query plan that is about to be cached once [[supportsColumnarInput]] returns
+   * true for its output schema.
+   *
+   * The default implementation works by stripping the topmost columnar-to-row transition to
+   * expose the underlying columnar plan to the serializer.
+   *
+   * @param plan The plan to convert.
+   * @return The output plan: either a columnar plan if the input plan is convertible, or
+   *         the input plan unchanged if no viable conversion can be done.
+   */
+  @Since("4.1.0")
+  def convertToColumnarPlanIfPossible(plan: SparkPlan): SparkPlan = plan match {
+    case gen: WholeStageCodegenExec =>
+      gen.child match {
+        case c2r: ColumnarToRowTransition =>
+          c2r.child match {
+            case ia: InputAdapter => ia.child
+            case _ => plan
+          }
+        case _ => plan
+      }
+    case c2r: ColumnarToRowTransition => // This matches when whole-stage codegen is disabled.
+      c2r.child
+    case adaptive: AdaptiveSparkPlanExec =>
+      // If AQE is enabled for the cached plan and the table cache supports columnar input, mark
+      // `AdaptiveSparkPlanExec.supportsColumnar` as true to avoid inserting `ColumnarToRow`, so
+      // that `CachedBatchSerializer` can use `convertColumnarBatchToCachedBatch` to cache data.
+      adaptive.copy(supportsColumnar = true)
+    case _ => plan
+  }
+
   /**
    * Convert an `RDD[InternalRow]` into an `RDD[CachedBatch]` in preparation for caching the data.
    * @param input the input `RDD` to be converted.
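
With this hook on the serializer, a custom CachedBatchSerializer can decide for itself how (and whether) the plan under the cache is converted to columnar form. A minimal sketch, assuming DefaultCachedBatchSerializer from org.apache.spark.sql.execution.columnar is extensible from user code (the class name RowOnlySerializer is hypothetical; it mirrors the test helper added further down in this PR):

import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.columnar.DefaultCachedBatchSerializer

// Hypothetical serializer that opts in to the new callback but declines every
// conversion: the cached plan keeps its topmost columnar-to-row transition and
// the serializer keeps receiving InternalRows.
class RowOnlySerializer extends DefaultCachedBatchSerializer {
  // Returning true makes InMemoryRelation call convertToColumnarPlanIfPossible.
  override def supportsColumnarInput(schema: Seq[Attribute]): Boolean = true

  // Decline the conversion by returning the plan unchanged, instead of
  // stripping the columnar-to-row transition like the default implementation.
  override def convertToColumnarPlanIfPossible(plan: SparkPlan): SparkPlan = plan
}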
@@ -381,22 +381,8 @@ object InMemoryRelation {
   /* Visible for testing */
   private[columnar] def clearSerializer(): Unit = synchronized { ser = None }
 
-  def convertToColumnarIfPossible(plan: SparkPlan): SparkPlan = plan match {
-    case gen: WholeStageCodegenExec => gen.child match {
-      case c2r: ColumnarToRowTransition => c2r.child match {
-        case ia: InputAdapter => ia.child
-        case _ => plan
-      }
-      case _ => plan
-    }
-    case c2r: ColumnarToRowTransition => // This matches when whole stage code gen is disabled.
-      c2r.child
-    case adaptive: AdaptiveSparkPlanExec =>
-      // If AQE is enabled for cached plan and table cache supports columnar in, we should mark
-      // `AdaptiveSparkPlanExec.supportsColumnar` as true to avoid inserting `ColumnarToRow`, so
-      // that `CachedBatchSerializer` can use `convertColumnarBatchToCachedBatch` to cache data.
-      adaptive.copy(supportsColumnar = true)
-    case _ => plan
+  def convertToColumnarIfPossible(plan: SparkPlan): SparkPlan = {
+    getSerializer(plan.conf).convertToColumnarPlanIfPossible(plan)
   }
 
   def apply(
@@ -406,7 +392,7 @@
     val optimizedPlan = qe.optimizedPlan
     val serializer = getSerializer(optimizedPlan.conf)
     val child = if (serializer.supportsColumnarInput(optimizedPlan.output)) {
-      convertToColumnarIfPossible(qe.executedPlan)
+      serializer.convertToColumnarPlanIfPossible(qe.executedPlan)
     } else {
       qe.executedPlan
     }
@@ -433,8 +419,9 @@
 
   def apply(cacheBuilder: CachedRDDBuilder, qe: QueryExecution): InMemoryRelation = {
     val optimizedPlan = qe.optimizedPlan
-    val newBuilder = if (cacheBuilder.serializer.supportsColumnarInput(optimizedPlan.output)) {
-      cacheBuilder.copy(cachedPlan = convertToColumnarIfPossible(qe.executedPlan))
+    val serializer = cacheBuilder.serializer
+    val newBuilder = if (serializer.supportsColumnarInput(optimizedPlan.output)) {
+      cacheBuilder.copy(cachedPlan = serializer.convertToColumnarPlanIfPossible(qe.executedPlan))
     } else {
       cacheBuilder.copy(cachedPlan = qe.executedPlan)
     }
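
Since getSerializer resolves the serializer from the static conf spark.sql.cache.serializer (StaticSQLConf.SPARK_CACHE_SERIALIZER), the customized conversion participates in every cache build in the session. A usage sketch, assuming the hypothetical RowOnlySerializer above lives at com.example.RowOnlySerializer:

import org.apache.spark.sql.SparkSession

// The cache serializer is a static conf: set it before the session is
// created; it then applies to every df.cache() in that session.
val spark = SparkSession.builder()
  .master("local[*]")
  .config("spark.sql.cache.serializer", "com.example.RowOnlySerializer")
  .getOrCreate()

import spark.implicits._
val df = spark.range(10).toDF("id")
df.cache()
df.count()  // InMemoryRelation now delegates the columnar conversion decision
            // to RowOnlySerializer instead of a hard-coded unwrapping rule.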
@@ -25,7 +25,7 @@ import org.apache.spark.sql.{QueryTest, Row}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, UnsafeProjection}
 import org.apache.spark.sql.columnar.{CachedBatch, CachedBatchSerializer}
-import org.apache.spark.sql.execution.ColumnarToRowExec
+import org.apache.spark.sql.execution.{ColumnarToRowExec, SparkPlan, WholeStageCodegenExec}
 import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, AdaptiveSparkPlanHelper}
 import org.apache.spark.sql.execution.columnar.InMemoryRelation.clearSerializer
 import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
@@ -122,6 +122,25 @@ class TestSingleIntColumnarCachedBatchSerializer extends CachedBatchSerializer {
   }
 }
 
+/**
+ * An equivalent of Spark's [[DefaultCachedBatchSerializer]], used while the API
+ * [[convertToColumnarPlanIfPossible]] is being tested.
+ */
+class DefaultCachedBatchSerializerNoUnwrap extends DefaultCachedBatchSerializer {
+  override def supportsColumnarInput(schema: Seq[Attribute]): Boolean = {
+    // Return true to let Spark call #convertToColumnarPlanIfPossible to unwrap the input
+    // columnar plan from under the topmost ColumnarToRowExec.
+    true
+  }
+
+  override def convertToColumnarPlanIfPossible(plan: SparkPlan): SparkPlan = {
+    assert(!plan.supportsColumnar)
+    // Disable the unwrapping code path of the default CachedBatchSerializer so
+    // Spark keeps the topmost columnar-to-row plan node.
+    plan
+  }
+}
+
 class CachedBatchSerializerSuite extends QueryTest
   with SharedSparkSession with AdaptiveSparkPlanHelper {
   import testImplicits._
@@ -180,3 +199,41 @@ class CachedBatchSerializerSuite extends QueryTest
     }
   }
 }
+
+
+class CachedBatchSerializerNoUnwrapSuite extends QueryTest
+  with SharedSparkSession with AdaptiveSparkPlanHelper {
+
+  import testImplicits._
+
+  override protected def sparkConf: SparkConf = {
+    super.sparkConf.set(
+      StaticSQLConf.SPARK_CACHE_SERIALIZER.key,
+      classOf[DefaultCachedBatchSerializerNoUnwrap].getName)
+  }
+
+  test("Do not unwrap ColumnarToRowExec") {
+    withTempPath { workDir =>
+      val workDirPath = workDir.getAbsolutePath
+      val input = Seq(100, 200).toDF("count")
+      input.write.parquet(workDirPath)
+      val data = spark.read.parquet(workDirPath)
+      data.cache()
+      val df = data.union(data)
+      assert(df.count() == 4)
+      checkAnswer(df, Row(100) :: Row(200) :: Row(100) :: Row(200) :: Nil)
+
+      val finalPlan = df.queryExecution.executedPlan
+      val cachedPlans = finalPlan.collect {
+        case i: InMemoryTableScanExec => i.relation.cachedPlan
+      }
+      assert(cachedPlans.length == 2)
+      cachedPlans.foreach {
+        cachedPlan =>
+          assert(cachedPlan.isInstanceOf[WholeStageCodegenExec])
+          assert(cachedPlan.asInstanceOf[WholeStageCodegenExec]
+            .child.isInstanceOf[ColumnarToRowExec])
+      }
+    }
+  }
+}
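
For ad-hoc verification of a custom serializer outside this suite, the same shape check works interactively. A sketch, where df stands for any cached, already-executed DataFrame (under AQE the scan nodes may instead need to be collected from the adaptive plan, as AdaptiveSparkPlanHelper does):

import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec

// Collect the cached plans behind the in-memory scans and print them; with
// the no-unwrap serializer the root stays WholeStageCodegenExec over
// ColumnarToRowExec rather than a bare columnar scan.
val cachedRoots = df.queryExecution.executedPlan.collect {
  case scan: InMemoryTableScanExec => scan.relation.cachedPlan
}
cachedRoots.foreach(plan => println(plan.treeString))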