
[Feature Request] OPTIMIZED WRITE #1158

@sezruby

Description

Feature request

Overview

Support Optimized Write described in https://docs.databricks.com/delta/optimizations/auto-optimize.html#how-optimized-writes-work

Motivation

With frequent append jobs or MERGE/UPDATE operations, the underlying data can easily become fragmented.
OPTIMIZE and Auto Compaction cannot cover all scenarios, as they involve an additional commit.

We can utilize the existing functionality of adaptive query execution (AQE), e.g.:
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewInRebalancePartitions.scala
https://github.com/apache/spark/blob/d9129205a0133650e3fc6f335846cc6fe88c9ea9/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala#L1551

As RebalancePartitions works with AQE, we need to add additional plan nodes for Optimize Write.
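
For reference, a minimal sketch of how this machinery can already be driven from user code (assuming Spark 3.2+, where the REBALANCE hint that produces a RebalancePartitions node is available):

```scala
// With AQE enabled, the REBALANCE hint inserts a RebalancePartitions node;
// OptimizeSkewInRebalancePartitions then merges small shuffle partitions and
// splits large ones at runtime, before the files are written.
import org.apache.spark.sql.SparkSession

object RebalanceDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("rebalance-demo")
      .config("spark.sql.adaptive.enabled", "true")
      .getOrCreate()

    spark.range(0, 1000000)
      .selectExpr("id", "id % 5 AS partCol")
      .createOrReplaceTempView("t")

    // AQE picks the final shuffle partition layout after the exchange.
    val rebalanced = spark.sql("SELECT /*+ REBALANCE(partCol) */ * FROM t")
    rebalanced.write.mode("overwrite").partitionBy("partCol").parquet("/tmp/rebalance_demo")
  }
}
```

Optimize Write would apply the equivalent rebalancing automatically inside the Delta write path, without the user-supplied hint.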

Further details

If Optimize Write is enabled, we can modify the physical plan for writing data in TransactionalWrite.scala.

Things to do

  1. Repartition (a sketch of the partitioning choice follows this list)
    • RoundRobinPartitioning for non-partitioned data
    • HashPartitioning for partitioned data, on the partition values
  2. Rebalance partitions - Adaptive Partition Reader
  3. Remove unnecessary repartitions
    • There can be repartition operations in the plan which users added.
    • Unnecessary repartitions can generally be removed by CollapseRepartition,
    • so we just need to remove the repartition on top of the query plan.
  4. Exclude the OPTIMIZE/ZORDER operations - no need to do Optimize Write there
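
A hedged sketch of steps 1 and 3, assuming a hook in Delta's TransactionalWrite; writePartitioning and stripTopLevelRepartition are illustrative helpers, not Delta APIs:

```scala
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Repartition, RepartitionByExpression}
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, RoundRobinPartitioning}

object OptimizeWriteSketch {
  // Step 1: choose the shuffle partitioning for the write.
  def writePartitioning(partitionColumns: Seq[Attribute], numPartitions: Int): Partitioning =
    if (partitionColumns.isEmpty) {
      RoundRobinPartitioning(numPartitions)             // non-partitioned data
    } else {
      HashPartitioning(partitionColumns, numPartitions) // partitioned data, on partition values
    }

  // Step 3: drop a user-added repartition sitting on top of the query plan;
  // the Optimize Write shuffle supersedes it. Repartitions deeper in the plan
  // are left to CollapseRepartition.
  def stripTopLevelRepartition(plan: LogicalPlan): LogicalPlan = plan match {
    case r: Repartition             => r.child
    case r: RepartitionByExpression => r.child
    case other                      => other
  }
}
```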

Example of Optimize Write for partitioned data

[image: Optimize Write shuffle diagram for partitioned data]

Process
  • Each color block shows the amount of row data from each previous partition (1, 2, 3).
  • As the data is partitioned by partCol, use HashPartitioning(partCol, 4) for the exchange (assuming spark.sql.shuffle.partitions=4).
    • The data can be skewed (e.g. fewer rows for partCol=B, more rows for partCol=E).
  • Merge small partitions first.
  • Split large partitions if needed.
  • To avoid partitions that are too small (like P4-2), introduce optimizeWrite.smallPartitionFactor / optimizeWrite.mergedPartitionFactor (a sketch of the merge rule follows this list).
    • Same as in the Spark implementation.
    • Make them configurable, but keep them internal as they are too much detail for users.
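
A self-contained sketch of the merge rule under these two factors (the config names come from this proposal; the values and the greedy strategy are illustrative, mirroring what Spark's OptimizeSkewInRebalancePartitions does on shuffle partition specs):

```scala
object MergeSmallPartitionsSketch {
  val targetSize            = 128L * 1024 * 1024 // desired bytes per output file (illustrative)
  val smallPartitionFactor  = 0.5                // keep merging while below target * this
  val mergedPartitionFactor = 1.2                // never grow a merged group beyond target * this

  /** Greedily merge runs of adjacent partitions; returns groups of partition indices. */
  def mergeSmall(sizes: Seq[Long]): Seq[Seq[Int]] = {
    val groups  = scala.collection.mutable.ArrayBuffer.empty[Seq[Int]]
    var current = Vector.empty[Int]
    var bytes   = 0L
    def flush(): Unit = if (current.nonEmpty) { groups += current; current = Vector.empty; bytes = 0L }
    for ((size, i) <- sizes.zipWithIndex) {
      // Close the current group if adding this partition would overshoot the cap.
      if (current.nonEmpty && bytes + size > (targetSize * mergedPartitionFactor).toLong) flush()
      current :+= i
      bytes += size
      // Once the group is no longer "small", emit it (like P2 + P3 in the example).
      if (bytes >= (targetSize * smallPartitionFactor).toLong) flush()
    }
    flush()
    groups.toSeq
  }
}
```

Only adjacent partitions are merged, and the smallPartitionFactor threshold is what prevents emitting fragments like P4-2 that are too small (except possibly the final group).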
Expected output file layout

[image: expected output file layout]

  • Without Optimize Write, there will be 3 x 4 (partitions 1, 2, 3 x partition values D, A, B, E) - 2 (no green block for partCol=A,B) = 10 files.
    • Written by 3 Spark tasks.
  • With df.repartition(partCol), there will be just one file per partition value - 2 large and 2 small files.
    • Written by 4 Spark tasks (one per partition, if the hash values are all different as in the example).
  • With Optimize Write, 7 rebalanced files will be generated: [P1-1, P1-2, P1-3, P2, P3, P4-1:2, P4-3].
    • Written by 6 Spark tasks, as the partitions are rebalanced => better task distribution; we expect some performance gain in the write job.
    • P2 and P3 will be written by a single Spark task, as they are coalesced. The data will be sorted by partition value within the task's partition before writing.
Limitations
  • As this feature utilizes the existing Spark rebalance implementation, we need to use the PartitionSpec accordingly for rebalancing.
  • Blocks from the "previous stage" cannot be split after the Exchange, even if they are too large. This can result in skewed output file sizes even when using Optimize Write, if the input plan has an inefficient output RDD or skewed data.
    • e.g. one large partition with the same partition value cannot be split
  • Blocks cannot be reordered when merging small blocks, as we can only specify a range of partition IDs.

Things to consider

  • Add LogicalPlan & Rule[LogicalPlan] & SparkStrategy & SparkPlan, OR add a SparkPlan only.
  • How to handle ordered data - we can skip Optimize Write if a SortExec over non-partitioning columns is in the plan (see the sketch after this list).
  • RoundRobin partitioning can be inefficient for non-partitioned data, as it redistributes all rows.
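
A hedged sketch of the ordered-data check (hasUserSort is an illustrative helper, not a Delta API):

```scala
import org.apache.spark.sql.execution.{SortExec, SparkPlan}

object SkipOnSortSketch {
  // Skip Optimize Write if the physical plan sorts by a non-partition column:
  // the extra shuffle introduced for the write would destroy that ordering.
  def hasUserSort(plan: SparkPlan, partitionCols: Set[String]): Boolean =
    plan.collectFirst {
      case s: SortExec
          if s.sortOrder.exists(_.references.exists(a => !partitionCols.contains(a.name))) =>
        s
    }.isDefined
}
```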

Willingness to contribute

The Delta Lake Community encourages new feature contributions. Would you or another member of your organization be willing to contribute an implementation of this feature?

  • Yes. I can contribute this feature independently.

NOTE: We have a working prototype of Optimize Write for Delta 1.0 / Spark 3.1.
