[Delta] Keep table protocol intact before and after REPLACE regardless of default CC enablement #4782

Merged
@@ -375,7 +375,33 @@ trait OptimisticTransactionImpl extends TransactionHelper

   /** Creates new metadata with global Delta configuration defaults. */
   private def withGlobalConfigDefaults(metadata: Metadata): Metadata = {
-    val conf = spark.sessionState.conf
+    val isActiveReplaceCommand = isCreatingNewTable && readVersion != -1
+    val conf = if (isActiveReplaceCommand &&
+        CatalogOwnedTableUtils.defaultCatalogOwnedEnabled(spark)) {
+      // Unset default CatalogOwned enablement iff:
+      // 0. `isCreatingNewTable` indicates that this is either a REPLACE or a CREATE command.
+      // 1. `readVersion != -1` indicates the table already exists.
+      //    - 0) and 1) suggest that this is an active REPLACE command.
+      // 2. Default CC enablement is set in the spark conf.
+      // This prevents any unintended modifications to the `newProtocol`.
+      // E.g., [[CatalogOwnedTableFeature]] and its dependent features
+      //       [[InCommitTimestampTableFeature]] & [[VacuumProtocolCheckTableFeature]].
+      //
+      // Note that this does *not* affect global spark conf state as we are modifying
+      // a copy of `spark.sessionState.conf`. Thus, `defaultCatalogOwnedFeatureEnabledKey`
+      // will remain unchanged for any concurrent operations that use the same SparkSession.
+      val defaultCatalogOwnedFeatureEnabledKey =
+        TableFeatureProtocolUtils.defaultPropertyKey(CatalogOwnedTableFeature)
+      // Isolate the spark conf to be used in the subsequent [[DeltaConfigs.mergeGlobalConfigs]]
+      // by cloning the existing configuration.
+      // Note: [[SQLConf.clone]] is already atomic so no extra synchronization is needed.
+      val clonedConf = spark.sessionState.conf.clone()
+      // Unset default CC conf on the cloned spark conf.
+      clonedConf.unsetConf(defaultCatalogOwnedFeatureEnabledKey)
+      clonedConf
+    } else {
+      spark.sessionState.conf
+    }
     metadata.copy(configuration = DeltaConfigs.mergeGlobalConfigs(
       conf, metadata.configuration))
   }
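
To illustrate the clone-then-unset approach in `withGlobalConfigDefaults`, here is a minimal sketch that does not depend on Spark or Delta. `SimpleConf`, `copyConf`, `DefaultFeatureKey`, and `confForMerge` are hypothetical stand-ins, not names from this PR. The "default is enabled" check is also simplified to "the key is set", whereas the real code calls `CatalogOwnedTableUtils.defaultCatalogOwnedEnabled(spark)`.

```scala
import scala.collection.mutable

// Hypothetical stand-in for SQLConf: a mutable key/value store that can be copied.
final class SimpleConf(initial: Map[String, String] = Map.empty) {
  private val settings = mutable.Map(initial.toSeq: _*)

  def setConf(key: String, value: String): Unit = settings.update(key, value)
  def unsetConf(key: String): Unit = settings.remove(key)
  def getConf(key: String): Option[String] = settings.get(key)

  // Returns an independent copy; changes to the copy do not touch this instance.
  def copyConf(): SimpleConf = new SimpleConf(settings.toMap)
}

object ReplaceConfSketch {
  // Hypothetical key name used only for this illustration.
  val DefaultFeatureKey = "example.defaults.feature.catalogOwned"

  // Picks the conf to merge defaults from. For a REPLACE on an existing table
  // (isCreatingNewTable && readVersion != -1) with the default set, the default
  // is removed from a copy so that the shared session conf stays unchanged.
  def confForMerge(
      session: SimpleConf,
      isCreatingNewTable: Boolean,
      readVersion: Long): SimpleConf = {
    val isActiveReplaceCommand = isCreatingNewTable && readVersion != -1
    if (isActiveReplaceCommand && session.getConf(DefaultFeatureKey).isDefined) {
      val cloned = session.copyConf()
      cloned.unsetConf(DefaultFeatureKey)
      cloned
    } else {
      session
    }
  }
}
```

In this sketch, a REPLACE against an existing table sees a conf without the default key, while the session conf passed in still has it. A CREATE (`readVersion == -1`) gets the session conf back unchanged.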
@@ -806,42 +832,12 @@ trait OptimisticTransactionImpl extends TransactionHelper
       CoordinatedCommitsUtils.TABLE_PROPERTY_KEYS
     var newConfs: Map[String, String] = newConfsWithoutCC ++ existingCCConfs ++
       existingUCTableIdConf
-
-    val isCatalogOwnedEnabledBeforeReplace = snapshot.protocol
-      .readerAndWriterFeatureNames.contains(CatalogOwnedTableFeature.name)
-    if (!isCatalogOwnedEnabledBeforeReplace) {
-      // Ignore the [[CatalogOwnedTableFeature]] if we are replacing an existing normal
-      // table *without* CatalogOwned enabled.
-      // This removes [[CatalogOwnedTableFeature]] that may have been added as a result of
-      // the CatalogOwned spark configuration that enables it by default.
-      // Users are *NOT* allowed to create a Catalog-Owned table with REPLACE TABLE
-      // so it's fine to filter it out here.
-      newProtocol = newProtocol.map(CatalogOwnedTableUtils.filterOutCatalogOwnedTableFeature)
-
-      val isICTEnabledBeforeReplace = existingICTConfs.nonEmpty ||
-        // To prevent any potential protocol downgrade issue we check the existing
-        // protocol as well.
-        snapshot.protocol.readerAndWriterFeatureNames
-          .contains(InCommitTimestampTableFeature.name)
-      // Note that we only need to get explicit ICT configurations from `newConfs` here,
-      // because all the default spark configurations should have been merged in the prior
-      // `updateMetadataForNewTable` call.
-      val isEnablingICTDuringReplace =
-        CoordinatedCommitsUtils.getExplicitICTConfigurations(newConfs).nonEmpty
-      if (!isICTEnabledBeforeReplace && !isEnablingICTDuringReplace) {
-        // If existing table does *not* have ICT enabled, and we are *not* trying
-        // to enable ICT manually through explicit overrides, then we should
-        // filter any unintended [[InCommitTimestampTableFeature]] out here.
-        newProtocol = newProtocol.map { p =>
-          p.copy(writerFeatures = p.writerFeatures.map(
-            _.filterNot(_ == InCommitTimestampTableFeature.name)))
-        }
-      }
-    }
     // We also need to retain the existing ICT dependency configurations, but only when the
     // existing table does have Coordinated Commits configurations or Catalog-Owned enabled.
     // Otherwise, we treat the ICT configurations the same as any other configurations,
     // by merging them from the default.
+    val isCatalogOwnedEnabledBeforeReplace = snapshot.protocol
+      .readerAndWriterFeatureNames.contains(CatalogOwnedTableFeature.name)
     if (existingCCConfs.nonEmpty || isCatalogOwnedEnabledBeforeReplace) {
       val newConfsWithoutICT = newConfs -- CoordinatedCommitsUtils.ICT_TABLE_PROPERTY_KEYS
       newConfs = newConfsWithoutICT ++ existingICTConfs
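
The ICT retention step at the end of this hunk can be sketched with plain immutable maps. This is only an illustration: `IctKeys` and `retainExistingIct` are hypothetical names, and the key strings are placeholders rather than the actual contents of `CoordinatedCommitsUtils.ICT_TABLE_PROPERTY_KEYS`.

```scala
object RetainIctConfsSketch {
  // Placeholder keys standing in for the ICT table property keys.
  val IctKeys: Set[String] = Set("example.ict.enabled", "example.ict.enablementVersion")

  // When the existing table had Coordinated Commits confs or Catalog-Owned enabled,
  // drop every ICT key from the new confs and restore the existing ICT confs.
  // Otherwise the new confs are returned unchanged (ICT is merged like any other conf).
  def retainExistingIct(
      newConfs: Map[String, String],
      existingIctConfs: Map[String, String],
      hadCcOrCatalogOwned: Boolean): Map[String, String] = {
    if (hadCcOrCatalogOwned) {
      val newConfsWithoutIct = newConfs -- IctKeys
      newConfsWithoutIct ++ existingIctConfs
    } else {
      newConfs
    }
  }
}
```

Removing the keys before adding the existing values matters when the existing table has no ICT confs at all: a plain `newConfs ++ existingIctConfs` would then keep any ICT values that came in through defaults.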