Skip SortExec for partitioning columns in OPTIMIZE #1166


Open · wants to merge 1 commit into base: master

Conversation

sezruby (Contributor) commented Jun 1, 2022

Description

OPTIMIZE compacts the files of a single partition directory at a time, so the SortExec that Spark inserts before partitioned writes is unnecessary.
We can skip it by passing the writer a plan whose SortOrder already covers the partition columns.

Fixes #948
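The idea can be sketched outside Spark. This is an illustrative, self-contained Scala model, not Delta Lake or Spark code: `SortKey`, `orderingSatisfied`, and `needsSort` are hypothetical names. It models the rule this PR exploits: the writer requires its input sorted by the partition columns, and inserts a sort only when the plan's reported output ordering does not already satisfy that requirement.

```scala
// Illustrative sketch only: SortKey, orderingSatisfied, and needsSort are
// hypothetical names, not Spark or Delta Lake APIs. They model the rule
// "insert a sort before writing partitioned data unless the plan already
// reports a compatible output ordering". OPTIMIZE reads one partition
// directory per bin, so its plan can safely claim an ordering on the
// partition columns and the sort is skipped.
case class SortKey(column: String, ascending: Boolean = true)

// The reported ordering satisfies the requirement when the required keys
// form a prefix of the keys the plan claims to emit.
def orderingSatisfied(required: Seq[SortKey], reported: Seq[SortKey]): Boolean =
  required.length <= reported.length &&
    required.zip(reported).forall { case (want, have) => want == have }

// A SortExec-like node is needed only when the requirement is unmet.
def needsSort(partitionColumns: Seq[String], reported: Seq[SortKey]): Boolean =
  !orderingSatisfied(partitionColumns.map(SortKey(_)), reported)
```

Under this model, a non-partitioned write needs no sort (empty requirement), and a partitioned OPTIMIZE whose plan declares a SortOrder on `colC` needs none either; only a plan with no claimed ordering would still trigger the sort.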

How was this patch tested?

Test data: 860 MB of parquet in total, 100 files

Non partitioned data

spark.range(200000000).map { _ =>
    (scala.util.Random.nextInt(10).toLong, scala.util.Random.nextInt(1000000000), scala.util.Random.nextInt(1))
}.toDF("colA", "colB", "colC").repartition(100).write.mode("overwrite").format("delta").save(dataPath)

Partitioned data, but all values are the same (colC = 0)

spark.range(200000000).map { _ =>
    (scala.util.Random.nextInt(10).toLong, scala.util.Random.nextInt(1000000000), scala.util.Random.nextInt(1))
}.toDF("colA", "colB", "colC").repartition(100).write.partitionBy("colC").mode("overwrite").format("delta").save(dataPath + "1")

E2E duration of OPTIMIZE with master + the PR

  • non-partitioned: 2 min 28 sec
  • partitioned: 2 min 24 sec

E2E duration of OPTIMIZE with master

  • non-partitioned: 2 min 30 sec
  • partitioned: 3 min 4 sec

Does this PR introduce any user-facing changes?

No

sezruby (Contributor, Author) commented Jun 2, 2022

@zsxwing Could you review the PR? Thanks!

@tdas tdas requested a review from vkorukanti June 2, 2022 18:10
@@ -237,7 +237,7 @@ class OptimizeExecutor(
sparkSession.sparkContext.getLocalProperty(SPARK_JOB_GROUP_ID),
description)

-    val addFiles = txn.writeFiles(repartitionDF).collect {
+    val addFiles = txn.writeFiles(repartitionDF, actionType = "Optimize").collect {
Collaborator:

I haven't tried it, but I wonder if there is a way we could specify the SortOrder on the repartitionDF here itself, so that we can avoid passing an action type to txn.writeFiles?

Contributor (Author):

I think there is no way to do that, as the DataFrame is not a Spark plan. Also, the Spark optimizer might build an incorrect plan from a fake DataFrame, so it's better to attach the ordering after all rules have been applied.
We may need to keep the actionType for OptimizeWrite anyway, to exclude OPTIMIZE/ZORDER/auto compaction from the OptimizeWrite feature.

Contributor (Author):

Maybe we can read the actionType from TahoeBatchFileIndex.
Let me try it.

Collaborator:

How about this:

    val input = txn.deltaLog.createDataFrame(txn.snapshot, bin, actionTypeOpt = Some("Optimize"))
    val repartitionDF = input.coalesce(numPartitions = 1)

    val sortOrder =
      partitionColumns.map(p => SortOrder(p, Ascending, Seq.empty[Expression]))
    val df = LogicalRDD(
      outputAttributes,
      repartitionDF.queryExecution.toRdd,
      outputOrdering = sortOrder)

Contributor (Author):

I tried this approach, but it requires some extra code in OptimizeTableCommand.scala: extracting the physical plan for partitionColumns/outputAttributes, etc.
In addition, I'd like to minimize any side effects the fake outputOrdering might cause during Spark optimization, though I know the plan for compaction is not that complicated.

How about keeping the current PR?
In any case, I need to use TransactionalWrite.isOptimizeCommand for the OptimizeWrite functionality.

@sezruby sezruby force-pushed the optsortexec branch 4 times, most recently from 46c747d to 46c2a55 Compare June 10, 2022 23:29
@scottsand-db scottsand-db self-requested a review August 1, 2022 18:22
sezruby (Contributor, Author) commented Oct 24, 2022

Hi @scottsand-db, can someone review the PR? Thanks!

Successfully merging this pull request may close these issues.

Eliminate the unnecessary sort in optimize (file compaction)