
Commit 2d6f37e

Keep table protocol intact before and after REPLACE regardless of default CC enablement
1 parent f2c3ef7 commit 2d6f37e

25 files changed (+765 / -273 lines)

iceberg/src/main/scala/org/apache/spark/sql/delta/icebergShaded/IcebergConversionTransaction.scala

Lines changed: 4 additions & 4 deletions
@@ -102,7 +102,7 @@ class IcebergConversionTransaction(
     def add(add: AddFile): Unit = throw new UnsupportedOperationException
     def add(remove: RemoveFile): Unit = throw new UnsupportedOperationException

-    def commit(): Unit = {
+    def commit(expectedSequenceNumber: Long): Unit = {
       assert(!committed, "Already committed.")
       impl.commit()
       committed = true
@@ -118,7 +118,7 @@
     override def opType: String = "null"
     override def add(add: AddFile): Unit = {}
     override def add(remove: RemoveFile): Unit = {}
-    override def commit(): Unit = {}
+    override def commit(deltaCommitVersion: Long): Unit = {}
   }
   /**
    * API for appending new files in a table.
@@ -195,12 +195,12 @@
       removeBuffer += remove.toDataFile
     }

-    override def commit(): Unit = {
+    override def commit(deltaCommitVersion: Long): Unit = {
       if (removeBuffer.nonEmpty) {
         rewriter.rewriteFiles(removeBuffer.asJava, addBuffer.asJava, 0)
       }
       currentSnapshotId.foreach(rewriter.validateFromSnapshot)
-      super.commit()
+      super.commit(deltaCommitVersion)
     }
   }
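The signature change threads the Delta commit version through every helper's commit(). A minimal sketch of the reshaped contract, with stand-in AddFile/RemoveFile case classes (the real types live in org.apache.spark.sql.delta.actions):

    object CommitContractSketch {
      case class AddFile(path: String)
      case class RemoveFile(path: String)

      abstract class TransactionHelper {
        private var committed = false
        def add(add: AddFile): Unit = throw new UnsupportedOperationException
        def add(remove: RemoveFile): Unit = throw new UnsupportedOperationException
        // The parameter carries the Delta commit version so the Iceberg snapshot
        // produced by this commit can be checked against the sequence number it
        // is expected to receive.
        def commit(expectedSequenceNumber: Long): Unit = {
          assert(!committed, "Already committed.")
          // the real helper runs impl.commit() on the underlying Iceberg update here
          committed = true
        }
      }

      // Subclasses forward the version to super.commit, as RewriteHelper does above.
      class NoopHelper extends TransactionHelper {
        override def commit(deltaCommitVersion: Long): Unit = super.commit(deltaCommitVersion)
      }
    }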

iceberg/src/main/scala/org/apache/spark/sql/delta/icebergShaded/IcebergConverter.scala

Lines changed: 147 additions & 88 deletions
@@ -20,12 +20,13 @@ import java.util.concurrent.atomic.AtomicReference
 import javax.annotation.concurrent.GuardedBy

 import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
 import scala.util.control.Breaks._
 import scala.util.control.NonFatal

-import org.apache.spark.sql.delta.{CommittedTransaction, DeltaErrors, DeltaFileNotFoundException, DeltaFileProviderUtils, DeltaOperations, IcebergConstants, Snapshot, UniversalFormat, UniversalFormatConverter}
+import org.apache.spark.sql.delta.{CommittedTransaction, DeltaErrors, DeltaFileNotFoundException, DeltaFileProviderUtils, DeltaOperations, IcebergConstants, Snapshot, SnapshotDescriptor, UniversalFormat, UniversalFormatConverter}
 import org.apache.spark.sql.delta.DeltaOperations.OPTIMIZE_OPERATION_NAME
-import org.apache.spark.sql.delta.actions.{Action, AddFile, CommitInfo, RemoveFile}
+import org.apache.spark.sql.delta.actions.{Action, AddFile, CommitInfo, FileAction, RemoveFile}
 import org.apache.spark.sql.delta.hooks.IcebergConverterHook
 import org.apache.spark.sql.delta.logging.DeltaLogKeys
 import org.apache.spark.sql.delta.metering.DeltaLogging
@@ -69,6 +70,7 @@ object IcebergConverter {
     table.flatMap(_.properties().asScala.get(DELTA_TIMESTAMP_PROPERTY)).map(_.toLong)
 }

+
 /**
  * This class manages the transformation of delta snapshots into their Iceberg equivalent.
  */
@@ -89,6 +91,7 @@ class IcebergConverter(spark: SparkSession)
   private var asyncConverterThreadActive: Boolean = false
   private val asyncThreadLock = new Object

+  private[icebergShaded] var targetSnapshot: SnapshotDescriptor = _
   /**
    * Enqueue the specified snapshot to be converted to Iceberg. This will start an async
    * job to run the conversion, unless there already is an async conversion running for
@@ -290,6 +293,8 @@ class IcebergConverter(spark: SparkSession)
       txnOpt: Option[CommittedTransaction],
       catalogTable: CatalogTable): Option[(Long, Long)] =
     recordFrameProfile("Delta", "IcebergConverter.convertSnapshot") {
+
+      targetSnapshot = snapshotToConvert
       val cleanedCatalogTable =
         cleanCatalogTableIfEnablingUniform(catalogTable, snapshotToConvert, txnOpt)
       val log = snapshotToConvert.deltaLog
@@ -371,16 +376,20 @@

     val actionsToConvert = DeltaFileProviderUtils.parallelReadAndParseDeltaFilesAsIterator(
       log, spark, deltaFiles)
+    var deltaVersion = prevSnapshot.version
     actionsToConvert.foreach { actionsIter =>
       try {
+        deltaVersion += 1
+        // TODO: get rid of this grouped batching behavior
         actionsIter.grouped(actionBatchSize).foreach { actionStrs =>
           val actions = actionStrs.map(Action.fromJson)
           needsExpireSnapshot ||= existsOptimize(actions)

           runIcebergConversionForActions(
             icebergTxn,
             actions,
-            prevConvertedSnapshotOpt)
+            prevConvertedSnapshotOpt,
+            deltaVersion)
         }
       } finally {
         actionsIter.close()
@@ -411,7 +420,7 @@
       .grouped(actionBatchSize)
       .foreach { actions =>
         needsExpireSnapshot ||= existsOptimize(actions)
-        runIcebergConversionForActions(icebergTxn, actions, None)
+        runIcebergConversionForActions(icebergTxn, actions, None, snapshotToConvert.version)
       }

     // Always attempt to update table metadata (schema/properties) for REPLACE_TABLE
@@ -461,7 +470,7 @@
       })
     }

-    expireSnapshotHelper.commit()
+    expireSnapshotHelper.commit(snapshotToConvert.version)
   }

   // This is for newly enabling uniform table to
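The incremental path above assumes the delta files arrive in version order, one JSON file per commit, so a running counter can assign each batch its Delta version. A simplified sketch of that bookkeeping (convertDeltaFiles and commitOneDeltaFile are illustrative names, not the real API):

    // One call per Delta commit JSON; each call is expected to produce one
    // Iceberg commit whose sequence number equals the Delta version.
    def convertDeltaFiles(
        prevConvertedVersion: Long,
        deltaFiles: Seq[Seq[String]], // action JSON strings, one Seq per commit file
        commitOneDeltaFile: (Seq[String], Long) => Unit): Unit = {
      var deltaVersion = prevConvertedVersion
      deltaFiles.foreach { actions =>
        deltaVersion += 1 // files are read in order: prev + 1, prev + 2, ...
        commitOneDeltaFile(actions, deltaVersion)
      }
    }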
@@ -519,103 +528,153 @@
       }
     }
   }
-
   /**
-   * Build an iceberg TransactionHelper from the provided txn, and commit the set of changes
-   * specified by the actionsToCommit.
+   * Commit the set of changes into an Iceberg snapshot. Each call to this function will
+   * build exactly one Iceberg Snapshot.
+   *
+   * We determine what type of [[IcebergConversionTransaction.TransactionHelper]] to use
+   * (and what type of Iceberg snapshot to create) based on the types of actions and
+   * whether they contain data change. An [[UnsupportedOperationException]] will be
+   * thrown for cases not listed in the table below; it means the combination of actions
+   * is not recognized/supported. IcebergConverter will retry with REPLACE TABLE, which
+   * collects all valid data files from the target Delta snapshot and commits them to Iceberg.
+   *
+   * Some Delta operations are known to contain only AddFiles (dataChange=false), intended to
+   * replace/overwrite existing AddFiles. They rely on Delta's dedup in state reconstruction
+   * and cannot be translated action-for-action to Iceberg, which lacks dedup abilities.
+   * We create corresponding RemoveFile entries for the AddFiles so these operations can be
+   * properly translated into [[RewriteFiles]] in Iceberg. These operations are marked as
+   * [[needAutoRewrite]] in the code and the table below.
+   *
+   * The following table demonstrates how to choose the appropriate TransactionHelper.
+   * The conditions can overlap and should be checked in order.
+   * +-------------------+---------------+---------------------+--------------------+
+   * | Type of actions   | Data Change   | TransactionHelper   | Example / Note     |
+   * +-------------------+---------------+---------------------+--------------------+
+   * | Create table      | Any           | AppendHelper        | Note 1             |
+   * +-------------------+---------------+---------------------+--------------------+
+   * |                   | All           | AppendHelper        | INSERT             |
+   * | Add only          +---------------+---------------------+--------------------+
+   * |                   | None          | needAutoRewrite     | Note 2             |
+   * |                   |               |   else              |                    |
+   * |                   |               |   NullHelper        | Add Tag            |
+   * |                   +---------------+---------------------+--------------------+
+   * |                   | Some          | Unsupported         | (unknown)          |
+   * +-------------------+---------------+---------------------+--------------------+
+   * | Remove only       | Any           | RemoveHelper        | DELETE             |
+   * +-------------------+---------------+---------------------+--------------------+
+   * |                   | All           | OverwriteHelper     | UPDATE             |
+   * | Add + Remove      +---------------+---------------------+--------------------+
+   * |                   | None          | RewriteHelper       | OPTIMIZE           |
+   * |                   +---------------+---------------------+--------------------+
+   * |                   | Some          | Unsupported         | (unknown)          |
+   * +-------------------+---------------+---------------------+--------------------+
+   * Note:
+   * 1. We assume a Create/Replace table operation will only contain AddFiles.
+   * 2. DV is allowed but ignored as known operations (ComputeStats) do not touch DV.
    */
   private[delta] def runIcebergConversionForActions(
       icebergTxn: IcebergConversionTransaction,
       actionsToCommit: Seq[Action],
-      prevSnapshotOpt: Option[Snapshot]): Unit = {
-    prevSnapshotOpt match {
+      prevSnapshotOpt: Option[SnapshotDescriptor],
+      deltaVersion: Long = 0): Unit = {
+
+    var commitInfo: Option[CommitInfo] = None
+    var addFiles: Seq[AddFile] = Nil
+    var removeFiles: Seq[RemoveFile] = Nil
+    // Determining which txnHelper to use for this group of Actions requires a full scan
+    // of [[actionsToCommit]], which is not too expensive as the actions are already in memory.
+
+    val txnHelper = prevSnapshotOpt match {
+      // Having no previous Snapshot implies that the table is either being created or replaced.
+      // This guarantees that the actions are fetched via [[Snapshot.allFiles]] and are unique.
       case None =>
-        // If we don't have a previous snapshot, that implies that the table is either being
-        // created or replaced. We can assume that the actions have already been deduped, and
-        // only addFiles are present.
-        val appendHelper = icebergTxn.getAppendOnlyHelper
-        actionsToCommit.foreach {
-          case a: AddFile => appendHelper.add(a)
-          case _ => throw new IllegalStateException(s"Must provide only AddFiles when creating " +
-            s"or replacing an Iceberg Table.")
+        addFiles = actionsToCommit.asInstanceOf[Seq[AddFile]]
+        if (addFiles.exists(_.deletionVector != null)) {
+          throw new UnsupportedOperationException("Deletion Vector is not supported")
+        } else {
+          icebergTxn.getAppendOnlyHelper
         }
-        appendHelper.commit()
-
       case Some(_) =>
-        // We have to go through the seq of actions twice, once to figure out the TransactionHelper
-        // to use, and then again to commit the actions. This is not too expensive, since the max #
-        // of actions is <= min(max # actions in delta json, ICEBERG_MAX_ACTIONS_TO_CONVERT)
-        var hasAdds = false
-        var hasRemoves = false
-        var hasDataChange = false
-        var hasCommitInfo = false
-        var commitInfo: Option[CommitInfo] = None
-        breakable {
-          for (action <- actionsToCommit) {
-            action match {
-              case a: AddFile =>
-                hasAdds = true
-                if (a.dataChange) hasDataChange = true
-              case r: RemoveFile =>
-                hasRemoves = true
-                if (r.dataChange) hasDataChange = true
-              case ci: CommitInfo =>
-                commitInfo = Some(ci)
-                hasCommitInfo = true
-              case _ => // Do nothing
-            }
-            if (hasAdds && hasRemoves && hasDataChange && hasCommitInfo) break // Short-circuit
-          }
+        val addBuffer = new ArrayBuffer[AddFile]()
+        val removeBuffer = new ArrayBuffer[RemoveFile]()
+        // Scan the actions to collect info needed to determine which txnHelper to use
+        object DataChange extends Enumeration {
+          val Empty = Value(0, "Empty")
+          val None = Value(1, "None")
+          val All = Value(2, "All")
+          val Some = Value(3, "Some")
         }
+        var dataChangeBits = 0
+        var hasDv: Boolean = false
+        val autoRewriteOprs = Set("COMPUTE STATS")
+        var needAutoRewrite = false

-        // We want to know whether all actions in the commit are contained in this `actionsToCommit`
-        // group. If yes, then we can safely determine whether the operation is a rewrite, delete,
-        // append, overwrite, etc. If not, then we can't make any assumptions since we have
-        // incomplete information, and we default to a rewrite.
-        val allDeltaActionsCaptured = hasCommitInfo && actionsToCommit.size <
-          spark.sessionState.conf.getConf(DeltaSQLConf.ICEBERG_MAX_ACTIONS_TO_CONVERT)
-
-        val addsAndRemoves = actionsToCommit
-          .map(_.wrap)
-          .filter(sa => sa.remove != null || sa.add != null)
-
-        if (hasAdds && hasRemoves && !hasDataChange && allDeltaActionsCaptured) {
-          val rewriteHelper = icebergTxn.getRewriteHelper
-          val split = addsAndRemoves.partition(_.add == null)
-          addsAndRemoves.foreach { action =>
-            if (action.add != null) {
-              rewriteHelper.add(action.add)
-            } else {
-              rewriteHelper.add(action.remove)
-            }
-          }
-          rewriteHelper.commit()
-        } else if ((hasAdds && hasRemoves) || !allDeltaActionsCaptured) {
-          val overwriteHelper = icebergTxn.getOverwriteHelper
-          addsAndRemoves.foreach { action =>
-            if (action.add != null) {
-              overwriteHelper.add(action.add)
+        actionsToCommit.foreach {
+          case file: FileAction =>
+            addBuffer ++= Option(file.wrap.add)
+            removeBuffer ++= Option(file.wrap.remove)
+            dataChangeBits |= (1 << (if (file.dataChange) 1 else 0))
+            hasDv |= file.deletionVector != null
+          case c: CommitInfo =>
+            commitInfo = Some(c)
+            needAutoRewrite = autoRewriteOprs.contains(c.operation)
+          case _ => // Ignore other actions
+        }
+        addFiles = addBuffer.toSeq
+        removeFiles = removeBuffer.toSeq
+        val dataChange = DataChange(dataChangeBits)
+
+        (addFiles.nonEmpty, removeFiles.nonEmpty, dataChange) match {
+          case (true, false, DataChange.All) if !hasDv =>
+            icebergTxn.getAppendOnlyHelper
+          case (true, false, DataChange.None) =>
+            if (!needAutoRewrite) {
+              icebergTxn.getNullHelper // Ignore
             } else {
-              overwriteHelper.add(action.remove)
+              // Create RemoveFiles to refresh these AddFiles without data change
+              removeFiles = addBuffer.map(_.removeWithTimestamp(dataChange = false)).toSeq
+              icebergTxn.getRewriteHelper
             }
-          }
-          overwriteHelper.commit()
-        } else if (hasAdds) {
-          if (!hasRemoves && !hasDataChange && allDeltaActionsCaptured) {
-            logInfo(log"Skip Iceberg conversion for commit that only has AddFiles " +
-              log"without any RemoveFiles or data change. CommitInfo: " +
-              log"${MDC(DeltaLogKeys.DELTA_COMMIT_INFO, commitInfo)}")
-          } else {
-            val appendHelper = icebergTxn.getAppendOnlyHelper
-            addsAndRemoves.foreach(action => appendHelper.add(action.add))
-            appendHelper.commit()
-          }
-        } else if (hasRemoves) {
-          val removeHelper = icebergTxn.getRemoveOnlyHelper
-          addsAndRemoves.foreach(action => removeHelper.add(action.remove))
-          removeHelper.commit()
+          case (false, true, _) =>
+            icebergTxn.getRemoveOnlyHelper
+          case (true, true, DataChange.All) if !hasDv =>
+            icebergTxn.getOverwriteHelper
+          case (true, true, DataChange.None) if !hasDv =>
+            icebergTxn.getRewriteHelper
+          case (false, false, _) =>
+            icebergTxn.getNullHelper
+          case _ =>
+            recordDeltaEvent(
+              targetSnapshot.deltaLog,
+              "delta.iceberg.conversion.unsupportedActions",
+              data = Map(
+                "version" -> targetSnapshot.version,
+                "commitInfo" -> commitInfo.map(_.operation).getOrElse(""),
+                "hasAdd" -> addFiles.nonEmpty.toString,
+                "hasRemove" -> removeFiles.nonEmpty.toString,
+                "dataChange" -> dataChange.toString,
+                "hasDv" -> hasDv.toString
+              )
+            )
+            throw new UnsupportedOperationException(
+              "Unsupported combination of actions for incremental conversion.")
         }
     }
+    recordDeltaEvent(
+      targetSnapshot.deltaLog,
+      "delta.iceberg.conversion.convertActions",
+      data = Map(
+        "version" -> targetSnapshot.version,
+        "commitInfo" -> commitInfo.map(_.operation).getOrElse(""),
+        "txnHelper" -> txnHelper.getClass.getSimpleName
+      )
+    )
+
+    removeFiles.foreach(txnHelper.add)
+    addFiles.foreach(txnHelper.add)
+    // Make sure the next snapshot sequence number is deltaVersion
+    txnHelper.commit(deltaVersion)
   }

   private def existsOptimize(actions: Seq[Action]): Boolean = {
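To make the decision table concrete, here is a self-contained sketch of the selection rules (FileLike and pickHelper are illustrative names, not part of the codebase):

    object HelperSelectionSketch {
      case class FileLike(isAdd: Boolean, dataChange: Boolean, hasDv: Boolean = false)

      // Bit 0 is set once any dataChange=false file action is seen, bit 1 once any
      // dataChange=true action is seen, so the accumulated bits map directly onto
      // the ids below: 0 = Empty, 1 = None, 2 = All, 3 = Some.
      object DataChange extends Enumeration {
        val Empty = Value(0, "Empty")
        val None = Value(1, "None")
        val All = Value(2, "All")
        val Some = Value(3, "Some")
      }

      def pickHelper(files: Seq[FileLike], needAutoRewrite: Boolean): String = {
        var bits = 0
        files.foreach(f => bits |= (1 << (if (f.dataChange) 1 else 0)))
        val dataChange = DataChange(bits)
        val hasAdds = files.exists(_.isAdd)
        val hasRemoves = files.exists(!_.isAdd)
        val hasDv = files.exists(_.hasDv)

        (hasAdds, hasRemoves, dataChange) match {
          case (true, false, DataChange.All) if !hasDv => "AppendHelper"    // INSERT
          case (true, false, DataChange.None) =>                            // Note 2
            if (needAutoRewrite) "RewriteHelper" else "NullHelper"
          case (false, true, _) => "RemoveOnlyHelper"                       // DELETE
          case (true, true, DataChange.All) if !hasDv => "OverwriteHelper"  // UPDATE
          case (true, true, DataChange.None) if !hasDv => "RewriteHelper"   // OPTIMIZE
          case (false, false, _) => "NullHelper"                            // metadata only
          case _ => throw new UnsupportedOperationException(
            "Unsupported combination of actions for incremental conversion.")
        }
      }
    }

For example, an OPTIMIZE commit (adds and removes, all with dataChange = false) picks RewriteHelper, while an UPDATE (adds and removes, all with dataChange = true) becomes an overwrite; a mix of dataChange values falls through to the unsupported case and triggers the REPLACE TABLE retry described above.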

iceberg/src/test/scala/org/apache/spark/sql/delta/CloneIcebergSuite.scala

Lines changed: 33 additions & 0 deletions
@@ -947,6 +947,39 @@ class CloneNonSparkIcebergByPathSuite extends QueryTest
       }
     }
   }
+
+  test("block data path not under table path") {
+    withTable(table, cloneTable) {
+      val schema = new Schema(
+        Seq[NestedField](
+          NestedField.required(1, "id", Types.IntegerType.get),
+          NestedField.required(2, "name", Types.StringType.get)
+        ).asJava
+      )
+      val rows = Seq(
+        Map(
+          "id" -> 1,
+          "name" -> "alice"
+        )
+      )
+      val table = NonSparkIcebergTestUtils.createIcebergTable(spark, tablePath, schema, rows)
+
+      // Create a new data file not under the table path
+      withTempDir { dir =>
+        val dataPath = dir.toPath.resolve("out_of_table.parquet").toAbsolutePath.toString
+        NonSparkIcebergTestUtils.writeIntoIcebergTable(
+          table,
+          Seq(Map("id" -> 2, "name" -> "bob")),
+          2,
+          Some(dataPath)
+        )
+        val e = intercept[org.apache.spark.SparkException] {
+          runCreateOrReplace(mode, sourceIdentifier)
+        }
+        assert(e.getMessage.contains("assertion failed: Fail to relativize path"))
+      }
+    }
+  }
 }

 class CloneIcebergByNameSuite extends CloneIcebergSuiteBase
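The new test relies on path relativization failing for files outside the table root. Roughly the check being exercised (an illustrative sketch built on java.net.URI; the actual assertion lives in the Delta conversion code):

    import java.net.URI

    // URI.relativize returns its argument unchanged when the target is not
    // under the base, which is the condition the assertion message reports.
    def relativizeOrFail(tableRoot: URI, dataFile: URI): String = {
      val rel = tableRoot.relativize(dataFile)
      assert(rel != dataFile, s"Fail to relativize path $dataFile")
      rel.toString
    }

    // relativizeOrFail(new URI("s3://bucket/table/"), new URI("s3://bucket/table/part-0.parquet"))
    //   returns "part-0.parquet", while a file outside the table root fails with
    //   "assertion failed: Fail to relativize path ..."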
