Commit d7f21ff

[Delta] Keep table protocol intact before and after REPLACE regardless of default CC enablement (#4782)
#### Which Delta project/connector is this regarding?

- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

This PR keeps the table protocol intact before and after REPLACE commands. Under the current implementation, the following scenario silently and automatically upgrades a normal Delta table's protocol when a REPLACE command runs: the protocol goes from `(x, x)` (i.e., any combination of protocol versions) to `(3, 7)`, plus an additional, unintended `VacuumProtocolCheckTableFeature`.

```sql
-- Step 1: with DV turned off, this creates the table with protocol version (1, 2).
CREATE TABLE t (id LONG) USING delta;

-- Step 2: enable CatalogOwned (CO) by default in the current SparkSession.
-- spark.conf.set("default CO conf", "supported")

-- Step 3: the protocol would be automatically upgraded to (3, 7), with an extra
-- VacuumProtocolCheckTableFeature.
REPLACE TABLE t (id LONG) USING delta;
```

## How was this patch tested?

N/A

## Does this PR introduce _any_ user-facing changes?

N/A
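For context, a minimal sketch (not part of this change) of how the scenario above could be checked end to end from Scala, assuming a path-backed table; the location and the exact assertions are illustrative only:

```scala
import org.apache.spark.sql.delta.DeltaLog

// Step 1: a plain Delta table; with DV off this yields protocol (1, 2).
spark.sql("CREATE TABLE t (id LONG) USING delta LOCATION '/tmp/t'")
val before = DeltaLog.forTable(spark, "/tmp/t").update().protocol

// Step 2: enable the default CatalogOwned conf here, as in the SQL above.

// Step 3: after this fix, REPLACE must leave the protocol untouched.
spark.sql("REPLACE TABLE t (id LONG) USING delta LOCATION '/tmp/t'")
val after = DeltaLog.forTable(spark, "/tmp/t").update().protocol

assert(before.minReaderVersion == after.minReaderVersion)
assert(before.minWriterVersion == after.minWriterVersion)
assert(before.readerAndWriterFeatureNames == after.readerAndWriterFeatureNames)
```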
1 parent 78ce7b4 · commit d7f21ff

File tree

1 file changed: +29 −33 lines changed


spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala

Lines changed: 29 additions & 33 deletions
```diff
@@ -375,7 +375,33 @@ trait OptimisticTransactionImpl extends TransactionHelper
 
   /** Creates new metadata with global Delta configuration defaults. */
   private def withGlobalConfigDefaults(metadata: Metadata): Metadata = {
-    val conf = spark.sessionState.conf
+    val isActiveReplaceCommand = isCreatingNewTable && readVersion != -1
+    val conf = if (isActiveReplaceCommand &&
+        CatalogOwnedTableUtils.defaultCatalogOwnedEnabled(spark)) {
+      // Unset the default CatalogOwned enablement iff:
+      // 0. `isCreatingNewTable` indicates that this is either a REPLACE or a CREATE command.
+      // 1. `readVersion != -1` indicates that the table already exists.
+      //    - 0) and 1) together suggest that this is an active REPLACE command.
+      // 2. Default CC enablement is set in the spark conf.
+      // This prevents any unintended modifications to the `newProtocol`,
+      // e.g., [[CatalogOwnedTableFeature]] and its dependent features
+      // [[InCommitTimestampTableFeature]] & [[VacuumProtocolCheckTableFeature]].
+      //
+      // Note that this does *not* affect global spark conf state as we are modifying
+      // a copy of `spark.sessionState.conf`. Thus, `defaultCatalogOwnedFeatureEnabledKey`
+      // will remain unchanged for any concurrent operations that use the same SparkSession.
+      val defaultCatalogOwnedFeatureEnabledKey =
+        TableFeatureProtocolUtils.defaultPropertyKey(CatalogOwnedTableFeature)
+      // Isolate the spark conf to be used in the subsequent [[DeltaConfigs.mergeGlobalConfigs]]
+      // by cloning the existing configuration.
+      // Note: [[SQLConf.clone]] is already atomic so no extra synchronization is needed.
+      val clonedConf = spark.sessionState.conf.clone()
+      // Unset the default CC conf on the cloned spark conf.
+      clonedConf.unsetConf(defaultCatalogOwnedFeatureEnabledKey)
+      clonedConf
+    } else {
+      spark.sessionState.conf
+    }
     metadata.copy(configuration = DeltaConfigs.mergeGlobalConfigs(
       conf, metadata.configuration))
   }
```
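The heart of the fix is the clone-and-unset pattern: mutate a private copy of the session's `SQLConf` so the shared session conf, and hence concurrent commands, never observe the change. A minimal standalone sketch of the pattern, with the key name left abstract (the real key comes from `TableFeatureProtocolUtils.defaultPropertyKey(CatalogOwnedTableFeature)`):

```scala
import org.apache.spark.sql.internal.SQLConf

// Returns a private copy of `sessionConf` with the given default-enablement
// key removed; the shared session conf is left untouched, so concurrent
// commands on the same SparkSession still see the default enabled.
def confWithoutDefault(sessionConf: SQLConf, key: String): SQLConf = {
  val cloned = sessionConf.clone() // copies all entries; no shared mutable state
  cloned.unsetConf(key)            // drop only the default-enablement entry
  cloned
}
```

Merging metadata against the clone (as `DeltaConfigs.mergeGlobalConfigs` does above) then simply never sees the default, which is safer than unsetting and later restoring the key on the live session conf.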
```diff
@@ -806,42 +832,12 @@ trait OptimisticTransactionImpl extends TransactionHelper
       CoordinatedCommitsUtils.TABLE_PROPERTY_KEYS
     var newConfs: Map[String, String] = newConfsWithoutCC ++ existingCCConfs ++
       existingUCTableIdConf
-
-    val isCatalogOwnedEnabledBeforeReplace = snapshot.protocol
-      .readerAndWriterFeatureNames.contains(CatalogOwnedTableFeature.name)
-    if (!isCatalogOwnedEnabledBeforeReplace) {
-      // Ignore the [[CatalogOwnedTableFeature]] if we are replacing an existing normal
-      // table *without* CatalogOwned enabled.
-      // This removes [[CatalogOwnedTableFeature]] that may have been added as a result of
-      // the CatalogOwned spark configuration that enables it by default.
-      // Users are *NOT* allowed to create a Catalog-Owned table with REPLACE TABLE
-      // so it's fine to filter it out here.
-      newProtocol = newProtocol.map(CatalogOwnedTableUtils.filterOutCatalogOwnedTableFeature)
-
-      val isICTEnabledBeforeReplace = existingICTConfs.nonEmpty ||
-        // To prevent any potential protocol downgrade issue we check the existing
-        // protocol as well.
-        snapshot.protocol.readerAndWriterFeatureNames
-          .contains(InCommitTimestampTableFeature.name)
-      // Note that we only need to get explicit ICT configurations from `newConfs` here,
-      // because all the default spark configurations should have been merged in the prior
-      // `updateMetadataForNewTable` call.
-      val isEnablingICTDuringReplace =
-        CoordinatedCommitsUtils.getExplicitICTConfigurations(newConfs).nonEmpty
-      if (!isICTEnabledBeforeReplace && !isEnablingICTDuringReplace) {
-        // If existing table does *not* have ICT enabled, and we are *not* trying
-        // to enable ICT manually through explicit overrides, then we should
-        // filter any unintended [[InCommitTimestampTableFeature]] out here.
-        newProtocol = newProtocol.map { p =>
-          p.copy(writerFeatures = p.writerFeatures.map(
-            _.filterNot(_ == InCommitTimestampTableFeature.name)))
-        }
-      }
-    }
     // We also need to retain the existing ICT dependency configurations, but only when the
     // existing table does have Coordinated Commits configurations or Catalog-Owned enabled.
     // Otherwise, we treat the ICT configurations the same as any other configurations,
     // by merging them from the default.
+    val isCatalogOwnedEnabledBeforeReplace = snapshot.protocol
+      .readerAndWriterFeatureNames.contains(CatalogOwnedTableFeature.name)
     if (existingCCConfs.nonEmpty || isCatalogOwnedEnabledBeforeReplace) {
       val newConfsWithoutICT = newConfs -- CoordinatedCommitsUtils.ICT_TABLE_PROPERTY_KEYS
       newConfs = newConfsWithoutICT ++ existingICTConfs
```