Skip to content

Commit f6370fe

Browse files
committed
work
1 parent 23f9f2f commit f6370fe

File tree

8 files changed

+247
-2
lines changed

8 files changed

+247
-2
lines changed

kernel/kernel-api/src/main/java/io/delta/kernel/Transaction.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@
3838
import io.delta.kernel.internal.fs.Path;
3939
import io.delta.kernel.internal.icebergcompat.IcebergCompatV2MetadataValidatorAndUpdater;
4040
import io.delta.kernel.internal.icebergcompat.IcebergCompatV3MetadataValidatorAndUpdater;
41+
import io.delta.kernel.internal.tablefeatures.TableFeatureSupport;
42+
import io.delta.kernel.internal.tablefeatures.TableFeatures;
4143
import io.delta.kernel.statistics.DataFileStatistics;
4244
import io.delta.kernel.types.StructType;
4345
import io.delta.kernel.utils.*;
@@ -169,6 +171,11 @@ static CloseableIterator<FilteredColumnarBatch> transformLogicalData(
169171
boolean isIcebergCompatEnabled =
170172
isIcebergCompatV2Enabled(transactionState) || isIcebergCompatV3Enabled(transactionState);
171173
blockIfColumnMappingEnabled(transactionState);
174+
// We recognize the AllowColumnDefaults feature for Iceberg v3 but do not support writing with it yet
175+
if (TableFeatureSupport.supports(transactionState, TableFeatures.ALLOW_COLUMN_DEFAULTS_W_FEATURE)) {
176+
throw new UnsupportedOperationException(
177+
"Writing with Column Default values is not supported yet.");
178+
}
172179

173180
// TODO: set the correct schema once writing into column mapping enabled table is supported.
174181
String tablePath = getTablePath(transactionState);

kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionBuilderImpl.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@
3535
import io.delta.kernel.exceptions.TableNotFoundException;
3636
import io.delta.kernel.expressions.Column;
3737
import io.delta.kernel.internal.actions.*;
38+
import io.delta.kernel.internal.clustering.ClusteringUtils;
39+
import io.delta.kernel.internal.columndefaults.ColumnDefaults;
3840
import io.delta.kernel.internal.icebergcompat.IcebergCompatV2MetadataValidatorAndUpdater;
3941
import io.delta.kernel.internal.icebergcompat.IcebergCompatV3MetadataValidatorAndUpdater;
4042
import io.delta.kernel.internal.icebergcompat.IcebergUniversalFormatMetadataValidatorAndUpdater;
@@ -629,6 +631,7 @@ private Optional<Metadata> validateMetadataChangeAndUpdateMetadata(
629631
IcebergCompatV3MetadataValidatorAndUpdater.validateIcebergCompatV3Change(
630632
oldMetadata.getConfiguration(), newMetadata.getConfiguration(), isCreateOrReplace);
631633
IcebergUniversalFormatMetadataValidatorAndUpdater.validate(newMetadata);
634+
ColumnDefaults.validateMetadataUpdate(oldMetadata, newMetadata);
632635
Optional<Metadata> updatedMetadata = Optional.empty();
633636
// Validate the conditions for schema evolution and the updated schema if applicable
634637
if (schema.isPresent() && !isCreateOrReplace) {
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*
2+
* Copyright (2023) The Delta Lake Project Authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.delta.kernel.internal.columndefaults;
17+
18+
import io.delta.kernel.internal.actions.Metadata;
19+
import io.delta.kernel.internal.util.SchemaUtils;
20+
import io.delta.kernel.types.StructField;
21+
22+
import java.util.HashMap;
23+
import java.util.List;
24+
import java.util.Map;
25+
26+
/**
27+
* Utilities class for TableFeature "allowColumnDefaults".
28+
* NOTE: As of Aug 2025, kernel only supports reading Delta tables with the table feature,
29+
* or make metadata change to the table. Writing actual data to the table, or modify
30+
* the default values is not allowed.
31+
*/
32+
public class ColumnDefaults {
33+
34+
public static String DEFAULT_VALUE_METADATA_KEY = "CURRENT_DEFAULT";
35+
36+
static class ColumnDefaultsCollector {
37+
Map<String, String> collected = new HashMap<>();
38+
39+
void collect(List<String> paths, StructField field) {
40+
if (field.getMetadata().contains(DEFAULT_VALUE_METADATA_KEY)) {
41+
collected.put(
42+
SchemaUtils.concatWithDot(paths),
43+
field.getMetadata().getString(DEFAULT_VALUE_METADATA_KEY));
44+
}
45+
}
46+
}
47+
/**
48+
* Block changes to column defaults stored in metadata
49+
*/
50+
public static void validateMetadataUpdate(Metadata oldMetadata, Metadata newMetadata) {
51+
ColumnDefaultsCollector oldDefaultsCollector = new ColumnDefaultsCollector();
52+
SchemaUtils.traverse(oldMetadata.getSchema(), oldDefaultsCollector::collect);
53+
ColumnDefaultsCollector newDefaultsCollector = new ColumnDefaultsCollector();
54+
SchemaUtils.traverse(oldMetadata.getSchema(), newDefaultsCollector::collect);
55+
56+
if (! oldDefaultsCollector.collected.equals(newDefaultsCollector.collected)) {
57+
throw new UnsupportedOperationException("Kernel does not support changing column defaults");
58+
}
59+
60+
}
61+
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Copyright (2024) The Delta Lake Project Authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.delta.kernel.internal.tablefeatures;
17+
18+
import io.delta.kernel.data.Row;
19+
import io.delta.kernel.internal.data.TransactionStateRow;
20+
21+
public class TableFeatureSupport {
22+
/** The string constant "supported" for uses in table properties. */
23+
public static String FEATURE_PROP_SUPPORTED = "supported";
24+
25+
public static boolean supports(Row transactionState, TableFeature feature) {
26+
return FEATURE_PROP_SUPPORTED.equals(
27+
TransactionStateRow.getConfiguration(transactionState).getOrDefault(
28+
TableFeatures.SET_TABLE_FEATURE_SUPPORTED_PREFIX + feature.featureName(),
29+
"unknown"));
30+
}
31+
}

kernel/kernel-api/src/main/java/io/delta/kernel/internal/tablefeatures/TableFeatures.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -454,6 +454,14 @@ private static class VacuumProtocolCheckTableFeature extends TableFeature.Reader
454454
}
455455
}
456456

457+
public static final TableFeature ALLOW_COLUMN_DEFAULTS_W_FEATURE = new AllowColumnDefaultsTableFeature();
458+
459+
private static class AllowColumnDefaultsTableFeature extends TableFeature.WriterFeature {
460+
AllowColumnDefaultsTableFeature() {
461+
super("allowColumnDefaults", /* minWriterVersion = */ 7);
462+
}
463+
}
464+
457465
public static final TableFeature ICEBERG_WRITER_COMPAT_V1 = new IcebergWriterCompatV1();
458466

459467
private static class IcebergWriterCompatV1 extends TableFeature.WriterFeature
@@ -506,6 +514,7 @@ public boolean metadataRequiresFeatureToBeEnabled(Protocol protocol, Metadata me
506514
public static final List<TableFeature> TABLE_FEATURES =
507515
Collections.unmodifiableList(
508516
Arrays.asList(
517+
ALLOW_COLUMN_DEFAULTS_W_FEATURE,
509518
APPEND_ONLY_W_FEATURE,
510519
CATALOG_MANAGED_R_W_FEATURE_PREVIEW,
511520
CHECKPOINT_V2_RW_FEATURE,

kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/SchemaUtils.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import io.delta.kernel.internal.types.TypeWideningChecker;
3232
import io.delta.kernel.types.*;
3333
import java.util.*;
34+
import java.util.function.BiConsumer;
3435
import java.util.stream.Collectors;
3536

3637
/**
@@ -330,6 +331,34 @@ public static String concatWithDot(List<String> columnPath) {
330331
return columnPath.stream().map(SchemaUtils::escapeDots).collect(Collectors.joining("."));
331332
}
332333

334+
/**
335+
* Traverse a schema
336+
* @param schema root of the schema to be traversed
337+
* @param callback callback on each StructField encountered
338+
*/
339+
public static void traverse(StructType schema, BiConsumer<List<String>, StructField> callback) {
340+
traverse(new ArrayList<>(), schema.fields(), callback);
341+
}
342+
343+
private static void traverse(List<String> paths, List<StructField> fields, BiConsumer<List<String>, StructField> callback) {
344+
fields.stream().forEach( field -> {
345+
if (field.getDataType() instanceof StructType) {
346+
StructType structType = (StructType) field.getDataType();
347+
paths.add(field.getName());
348+
traverse(paths, structType.fields(), callback);
349+
paths.remove(paths.size() - 1);
350+
} else if (field.getDataType() instanceof ArrayType) {
351+
ArrayType arrayType = (ArrayType) field.getDataType();
352+
traverse(paths, Collections.singletonList(arrayType.getElementField()), callback);
353+
} else if (field.getDataType() instanceof MapType) {
354+
MapType mapType = (MapType) field.getDataType();
355+
traverse(paths, Arrays.asList(mapType.getKeyField(), mapType.getValueField()), callback);
356+
} else {
357+
callback.accept(paths, field);
358+
}
359+
});
360+
}
361+
333362
/////////////////////////////////////////////////////////////////////////////////////////////////
334363
/// Private methods ///
335364
/////////////////////////////////////////////////////////////////////////////////////////////////
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
/*
2+
* Copyright (2025) The Delta Lake Project Authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.delta.kernel.internal.columndefaults
18+
19+
import io.delta.kernel.data.{ArrayValue, MapValue}
20+
import io.delta.kernel.internal.actions._
21+
import io.delta.kernel.test.ActionUtils
22+
import io.delta.kernel.types._
23+
import org.scalatest.funsuite.AnyFunSuite
24+
25+
import java.util.Optional
26+
27+
/** Tests for [[ColumnDefaults]]. */
class ColumnDefaultsSuite extends AnyFunSuite with ActionUtils {

  /** Field metadata that stores `value` under the "CURRENT_DEFAULT" key. */
  private def defaultOf(value: String): FieldMetadata =
    FieldMetadata.builder().putString("CURRENT_DEFAULT", value).build()

  /** A struct with a single integer field named `fieldName` whose default is `default`. */
  private def intStruct(fieldName: String, default: String): StructType =
    new StructType().add(fieldName, IntegerType, defaultOf(default))

  /**
   * The full nested schema used by the test. Only the defaults of the map key field and of the
   * field inside the top-level list vary between the schemas under test.
   */
  private def nestedSchema(mapKeyDefault: String, childListDefault: String): StructType = {
    val childStruct = new StructType()
      .add("childId", IntegerType, defaultOf("100"))
      .add("grandChildList", new ArrayType(intStruct("nestedId", "120"), false))
    val grandChildMap = new MapType(
      intStruct("mapKeyId", mapKeyDefault),
      intStruct("mapValueId", "330"),
      false)
    val childList = new ArrayType(intStruct("clid", childListDefault), false)

    new StructType()
      .add("id", IntegerType)
      .add("name", StringType, defaultOf("tom"))
      .add("childStruct", childStruct)
      .add("grandChildMap", grandChildMap)
      .add("childList", childList)
  }

  test("validateMetadataUpdate") {
    val oldSchema = nestedSchema(mapKeyDefault = "220", childListDefault = "300")

    // An update that leaves every default untouched is accepted.
    ColumnDefaults.validateMetadataUpdate(testMetadata(oldSchema), testMetadata(oldSchema))

    val rejectedSchemas = Seq(
      // All columns that carried a default are gone.
      new StructType().add("id", IntegerType),
      // Only the top-level default is kept.
      new StructType()
        .add("id", IntegerType)
        .add("name", StringType, defaultOf("tom")),
      // The default of the map key field differs.
      nestedSchema(mapKeyDefault = "170", childListDefault = "300"),
      // The default of the field inside the top-level list differs.
      nestedSchema(mapKeyDefault = "220", childListDefault = "310"))

    for (newSchema <- rejectedSchemas) {
      intercept[UnsupportedOperationException] {
        ColumnDefaults.validateMetadataUpdate(testMetadata(oldSchema), testMetadata(newSchema))
      }
    }
  }
}

kernel/kernel-api/src/test/scala/io/delta/kernel/internal/tablefeatures/TableFeaturesSuite.scala

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ class TableFeaturesSuite extends AnyFunSuite {
5353
"variantShredding-preview")
5454

5555
val writerOnlyFeatures = Seq(
56+
"allowColumnDefaults",
5657
"appendOnly",
5758
"invariants",
5859
"checkConstraints",
@@ -196,7 +197,8 @@ class TableFeaturesSuite extends AnyFunSuite {
196197
}
197198
})
198199

199-
Seq("domainMetadata", "vacuumProtocolCheck", "clustering").foreach { feature =>
200+
Seq("domainMetadata", "vacuumProtocolCheck", "clustering", "allowColumnDefaults")
201+
.foreach { feature =>
200202
test(s"doesn't support auto enable by metadata: $feature") {
201203
val tableFeature = TableFeatures.getTableFeature(feature)
202204
assert(!tableFeature.isInstanceOf[FeatureAutoEnabledByMetadata])
@@ -252,6 +254,7 @@ class TableFeaturesSuite extends AnyFunSuite {
252254
// these features are enabled
253255
val expected = Seq(
254256
"columnMapping",
257+
"allowColumnDefaults",
255258
"v2Checkpoint",
256259
"deletionVectors",
257260
"vacuumProtocolCheck",
@@ -320,7 +323,9 @@ class TableFeaturesSuite extends AnyFunSuite {
320323
"timestampNtz",
321324
"v2Checkpoint",
322325
"vacuumProtocolCheck",
323-
"columnMapping").foreach { feature =>
326+
"columnMapping",
327+
"allowColumnDefaults"
328+
).foreach { feature =>
324329
test(s"validateKernelCanReadTheTable: protocol 3 with $feature") {
325330
val protocol = new Protocol(3, 1, singleton(feature), Set().asJava)
326331
validateKernelCanReadTheTable(protocol, "/test/table")
@@ -500,6 +505,12 @@ class TableFeaturesSuite extends AnyFunSuite {
500505
new Protocol(2, 7, Set().asJava, singleton("columnMapping")),
501506
testMetadata(tblProps = Map("delta.columnMapping.mode" -> "id")))
502507

508+
checkWriteSupported(
509+
"validateKernelCanWriteToTable: protocol 7 with allowColumnDefaults, " +
510+
"metadata contains allowColumnDefaults",
511+
new Protocol(2, 7, Set().asJava, singleton("allowColumnDefaults")),
512+
testMetadata(tblProps = Map("delta.feature.allowColumnDefaults" -> "supported")))
513+
503514
checkWriteSupported(
504515
"validateKernelCanWriteToTable: protocol 7 with identityColumns, " +
505516
"schema doesn't contains identity columns",

0 commit comments

Comments
 (0)