Skip SortExec for partitioning columns in OPTIMIZE #1166
Conversation
@zsxwing Could you review the PR? Thanks!
@@ -237,7 +237,7 @@ class OptimizeExecutor(
        sparkSession.sparkContext.getLocalProperty(SPARK_JOB_GROUP_ID),
        description)

-      val addFiles = txn.writeFiles(repartitionDF).collect {
+      val addFiles = txn.writeFiles(repartitionDF, actionType = "Optimize").collect {
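For context, the surrounding write path in OptimizeExecutor looks roughly like this (a simplified sketch assembled from the diff and the snippets in this thread; the AddFile handling is illustrative, not verbatim):

// Simplified sketch of the OPTIMIZE bin-compaction write path.
// `bin` is one group of small files selected for compaction.
val input = txn.deltaLog.createDataFrame(txn.snapshot, bin)
// Compact all files in the bin into a single output file.
val repartitionDF = input.coalesce(numPartitions = 1)
// New in this PR: tag the write so TransactionalWrite can tell it comes
// from OPTIMIZE and skip the sort on the partition columns.
val addFiles = txn.writeFiles(repartitionDF, actionType = "Optimize").collect {
  case a: AddFile => a
}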
I haven't tried it, but I'm wondering if there is a way we could specify the SortOrder on the repartitionDF here itself, so that we can avoid passing an action type to txn.writeFiles?
I think there is no way to do that, as it's not a Spark plan. Also, the Spark optimizer might build an incorrect plan from a fake DataFrame, so it's better to add the ordering after all rules have been applied.
We may need to add the actionType for OptimizeWrite anyway, to exclude OPTIMIZE/ZORDER/Auto Compaction from the OptimizeWrite feature.
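A minimal sketch of the kind of exclusion meant here (the helper name and string constants are hypothetical, not existing Delta APIs):

// Hypothetical guard for an OptimizeWrite feature: writes that already
// come from a compaction-style command should not be optimize-written
// again, so the actionType tag doubles as an exclusion signal.
def shouldApplyOptimizeWrite(actionType: Option[String]): Boolean =
  actionType match {
    case Some("Optimize") | Some("ZOrder") | Some("Auto Compaction") => false
    case _ => true
  }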
Maybe we can read actionType from TahoeBatchFileIndex.
Let me try it.
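A sketch of what that lookup might look like (assuming TahoeBatchFileIndex exposes the actionType it was created with; the plan traversal below is illustrative):

import org.apache.spark.sql.Dataset
import org.apache.spark.sql.delta.files.TahoeBatchFileIndex
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}

// Hypothetical: detect an OPTIMIZE read by inspecting the file index
// attached to the scan, instead of threading a flag through writeFiles.
def isOptimizeRead(data: Dataset[_]): Boolean =
  data.queryExecution.analyzed.collectFirst {
    case l: LogicalRelation => l.relation
  }.exists {
    case fs: HadoopFsRelation => fs.location match {
      case index: TahoeBatchFileIndex => index.actionType == "Optimize"
      case _ => false
    }
    case _ => false
  }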
How about this:
val input = txn.deltaLog.createDataFrame(txn.snapshot, bin, actionTypeOpt = Some("Optimize"))
val repartitionDF = input.coalesce(numPartitions = 1)
val sortOrder =
  partitionColumns.map(p => SortOrder(p, Ascending, Seq.empty[Expression]))
val df = LogicalRDD(
  outputAttributes,
  repartitionDF.queryExecution.toRdd,
  outputOrdering = sortOrder)
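The idea behind advertising outputOrdering this way: Spark's file write path compares the required ordering (partition columns first) against the incoming plan's output ordering and only inserts a SortExec when they don't match, so a LogicalRDD that already claims the partition-column ordering would let the writer skip the sort. Within a single OPTIMIZE bin the partition columns are constant, so the claimed ordering is trivially true.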
I tried this approach, but it requires some code in OptimizeTableCommand.scala for extracting the physical plan, partitionColumns, outputAttributes, etc.
In addition, I'd like to minimize any side effects the fake outputOrdering might cause while the Spark optimizer runs, though I know the plan for compaction is not that complicated.
How about the current PR?
In any case, I need to use TransactionalWrite.isOptimizeCommand for the OptimizeWrite functionality.
Force-pushed from 46c747d to 46c2a55
Signed-off-by: Eunjin Song <[email protected]>
Hi @scottsand-db, can someone review the PR? Thanks!
Description
In OPTIMIZE, each compaction bin just reads from one directory to compact data, so we don't need a SortExec on the partition columns for partitioned data. We can skip it by passing writeFiles a plan that carries a SortOrder on the partition columns.
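Concretely, the writer decides whether to sort by checking the plan's existing output ordering against the required one. A minimal standalone sketch of that check (the helper name is ours; SortOrder.orderingSatisfies is an existing Catalyst utility):

import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, SortOrder}
import org.apache.spark.sql.execution.SparkPlan

// Returns true when a sort on the partition columns is still required,
// i.e. the incoming plan does not already guarantee that ordering.
def partitionSortNeeded(plan: SparkPlan, partitionColumns: Seq[Attribute]): Boolean = {
  val required = partitionColumns.map(SortOrder(_, Ascending))
  !SortOrder.orderingSatisfies(plan.outputOrdering, required)
}

Since an OPTIMIZE bin reads a single partition directory, every partition column is constant there, so a plan that advertises this ordering satisfies the check and no SortExec is planned.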
Fixes #948
How was this patch tested?
Test data: 860 MB of Parquet in total, 100 files
- Non-partitioned data
- Partitioned data where all partition values are the same (colC = 0)
Compared the end-to-end (E2E) duration of OPTIMIZE with master + this PR against master alone.
Does this PR introduce any user-facing changes?
No