Skip to content

Commit 7db1d58

Browse files
authored
Stop supporting DELTA_CONVERT_ICEBERG_UNSAFE_MOR_TABLE_ENABLE (#5017)
<!-- Thanks for sending a pull request! Here are some tips for you: 1. If this is your first time, please read our contributor guidelines: https://github.com/delta-io/delta/blob/master/CONTRIBUTING.md 2. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP] Your PR title ...'. 3. Be sure to keep the PR description updated to reflect all changes. 4. Please write your PR title to summarize what this PR proposes. 5. If possible, provide a concise example to reproduce the issue for a faster review. 6. If applicable, include the corresponding issue number in the PR title and link it in the body. --> ## Description Stop supporting `DELTA_CONVERT_ICEBERG_UNSAFE_MOR_TABLE_ENABLE`. It is an extremely unsafe flag that can lead to a corrupted table. Since its use is strongly discouraged, this PR removes support for it. ## How was this patch tested? UTs
1 parent 28548fd commit 7db1d58

File tree

5 files changed

+15
-31
lines changed

5 files changed

+15
-31
lines changed

iceberg/src/main/scala/org/apache/spark/sql/delta/IcebergFileManifest.scala

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ package org.apache.spark.sql.delta.commands.convert
1919
import scala.collection.JavaConverters._
2020
import scala.collection.mutable
2121

22-
import org.apache.spark.sql.delta.{DeltaColumnMapping, SerializableFileStatus}
22+
import org.apache.spark.sql.delta.{DeltaColumnMapping, DeltaLog, SerializableFileStatus, Snapshot => DeltaSnapshot}
2323
import org.apache.spark.sql.delta.DeltaErrors.cloneFromIcebergSourceWithPartitionEvolution
2424
import org.apache.spark.sql.delta.commands.convert.IcebergTable.ERR_MULTIPLE_PARTITION_SPECS
2525
import org.apache.spark.sql.delta.logging.DeltaLogKeys
@@ -100,14 +100,8 @@ class IcebergFileManifest(
100100
return spark.emptyDataset[ConvertTargetFile]
101101
}
102102

103-
// We do not support Iceberg Merge-On-Read using delete files and will by default
104-
// throw errors when encountering them. This flag will bypass the check and ignore the delete
105-
// files, and in other words, generating a CORRUPTED table. We keep this flag only for
106-
// backward compatibility. No new use cases of this flag should be allowed.
107-
val unsafeConvertMorTable =
108-
spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_CONVERT_ICEBERG_UNSAFE_MOR_TABLE_ENABLE)
109103
val hasMergeOnReadDeletionFiles = table.currentSnapshot().deleteManifests(table.io()).size() > 0
110-
if (hasMergeOnReadDeletionFiles && !unsafeConvertMorTable) {
104+
if (hasMergeOnReadDeletionFiles) {
111105
throw new UnsupportedOperationException(
112106
s"Cannot support convert Iceberg table with row-level deletes." +
113107
s"Please trigger an Iceberg compaction and retry the command.")
@@ -142,8 +136,7 @@ class IcebergFileManifest(
142136

143137
val dataFiles = loadIcebergFiles()
144138

145-
dataFiles
146-
.map { dataFile: DataFileWrapper =>
139+
dataFiles.map { dataFile: DataFileWrapper =>
147140
if (shouldCheckPartitionEvolution) {
148141
IcebergFileManifest.validateLimitedPartitionEvolution(
149142
dataFile.specId,

iceberg/src/test/scala/org/apache/spark/sql/delta/ConvertIcebergToDeltaSuite.scala

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -862,15 +862,6 @@ trait ConvertIcebergToDeltaSuiteBase
862862
spark.sql(s"DELETE FROM $table WHERE id = 1")
863863
// By default, conversion should fail because it is unsafe.
864864
assertConversionFailed()
865-
// Force escape should work
866-
withSQLConf(DeltaSQLConf.DELTA_CONVERT_ICEBERG_UNSAFE_MOR_TABLE_ENABLE.key -> "true") {
867-
convert(s"iceberg.`$tablePath`")
868-
// ... but with data duplication
869-
checkAnswer(
870-
spark.read.format("delta").load(tablePath),
871-
(0 until 100).map(i => Row(i.toLong, s"name_$i"))
872-
)
873-
}
874865
}
875866

876867
// --- UPDATE

spark/src/main/scala/org/apache/spark/sql/delta/commands/convert/ConvertUtils.scala

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -224,8 +224,15 @@ trait ConvertUtilsBase extends DeltaLogging {
224224
fs.makeQualified(path).toUri.toString
225225
}
226226

227-
AddFile(pathStrForAddFile, partition, file.length, file.modificationTime, dataChange = true,
228-
stats = targetFile.stats.orNull)
227+
228+
AddFile(
229+
pathStrForAddFile,
230+
partition,
231+
file.length,
232+
file.modificationTime,
233+
dataChange = true,
234+
stats = targetFile.stats.orNull
235+
)
229236
}
230237

231238
/**

spark/src/main/scala/org/apache/spark/sql/delta/commands/convert/interfaces.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ import java.io.Closeable
2020

2121
import scala.collection.JavaConverters._
2222

23-
import org.apache.spark.sql.delta.{DeltaColumnMappingMode, NoMapping, SerializableFileStatus}
23+
import org.apache.spark.sql.delta.{DeltaColumnMappingMode, DeltaLog, NoMapping, SerializableFileStatus}
2424

2525
import org.apache.spark.sql.Dataset
2626
import org.apache.spark.sql.functions.sum
@@ -95,4 +95,5 @@ case class ConvertTargetFile(
9595
fileStatus: SerializableFileStatus,
9696
partitionValues: Option[Map[String, String]] = None,
9797
parquetSchemaDDL: Option[String] = None,
98-
stats: Option[String] = None) extends Serializable
98+
stats: Option[String] = None
99+
) extends Serializable

spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1707,14 +1707,6 @@ trait DeltaSQLConfBase {
17071707
.booleanConf
17081708
.createWithDefault(true)
17091709

1710-
val DELTA_CONVERT_ICEBERG_UNSAFE_MOR_TABLE_ENABLE =
1711-
buildConf("convert.iceberg.unsafeConvertMorTable.enabled")
1712-
.doc("If enabled, iceberg merge-on-read tables can be unsafely converted by ignoring " +
1713-
"deletion files. This could cause data duplication and is strongly not recommended.")
1714-
.internal()
1715-
.booleanConf
1716-
.createWithDefault(false)
1717-
17181710
val DELTA_CONVERT_ICEBERG_CAST_TIME_TYPE = {
17191711
buildConf("convert.iceberg.castTimeType")
17201712
.internal()

0 commit comments

Comments (0)