diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala b/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala index a4bf0745f57..d1551197ed9 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala @@ -375,7 +375,33 @@ trait OptimisticTransactionImpl extends TransactionHelper /** Creates new metadata with global Delta configuration defaults. */ private def withGlobalConfigDefaults(metadata: Metadata): Metadata = { - val conf = spark.sessionState.conf + val isActiveReplaceCommand = isCreatingNewTable && readVersion != -1 + val conf = if (isActiveReplaceCommand && + CatalogOwnedTableUtils.defaultCatalogOwnedEnabled(spark)) { + // Unset default CatalogOwned enablement iff: + // 0. `isCreatingNewTable` indicates that this either is a REPLACE or CREATE command. + // 1. `readVersion != 1` indicates the table already exists. + // - 0) and 1) suggest that this is an active REPLACE command. + // 2. Default CC enablement is set in the spark conf. + // This prevents any unintended modifications to the `newProtocol`. + // E.g., [[CatalogOwnedTableFeature]] and its dependent features + // [[InCommitTimestampTableFeature]] & [[VacuumProtocolCheckTableFeature]]. + // + // Note that this does *not* affect global spark conf state as we are modifying + // the copy of `spark.sessionState.conf`. Thus, `defaultCatalogOwnedFeatureEnabledKey` + // will remain unchanged for any concurrent operations that use the same SparkSession. + val defaultCatalogOwnedFeatureEnabledKey = + TableFeatureProtocolUtils.defaultPropertyKey(CatalogOwnedTableFeature) + // Isolate the spark conf to be used in the subsequent [[DeltaConfigs.mergeGlobalConfigs]] + // by cloning the existing configuration. + // Note: [[SQLConf.clone]] is already atomic so no extra synchronization is needed. + val clonedConf = spark.sessionState.conf.clone() + // Unset default CC conf on the cloned spark conf. + clonedConf.unsetConf(defaultCatalogOwnedFeatureEnabledKey) + clonedConf + } else { + spark.sessionState.conf + } metadata.copy(configuration = DeltaConfigs.mergeGlobalConfigs( conf, metadata.configuration)) } @@ -806,42 +832,12 @@ trait OptimisticTransactionImpl extends TransactionHelper CoordinatedCommitsUtils.TABLE_PROPERTY_KEYS var newConfs: Map[String, String] = newConfsWithoutCC ++ existingCCConfs ++ existingUCTableIdConf - - val isCatalogOwnedEnabledBeforeReplace = snapshot.protocol - .readerAndWriterFeatureNames.contains(CatalogOwnedTableFeature.name) - if (!isCatalogOwnedEnabledBeforeReplace) { - // Ignore the [[CatalogOwnedTableFeature]] if we are replacing an existing normal - // table *without* CatalogOwned enabled. - // This removes [[CatalogOwnedTableFeature]] that may have been added as a result of - // the CatalogOwned spark configuration that enables it by default. - // Users are *NOT* allowed to create a Catalog-Owned table with REPLACE TABLE - // so it's fine to filter it out here. - newProtocol = newProtocol.map(CatalogOwnedTableUtils.filterOutCatalogOwnedTableFeature) - - val isICTEnabledBeforeReplace = existingICTConfs.nonEmpty || - // To prevent any potential protocol downgrade issue we check the existing - // protocol as well. - snapshot.protocol.readerAndWriterFeatureNames - .contains(InCommitTimestampTableFeature.name) - // Note that we only need to get explicit ICT configurations from `newConfs` here, - // because all the default spark configurations should have been merged in the prior - // `updateMetadataForNewTable` call. - val isEnablingICTDuringReplace = - CoordinatedCommitsUtils.getExplicitICTConfigurations(newConfs).nonEmpty - if (!isICTEnabledBeforeReplace && !isEnablingICTDuringReplace) { - // If existing table does *not* have ICT enabled, and we are *not* trying - // to enable ICT manually through explicit overrides, then we should - // filter any unintended [[InCommitTimestampTableFeature]] out here. - newProtocol = newProtocol.map { p => - p.copy(writerFeatures = p.writerFeatures.map( - _.filterNot(_ == InCommitTimestampTableFeature.name))) - } - } - } // We also need to retain the existing ICT dependency configurations, but only when the // existing table does have Coordinated Commits configurations or Catalog-Owned enabled. // Otherwise, we treat the ICT configurations the same as any other configurations, // by merging them from the default. + val isCatalogOwnedEnabledBeforeReplace = snapshot.protocol + .readerAndWriterFeatureNames.contains(CatalogOwnedTableFeature.name) if (existingCCConfs.nonEmpty || isCatalogOwnedEnabledBeforeReplace) { val newConfsWithoutICT = newConfs -- CoordinatedCommitsUtils.ICT_TABLE_PROPERTY_KEYS newConfs = newConfsWithoutICT ++ existingICTConfs