Commit 76ed376

Fix: Creating an external table fails when it is given the same options as for the underlying data (#809)
Signed-off-by: Lucian Neghina <[email protected]>
1 parent 3ffb30d commit 76ed376
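
For context, the scenario from #809 looks roughly like the following hedged sketch, which mirrors the regression tests added below; `spark` is assumed to be an active SparkSession and the path and table name are illustrative:

val path = "/tmp/delta/events"  // illustrative location

// Write a Delta table with a writer option.
spark.range(10)
  .write
  .format("delta")
  .option("header", "true")
  .save(path)

// Register the same data as an external table, repeating the option as a table property.
// Before this fix the writer option was not recorded in the table metadata, so the declared
// TBLPROPERTIES did not match the existing configuration and the statement failed; with this
// commit it succeeds (see "create table - same existing properties" in DeltaTableBuilderSuite below).
spark.sql(
  s"""CREATE TABLE events (id LONG)
     | USING delta LOCATION '$path'
     | TBLPROPERTIES ('header' = 'true')""".stripMargin)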

File tree: 5 files changed, +157 −35 lines

core/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaCatalog.scala

Lines changed: 6 additions & 34 deletions
@@ -199,12 +199,13 @@ class DeltaCatalog extends DelegatingCatalogExtension
       partitions: Array[Transform],
       properties: util.Map[String, String]): Table = {
     if (DeltaSourceUtils.isDeltaDataSourceName(getProvider(properties))) {
+      val catalogProperties = DeltaTableProperties(properties)
       createDeltaTable(
         ident,
         schema,
         partitions,
-        properties,
-        Map.empty,
+        catalogProperties.properties.asJava,
+        catalogProperties.options,
         sourceQuery = None,
         TableCreationModes.Create)
     } else {
@@ -350,42 +351,13 @@ class DeltaCatalog extends DelegatingCatalogExtension
     private var writeOptions: Map[String, String] = Map.empty
 
     override def commitStagedChanges(): Unit = {
-      val conf = spark.sessionState.conf
-      val props = new util.HashMap[String, String]()
-      // Options passed in through the SQL API will show up both with an "option." prefix and
-      // without in Spark 3.1, so we need to remove those from the properties
-      val optionsThroughProperties = properties.asScala.collect {
-        case (k, _) if k.startsWith("option.") => k.stripPrefix("option.")
-      }.toSet
-      val sqlWriteOptions = new util.HashMap[String, String]()
-      properties.asScala.foreach { case (k, v) =>
-        if (!k.startsWith("option.") && !optionsThroughProperties.contains(k)) {
-          // Do not add to properties
-          props.put(k, v)
-        } else if (optionsThroughProperties.contains(k)) {
-          sqlWriteOptions.put(k, v)
-        }
-      }
-      if (writeOptions.isEmpty && !sqlWriteOptions.isEmpty) {
-        writeOptions = sqlWriteOptions.asScala.toMap
-      }
-      if (conf.getConf(DeltaSQLConf.DELTA_LEGACY_STORE_WRITER_OPTIONS_AS_PROPS)) {
-        // Legacy behavior
-        writeOptions.foreach { case (k, v) => props.put(k, v) }
-      } else {
-        writeOptions.foreach { case (k, v) =>
-          // Continue putting in Delta prefixed options to avoid breaking workloads
-          if (k.toLowerCase(Locale.ROOT).startsWith("delta.")) {
-            props.put(k, v)
-          }
-        }
-      }
+      val catalogProp = DeltaTableProperties(properties, writeOptions)
       createDeltaTable(
         ident,
         schema,
         partitions,
-        props,
-        writeOptions,
+        catalogProp.properties.asJava,
+        catalogProp.options,
         asSelectQuery,
         operation)
     }
core/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaTableProperties.scala (new file)

Lines changed: 72 additions & 0 deletions
@@ -0,0 +1,72 @@
+/*
+ * Copyright (2021) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.delta.catalog
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.delta.sources.DeltaSQLConf
+
+import java.util
+import java.util.Locale
+import scala.collection.JavaConverters._
+
+case class DeltaTableProperties(properties: Map[String, String], options: Map[String, String])
+
+object DeltaTableProperties {
+
+  def apply(properties: util.Map[String, String],
+      deltaOptions: Map[String, String] = Map.empty): DeltaTableProperties = {
+    val (options, props) = split(properties.asScala.toMap)
+    val writeOptions = if (deltaOptions.isEmpty && options.nonEmpty) options else deltaOptions
+    val tableProperties = props ++ deltaProperties(writeOptions)
+
+    new DeltaTableProperties(tableProperties, writeOptions)
+  }
+
+  private def deltaProperties(writeOptions: Map[String, String]): Map[String, String] = {
+    val conf = SparkSession.active.sessionState.conf
+
+    if (conf.getConf(DeltaSQLConf.DELTA_LEGACY_STORE_WRITER_OPTIONS_AS_PROPS)) {
+      // Legacy behavior
+      writeOptions
+    } else {
+      writeOptions.filter { case (k, _) =>
+        k.toLowerCase(Locale.ROOT).startsWith("delta.") }
+    }
+  }
+
+  private def split(properties: Map[String, String]): (Map[String, String], Map[String, String]) = {
+    // Options passed in through the SQL API will show up both with an "option." prefix and
+    // without in Spark 3.1, so we need to remove those from the properties
+    val optionsThroughProperties = properties.collect {
+      case (k, _) if k.startsWith("option.") => k.stripPrefix("option.")
+    }.toSet
+
+    val options = new util.HashMap[String, String]()
+    val props = new util.HashMap[String, String]()
+
+    properties.foreach { case (k, v) =>
+      if (!k.startsWith("option.") && !optionsThroughProperties.contains(k) && !k.equals("path")) {
+        // Do not add to properties
+        props.put(k, v)
+      } else if (optionsThroughProperties.contains(k)) {
+        options.put(k, v)
+      }
+    }
+
+    (options.asScala.toMap, props.asScala.toMap)
+  }
+}
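
For readers skimming the diff, here is a rough, non-authoritative sketch of how DeltaTableProperties separates SQL write options from table properties. It assumes an active SparkSession, the default (non-legacy) value of DELTA_LEGACY_STORE_WRITER_OPTIONS_AS_PROPS, and an illustrative property map; it is not part of the commit itself:

import java.util.{HashMap => JHashMap}
import org.apache.spark.sql.delta.catalog.DeltaTableProperties

// An illustrative property map as it might reach createTable for
//   CREATE TABLE ... USING delta OPTIONS (header 'true') TBLPROPERTIES ('foo' = 'bar').
// In Spark 3.1 an OPTIONS entry shows up both bare and with an "option." prefix.
val raw = new JHashMap[String, String]()
raw.put("option.header", "true")
raw.put("header", "true")
raw.put("foo", "bar")

val split = DeltaTableProperties(raw)
// split.options    == Map("header" -> "true")  // recognized as a write option, not a table property
// split.properties == Map("foo" -> "bar")      // "option.*" duplicates and "path" are dropped; in
//                                              // non-legacy mode only "delta."-prefixed write options
//                                              // are added back as table properties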

core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaDataSource.scala

Lines changed: 4 additions & 1 deletion
@@ -16,6 +16,8 @@
 
 package org.apache.spark.sql.delta.sources
 
+import org.apache.spark.sql.delta.catalog.DeltaTableProperties
+
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.util.{Failure, Success, Try}
@@ -145,12 +147,13 @@ class DeltaDataSource
       .getOrElse(Nil)
 
     val deltaLog = DeltaLog.forTable(sqlContext.sparkSession, path, parameters)
+    val catalogProp = DeltaTableProperties(parameters.asJava)
     WriteIntoDelta(
       deltaLog = deltaLog,
       mode = mode,
       new DeltaOptions(parameters, sqlContext.sparkSession.sessionState.conf),
       partitionColumns = partitionColumns,
-      configuration = Map.empty,
+      configuration = catalogProp.properties,
       data = data).run(sqlContext.sparkSession)
 
     deltaLog.createRelation()

core/src/test/scala/io/delta/tables/DeltaTableBuilderSuite.scala

Lines changed: 43 additions & 0 deletions
@@ -365,4 +365,47 @@ class DeltaTableBuilderSuite extends QueryTest with SharedSparkSession with Delt
       assert(e.getMessage.equals("Database 'parquet' not found"))
     }
   }
+
+  test("create table - different PROPERTIES") {
+    withTempDir {
+      tempDir =>
+        val path = tempDir.getCanonicalPath
+        val options = Map(
+          "header" -> "true",
+          "foo" -> "bar"
+        )
+        spark.range(10)
+          .write
+          .format("delta")
+          .options(options)
+          .save(path)
+
+        val e = intercept[AnalysisException] {
+          sql(s"""CREATE TABLE test (id LONG)
+                 | USING delta LOCATION '$path'
+                 | TBLPROPERTIES ('foo'='bar')""".stripMargin)
+        }
+        assert(e.getMessage.startsWith(
+          "The specified properties do not match the existing properties"))
+    }
+  }
+
+  test("create table - same existing properties") {
+    withTempDir { tempDir =>
+      val path = tempDir.getCanonicalPath
+      spark.range(10)
+        .write
+        .format("delta")
+        .option("header", "true")
+        .save(path)
+      sql(
+        s"""CREATE TABLE testTable (id LONG)
+           | USING delta LOCATION '$path'
+           | TBLPROPERTIES ('header'='true')""".stripMargin)
+
+      val deltaLog = DeltaLog.forTable(spark, path)
+      val expected = Map("header" -> "true")
+      assert(deltaLog.snapshot.metadata.configuration === expected)
+    }
+  }
 }

core/src/test/scala/org/apache/spark/sql/delta/DeltaSuite.scala

Lines changed: 32 additions & 0 deletions
@@ -1663,4 +1663,36 @@ class DeltaSuite extends QueryTest
       dir2.renameTo(dir1)
       checkAnswer(spark.read.format("delta").load(dir1.getCanonicalPath), spark.range(10).toDF)
   }
+
+  test("set metadata upon write") {
+    withTempDir { tempDir =>
+      val path = tempDir.getCanonicalPath
+      val options = Map(
+        "header" -> "true",
+        "foo" -> "bar"
+      )
+      spark.range(10).write
+        .format("delta")
+        .options(options)
+        .save(path)
+
+      val deltaLog = DeltaLog.forTable(spark, path)
+      assert(deltaLog.snapshot.metadata.configuration === options)
+    }
+  }
+
+  test("set metadata upon write using v2") {
+    withTempDir { tempDir =>
+      val path = tempDir.getCanonicalPath
+      spark.range(10)
+        .writeTo(s"delta.`$path`")
+        .using("delta")
+        .tableProperty("header", "true")
+        .createOrReplace()
+
+      val deltaLog = DeltaLog.forTable(spark, path)
+      val expected = Map("header" -> "true")
+      assert(deltaLog.snapshot.metadata.configuration === expected)
+    }
+  }
 }
