
Commit 522de0f

Flink: Migrate Flink TableSchema to Schema/ResolvedSchema (apache#13072)
1 parent bcb6880 commit 522de0f

38 files changed: +1478, -568 lines changed
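
In short, this commit replaces the deprecated org.apache.flink.table.legacy.api.TableSchema with org.apache.flink.table.catalog.ResolvedSchema throughout the Flink connector. A minimal sketch of the recurring replacement pattern, assuming only Flink's table API on the classpath (the helper class and method names are illustrative and not part of the commit; the body mirrors the FlinkDynamicTableFactory changes below):

import java.util.stream.Collectors;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.catalog.ResolvedSchema;

final class PhysicalSchemaExample {
  private PhysicalSchemaExample() {}

  // Replaces the legacy TableSchemaUtils.getPhysicalSchema(table.getSchema()) call:
  // keep only the physical columns of the resolved schema.
  static ResolvedSchema physicalSchemaOf(ResolvedCatalogTable table) {
    return ResolvedSchema.of(
        table.getResolvedSchema().getColumns().stream()
            .filter(Column::isPhysical)
            .collect(Collectors.toList()));
  }
}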

LICENSE

Lines changed: 1 addition & 0 deletions
@@ -331,6 +331,7 @@ This product includes code from Apache Flink.
 * Parameterized test at class level logic in ParameterizedTestExtension.java
 * Parameter provider annotation for parameterized tests in Parameters.java
 * Parameter field annotation for parameterized tests in Parameter.java
+* Primary key validation logic in FlinkSchemaUtil.java

 Copyright: 1999-2022 The Apache Software Foundation.
 Home page: https://flink.apache.org/

flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java

Lines changed: 11 additions & 25 deletions
@@ -38,6 +38,7 @@
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.catalog.TableChange;
 import org.apache.flink.table.catalog.exceptions.CatalogException;
 import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
@@ -51,7 +52,6 @@
 import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
 import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.factories.Factory;
-import org.apache.flink.table.legacy.api.TableSchema;
 import org.apache.flink.util.StringUtils;
 import org.apache.iceberg.CachingCatalog;
 import org.apache.iceberg.DataFile;
@@ -435,7 +435,7 @@ void createIcebergTable(ObjectPath tablePath, ResolvedCatalogTable table, boolea
     validateFlinkTable(table);

     Schema icebergSchema = FlinkSchemaUtil.convert(table.getResolvedSchema());
-    PartitionSpec spec = toPartitionSpec(((CatalogTable) table).getPartitionKeys(), icebergSchema);
+    PartitionSpec spec = toPartitionSpec(table.getPartitionKeys(), icebergSchema);
     ImmutableMap.Builder<String, String> properties = ImmutableMap.builder();
     String location = null;
     for (Map.Entry<String, String> entry : table.getOptions().entrySet()) {
@@ -467,22 +467,7 @@ private boolean isReservedProperty(String prop) {
   }

   private static void validateTableSchemaAndPartition(CatalogTable ct1, CatalogTable ct2) {
-    TableSchema ts1 = ct1.getSchema();
-    TableSchema ts2 = ct2.getSchema();
-    boolean equalsPrimary = false;
-
-    if (ts1.getPrimaryKey().isPresent() && ts2.getPrimaryKey().isPresent()) {
-      equalsPrimary =
-          Objects.equals(ts1.getPrimaryKey().get().getType(), ts2.getPrimaryKey().get().getType())
-              && Objects.equals(
-                  ts1.getPrimaryKey().get().getColumns(), ts2.getPrimaryKey().get().getColumns());
-    } else if (!ts1.getPrimaryKey().isPresent() && !ts2.getPrimaryKey().isPresent()) {
-      equalsPrimary = true;
-    }
-
-    if (!(Objects.equals(ts1.getTableColumns(), ts2.getTableColumns())
-        && Objects.equals(ts1.getWatermarkSpecs(), ts2.getWatermarkSpecs())
-        && equalsPrimary)) {
+    if (!Objects.equals(ct1.getUnresolvedSchema(), ct2.getUnresolvedSchema())) {
       throw new UnsupportedOperationException(
           "Altering schema is not supported in the old alterTable API. "
               + "To alter schema, use the other alterTable API and provide a list of TableChange's.");
@@ -632,9 +617,9 @@ private static void validateFlinkTable(CatalogBaseTable table) {
     Preconditions.checkArgument(
         table instanceof CatalogTable, "The Table should be a CatalogTable.");

-    TableSchema schema = table.getSchema();
+    org.apache.flink.table.api.Schema schema = table.getUnresolvedSchema();
     schema
-        .getTableColumns()
+        .getColumns()
         .forEach(
             column -> {
               if (!FlinkCompatibilityUtil.isPhysicalColumn(column)) {
@@ -671,16 +656,17 @@ private static List<String> toPartitionKeys(PartitionSpec spec, Schema icebergSc
   }

   static CatalogTable toCatalogTableWithProps(Table table, Map<String, String> props) {
-    TableSchema schema = FlinkSchemaUtil.toSchema(table.schema());
+    ResolvedSchema resolvedSchema = FlinkSchemaUtil.toResolvedSchema(table.schema());
     List<String> partitionKeys = toPartitionKeys(table.spec(), table.schema());

     // NOTE: We can not create a IcebergCatalogTable extends CatalogTable, because Flink optimizer
-    // may use
-    // CatalogTableImpl to copy a new catalog table.
+    // may use DefaultCatalogTable to copy a new catalog table.
     // Let's re-loading table from Iceberg catalog when creating source/sink operators.
-    // Iceberg does not have Table comment, so pass a null (Default comment value in Flink).
     return CatalogTable.newBuilder()
-        .schema(schema.toSchema())
+        .schema(
+            org.apache.flink.table.api.Schema.newBuilder()
+                .fromResolvedSchema(resolvedSchema)
+                .build())
         .partitionKeys(partitionKeys)
         .options(props)
         .build();

flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/FlinkDynamicTableFactory.java

Lines changed: 17 additions & 6 deletions
@@ -21,20 +21,21 @@
 import java.util.Collections;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.catalog.CatalogDatabaseImpl;
+import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
 import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.factories.DynamicTableSinkFactory;
 import org.apache.flink.table.factories.DynamicTableSourceFactory;
-import org.apache.flink.table.legacy.api.TableSchema;
-import org.apache.flink.table.utils.TableSchemaUtils;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.exceptions.AlreadyExistsException;
 import org.apache.iceberg.flink.source.IcebergTableSource;
@@ -60,7 +61,11 @@ public DynamicTableSource createDynamicTableSource(Context context) {
     ObjectIdentifier objectIdentifier = context.getObjectIdentifier();
     ResolvedCatalogTable resolvedCatalogTable = context.getCatalogTable();
     Map<String, String> tableProps = resolvedCatalogTable.getOptions();
-    TableSchema tableSchema = TableSchemaUtils.getPhysicalSchema(resolvedCatalogTable.getSchema());
+    ResolvedSchema resolvedSchema =
+        ResolvedSchema.of(
+            resolvedCatalogTable.getResolvedSchema().getColumns().stream()
+                .filter(Column::isPhysical)
+                .collect(Collectors.toList()));

     TableLoader tableLoader;
     if (catalog != null) {
@@ -74,15 +79,20 @@ public DynamicTableSource createDynamicTableSource(Context context) {
           objectIdentifier.getObjectName());
     }

-    return new IcebergTableSource(tableLoader, tableSchema, tableProps, context.getConfiguration());
+    return new IcebergTableSource(
+        tableLoader, resolvedSchema, tableProps, context.getConfiguration());
   }

   @Override
   public DynamicTableSink createDynamicTableSink(Context context) {
     ObjectIdentifier objectIdentifier = context.getObjectIdentifier();
     ResolvedCatalogTable resolvedCatalogTable = context.getCatalogTable();
     Map<String, String> writeProps = resolvedCatalogTable.getOptions();
-    TableSchema tableSchema = TableSchemaUtils.getPhysicalSchema(resolvedCatalogTable.getSchema());
+    ResolvedSchema resolvedSchema =
+        ResolvedSchema.of(
+            resolvedCatalogTable.getResolvedSchema().getColumns().stream()
+                .filter(Column::isPhysical)
+                .collect(Collectors.toList()));

     TableLoader tableLoader;
     if (catalog != null) {
@@ -96,7 +106,8 @@ public DynamicTableSink createDynamicTableSink(Context context) {
           objectIdentifier.getObjectName());
     }

-    return new IcebergTableSink(tableLoader, tableSchema, context.getConfiguration(), writeProps);
+    return new IcebergTableSink(
+        tableLoader, resolvedSchema, context.getConfiguration(), writeProps);
   }

   @Override

flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/FlinkSchemaUtil.java

Lines changed: 152 additions & 6 deletions
@@ -18,11 +18,18 @@
  */
 package org.apache.iceberg.flink;

+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
+import java.util.UUID;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.UniqueConstraint;
 import org.apache.flink.table.legacy.api.TableSchema;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
@@ -59,7 +66,7 @@ public class FlinkSchemaUtil {
   private FlinkSchemaUtil() {}

   /**
-   * @deprecated Use {@link #convert(ResolvedSchema)} instead.
+   * @deprecated will be removed in 2.0.0; use {@link #convert(ResolvedSchema)} instead.
    */
   @Deprecated
   public static Schema convert(TableSchema schema) {
@@ -102,11 +109,10 @@ public static Schema convert(ResolvedSchema flinkSchema) {
     RowType root = (RowType) schemaType;
     Type converted = root.accept(new FlinkTypeToType(root));
     Schema icebergSchema = new Schema(converted.asStructType().fields());
-    if (flinkSchema.getPrimaryKey().isPresent()) {
-      return freshIdentifierFieldIds(icebergSchema, flinkSchema.getPrimaryKey().get().getColumns());
-    } else {
-      return icebergSchema;
-    }
+    return flinkSchema
+        .getPrimaryKey()
+        .map(pk -> freshIdentifierFieldIds(icebergSchema, pk.getColumns()))
+        .orElse(icebergSchema);
   }

   private static Schema freshIdentifierFieldIds(Schema icebergSchema, List<String> primaryKeys) {
@@ -137,7 +143,10 @@
    * @param flinkSchema a Flink TableSchema
    * @return the equivalent Schema
    * @throws IllegalArgumentException if the type cannot be converted or there are missing ids
+   * @deprecated since 1.10.0, will be removed in 2.0.0. Use {@link #convert(Schema,
+   *     ResolvedSchema)} instead.
    */
+  @Deprecated
   public static Schema convert(Schema baseSchema, TableSchema flinkSchema) {
     // convert to a type with fresh ids
     Types.StructType struct = convert(flinkSchema).asStruct();
@@ -155,6 +164,35 @@ public static Schema convert(Schema baseSchema, TableSchema flinkSchema) {
     }
   }

+  /**
+   * Convert a Flink {@link ResolvedSchema} to a {@link Schema} based on the given schema.
+   *
+   * <p>This conversion does not assign new ids; it uses ids from the base schema.
+   *
+   * <p>Data types, field order, and nullability will match the Flink type. This conversion may
+   * return a schema that is not compatible with base schema.
+   *
+   * @param baseSchema a Schema on which conversion is based
+   * @param flinkSchema a Flink ResolvedSchema
+   * @return the equivalent Schema
+   * @throws IllegalArgumentException if the type cannot be converted or there are missing ids
+   */
+  public static Schema convert(Schema baseSchema, ResolvedSchema flinkSchema) {
+    // convert to a type with fresh ids
+    Types.StructType struct = convert(flinkSchema).asStruct();
+    // reassign ids to match the base schema
+    Schema schema = TypeUtil.reassignIds(new Schema(struct.fields()), baseSchema);
+    // reassign doc to match the base schema
+    schema = TypeUtil.reassignDoc(schema, baseSchema);
+
+    // fix types that can't be represented in Flink (UUID)
+    Schema fixedSchema = FlinkFixupTypes.fixup(schema, baseSchema);
+    return flinkSchema
+        .getPrimaryKey()
+        .map(pk -> freshIdentifierFieldIds(fixedSchema, pk.getColumns()))
+        .orElse(fixedSchema);
+  }
+
   /**
    * Convert a {@link Schema} to a {@link RowType Flink type}.
    *
@@ -192,7 +230,10 @@ public static Type convert(LogicalType flinkType) {
    *
    * @param rowType a RowType
    * @return Flink TableSchema
+   * @deprecated since 1.10.0, will be removed in 2.0.0. Use {@link #toResolvedSchema(RowType)}
+   *     instead
    */
+  @Deprecated
   public static TableSchema toSchema(RowType rowType) {
     TableSchema.Builder builder = TableSchema.builder();
     for (RowType.RowField field : rowType.getFields()) {
@@ -201,12 +242,31 @@ public static TableSchema toSchema(RowType rowType) {
     }
     return builder.build();
   }

+  /**
+   * Convert a {@link RowType} to a {@link ResolvedSchema}.
+   *
+   * @param rowType a RowType
+   * @return Flink ResolvedSchema
+   */
+  public static ResolvedSchema toResolvedSchema(RowType rowType) {
+    List<Column> columns = Lists.newArrayListWithExpectedSize(rowType.getFieldCount());
+    for (RowType.RowField field : rowType.getFields()) {
+      columns.add(
+          Column.physical(field.getName(), TypeConversions.fromLogicalToDataType(field.getType())));
+    }
+
+    return ResolvedSchema.of(columns);
+  }
+
   /**
    * Convert a {@link Schema} to a {@link TableSchema}.
    *
    * @param schema iceberg schema to convert.
    * @return Flink TableSchema.
+   * @deprecated since 1.10.0, will be removed in 2.0.0. Use {@link #toResolvedSchema(Schema)}
+   *     instead
    */
+  @Deprecated
   public static TableSchema toSchema(Schema schema) {
     TableSchema.Builder builder = TableSchema.builder();

@@ -231,4 +291,90 @@ public static TableSchema toSchema(Schema schema) {

     return builder.build();
   }
+
+  /**
+   * Convert a {@link Schema} to a {@link ResolvedSchema}.
+   *
+   * @param schema iceberg schema to convert.
+   * @return Flink ResolvedSchema.
+   */
+  public static ResolvedSchema toResolvedSchema(Schema schema) {
+    RowType rowType = convert(schema);
+    List<Column> columns = Lists.newArrayListWithExpectedSize(rowType.getFieldCount());
+
+    // Add columns.
+    for (RowType.RowField field : rowType.getFields()) {
+      columns.add(
+          Column.physical(field.getName(), TypeConversions.fromLogicalToDataType(field.getType())));
+    }
+
+    // Add primary key.
+    Set<Integer> identifierFieldIds = schema.identifierFieldIds();
+    UniqueConstraint uniqueConstraint = null;
+    if (!identifierFieldIds.isEmpty()) {
+      List<String> primaryKeyColumns =
+          Lists.newArrayListWithExpectedSize(identifierFieldIds.size());
+      for (Integer identifierFieldId : identifierFieldIds) {
+        String columnName = schema.findColumnName(identifierFieldId);
+        Preconditions.checkNotNull(
+            columnName, "Cannot find field with id %s in schema %s", identifierFieldId, schema);
+
+        primaryKeyColumns.add(columnName);
+      }
+
+      uniqueConstraint =
+          UniqueConstraint.primaryKey(UUID.randomUUID().toString(), primaryKeyColumns);
+
+      validatePrimaryKey(uniqueConstraint, columns);
+    }
+
+    return new ResolvedSchema(columns, Collections.emptyList(), uniqueConstraint);
+  }
+
+  /**
+   * Copied from
+   * org.apache.flink.table.catalog.DefaultSchemaResolver#validatePrimaryKey(org.apache.flink.table.catalog.UniqueConstraint,
+   * java.util.List)
+   */
+  private static void validatePrimaryKey(UniqueConstraint primaryKey, List<Column> columns) {
+    final Map<String, Column> columnsByNameLookup =
+        columns.stream().collect(Collectors.toMap(Column::getName, Function.identity()));
+
+    final Set<String> duplicateColumns =
+        primaryKey.getColumns().stream()
+            .filter(name -> Collections.frequency(primaryKey.getColumns(), name) > 1)
+            .collect(Collectors.toSet());
+
+    if (!duplicateColumns.isEmpty()) {
+      throw new ValidationException(
+          String.format(
+              "Invalid primary key '%s'. A primary key must not contain duplicate columns. Found: %s",
+              primaryKey.getName(), duplicateColumns));
+    }
+
+    for (String columnName : primaryKey.getColumns()) {
+      Column column = columnsByNameLookup.get(columnName);
+      if (column == null) {
+        throw new ValidationException(
+            String.format(
+                "Invalid primary key '%s'. Column '%s' does not exist.",
+                primaryKey.getName(), columnName));
+      }
+
+      if (!column.isPhysical()) {
+        throw new ValidationException(
+            String.format(
+                "Invalid primary key '%s'. Column '%s' is not a physical column.",
+                primaryKey.getName(), columnName));
+      }
+
+      final LogicalType columnType = column.getDataType().getLogicalType();
+      if (columnType.isNullable()) {
+        throw new ValidationException(
+            String.format(
+                "Invalid primary key '%s'. Column '%s' is nullable.",
+                primaryKey.getName(), columnName));
+      }
+    }
+  }
 }
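
For reference, a usage sketch of the conversion helpers added above (the schema and column names are made up for illustration; the API calls are the ones introduced in this diff):

import java.util.Arrays;
import java.util.Collections;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.iceberg.Schema;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.types.Types;

public class ResolvedSchemaRoundTrip {
  public static void main(String[] args) {
    // An Iceberg schema with one identifier field ("id"); the columns are illustrative.
    Schema icebergSchema =
        new Schema(
            Arrays.asList(
                Types.NestedField.required(1, "id", Types.LongType.get()),
                Types.NestedField.optional(2, "data", Types.StringType.get())),
            Collections.singleton(1));

    // New in this commit: toResolvedSchema(Schema) surfaces the identifier field as a
    // primary key constraint and runs the validation copied from DefaultSchemaResolver.
    ResolvedSchema resolvedSchema = FlinkSchemaUtil.toResolvedSchema(icebergSchema);
    System.out.println(resolvedSchema.getPrimaryKey()); // the "id" column as PRIMARY KEY

    // Also new: convert(Schema, ResolvedSchema) maps the Flink schema back to an Iceberg
    // schema while reusing the field ids and docs of the base schema.
    Schema roundTripped = FlinkSchemaUtil.convert(icebergSchema, resolvedSchema);
    System.out.println(roundTripped.asStruct());
  }
}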
