Skip to content

Commit e381c08

Browse files
authored
[hotfix] Add ALLOW_NON_STRING_TO_STRING to cdc schema change (#5298)
1 parent ef3bec1 commit e381c08

File tree

8 files changed

+57
-37
lines changed

8 files changed

+57
-37
lines changed

paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,8 @@ public static boolean schemaCompatible(
9292
return false;
9393
}
9494
DataType type = paimonSchema.fields().get(idx).type();
95-
if (UpdatedDataFieldsProcessFunction.canConvert(field.type(), type)
95+
if (UpdatedDataFieldsProcessFunction.canConvert(
96+
field.type(), type, TypeMapping.defaultMapping())
9697
!= UpdatedDataFieldsProcessFunction.ConvertAction.CONVERT) {
9798
LOG.info(
9899
"Cannot convert field '{}' from source table type '{}' to Paimon type '{}'.",

paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/TypeMapping.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,8 @@ public enum TypeMappingMode {
7777
CHAR_TO_STRING,
7878
LONGTEXT_TO_BYTES,
7979
DECIMAL_NO_CHANGE,
80-
BIGINT_UNSIGNED_TO_BIGINT;
80+
BIGINT_UNSIGNED_TO_BIGINT,
81+
ALLOW_NON_STRING_TO_STRING;
8182

8283
private static final Map<String, TypeMappingMode> TYPE_MAPPING_OPTIONS =
8384
Arrays.stream(TypeMappingMode.values())

paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/schema/JdbcSchemaUtils.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,8 @@ public static Schema mergeSchema(
122122
DataField dataField = currentFields.get(newField.name());
123123
if (Objects.nonNull(dataField)) {
124124
DataType oldType = dataField.type();
125-
switch (UpdatedDataFieldsProcessFunction.canConvert(oldType, newField.type())) {
125+
switch (UpdatedDataFieldsProcessFunction.canConvert(
126+
oldType, newField.type(), TypeMapping.defaultMapping())) {
126127
case CONVERT:
127128
currentFields.put(newField.name(), newField);
128129
break;

paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ protected void applySchemaChange(
133133
+ " does not exist in table. This is unexpected.");
134134
DataType oldType = schema.fields().get(idx).type();
135135
DataType newType = updateColumnType.newDataType();
136-
switch (canConvert(oldType, newType)) {
136+
switch (canConvert(oldType, newType, typeMapping)) {
137137
case CONVERT:
138138
catalog.alterTable(identifier, schemaChange, false);
139139
break;
@@ -157,7 +157,8 @@ protected void applySchemaChange(
157157
}
158158
}
159159

160-
public static ConvertAction canConvert(DataType oldType, DataType newType) {
160+
public static ConvertAction canConvert(
161+
DataType oldType, DataType newType, TypeMapping typeMapping) {
161162
if (oldType.equalsIgnoreNullable(newType)) {
162163
return ConvertAction.CONVERT;
163164
}
@@ -171,7 +172,9 @@ public static ConvertAction canConvert(DataType oldType, DataType newType) {
171172
}
172173

173174
// object can always be converted to string
174-
if (oldIdx < 0 && newIdx >= 0) {
175+
if ((oldIdx < 0 && newIdx >= 0)
176+
&& typeMapping.containsMode(
177+
TypeMapping.TypeMappingMode.ALLOW_NON_STRING_TO_STRING)) {
175178
return ConvertAction.CONVERT;
176179
}
177180

@@ -277,8 +280,8 @@ public void close() throws Exception {
277280
}
278281

279282
/**
280-
* Return type of {@link UpdatedDataFieldsProcessFunction#canConvert(DataType, DataType)}. This
281-
* enum indicates the action to perform.
283+
* Return type of {@link UpdatedDataFieldsProcessFunction#canConvert}. This enum indicates the
284+
* action to perform.
282285
*/
283286
public enum ConvertAction {
284287

paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionITCase.java

Lines changed: 14 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.paimon.types.DataTypes;
2929
import org.apache.paimon.types.RowType;
3030

31+
import org.apache.flink.api.common.JobStatus;
3132
import org.apache.flink.configuration.Configuration;
3233
import org.apache.flink.core.execution.JobClient;
3334

@@ -233,28 +234,22 @@ protected void runSingleTableSchemaEvolutionWithSchemaIncludeRecord(
233234
"+I[105, hammer, 14oz carpenter's hammer, 0.875, 24]");
234235
waitForResult(expected, table, rowType, primaryKeys);
235236

236-
// column type covert (int64 -> string)
237+
// column type covert exception (int64 -> string)
237238
writeRecordsToKafka(
238239
topic, "kafka/%s/table/schema/%s/%s-data-5.txt", format, sourceDir, format);
239240

240-
rowType =
241-
RowType.of(
242-
new DataType[] {
243-
DataTypes.INT().notNull(),
244-
DataTypes.STRING(),
245-
DataTypes.STRING(),
246-
DataTypes.DOUBLE(),
247-
DataTypes.STRING()
248-
},
249-
new String[] {"id", "name", "description", "weight", "age"});
250-
expected =
251-
Arrays.asList(
252-
"+I[101, scooter, Small 2-wheel scooter, 3.14, NULL]",
253-
"+I[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8, 18]",
254-
"+I[104, hammer, 12oz carpenter's hammer, 0.75, 24]",
255-
"+I[105, hammer, 14oz carpenter's hammer, 0.875, 24]",
256-
"+I[106, hammer, 12oz carpenter's hammer, 0.75, 24]");
257-
waitForResult(expected, table, rowType, primaryKeys);
241+
while (true) {
242+
JobStatus status = jobClient.getJobStatus().get();
243+
if (status != JobStatus.RUNNING) {
244+
assertThatThrownBy(() -> jobClient.getJobExecutionResult().get())
245+
.satisfies(
246+
anyCauseMatches(
247+
UnsupportedOperationException.class,
248+
"Cannot convert field age from type BIGINT to STRING of Paimon table"));
249+
break;
250+
}
251+
Thread.sleep(1000);
252+
}
258253
}
259254

260255
public void testNotSupportFormat(String format) throws Exception {

paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.paimon.flink.action.cdc.mysql;
2020

2121
import org.apache.paimon.CoreOptions;
22+
import org.apache.paimon.flink.action.cdc.TypeMapping;
2223
import org.apache.paimon.options.CatalogOptions;
2324
import org.apache.paimon.schema.SchemaChange;
2425
import org.apache.paimon.schema.SchemaManager;
@@ -258,7 +259,12 @@ public void testMultipleSchemaEvolutions() throws Exception {
258259
mySqlConfig.put("database-name", DATABASE_NAME);
259260
mySqlConfig.put("table-name", "schema_evolution_multiple");
260261

261-
MySqlSyncTableAction action = syncTableActionBuilder(mySqlConfig).build();
262+
MySqlSyncTableAction action =
263+
syncTableActionBuilder(mySqlConfig)
264+
.withTypeMappingModes(
265+
TypeMapping.TypeMappingMode.ALLOW_NON_STRING_TO_STRING
266+
.configString())
267+
.build();
262268
runActionWithDefaultEnv(action);
263269

264270
checkTableSchema(

paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBaseTest.java

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package org.apache.paimon.flink.sink.cdc;
2020

21+
import org.apache.paimon.flink.action.cdc.TypeMapping;
2122
import org.apache.paimon.types.BigIntType;
2223
import org.apache.paimon.types.DecimalType;
2324
import org.apache.paimon.types.IntType;
@@ -39,11 +40,13 @@ public void testCanConvertString() {
3940

4041
UpdatedDataFieldsProcessFunctionBase.ConvertAction convertAction = null;
4142
convertAction =
42-
UpdatedDataFieldsProcessFunctionBase.canConvert(oldVarchar, biggerLengthVarchar);
43+
UpdatedDataFieldsProcessFunctionBase.canConvert(
44+
oldVarchar, biggerLengthVarchar, TypeMapping.defaultMapping());
4345
Assert.assertEquals(
4446
UpdatedDataFieldsProcessFunctionBase.ConvertAction.CONVERT, convertAction);
4547
convertAction =
46-
UpdatedDataFieldsProcessFunctionBase.canConvert(oldVarchar, smallerLengthVarchar);
48+
UpdatedDataFieldsProcessFunctionBase.canConvert(
49+
oldVarchar, smallerLengthVarchar, TypeMapping.defaultMapping());
4750

4851
Assert.assertEquals(
4952
UpdatedDataFieldsProcessFunctionBase.ConvertAction.IGNORE, convertAction);
@@ -56,10 +59,14 @@ public void testCanConvertNumber() {
5659
SmallIntType smallintType = new SmallIntType();
5760

5861
UpdatedDataFieldsProcessFunctionBase.ConvertAction convertAction = null;
59-
convertAction = UpdatedDataFieldsProcessFunctionBase.canConvert(oldType, bigintType);
62+
convertAction =
63+
UpdatedDataFieldsProcessFunctionBase.canConvert(
64+
oldType, bigintType, TypeMapping.defaultMapping());
6065
Assert.assertEquals(
6166
UpdatedDataFieldsProcessFunctionBase.ConvertAction.CONVERT, convertAction);
62-
convertAction = UpdatedDataFieldsProcessFunctionBase.canConvert(oldType, smallintType);
67+
convertAction =
68+
UpdatedDataFieldsProcessFunctionBase.canConvert(
69+
oldType, smallintType, TypeMapping.defaultMapping());
6370

6471
Assert.assertEquals(
6572
UpdatedDataFieldsProcessFunctionBase.ConvertAction.IGNORE, convertAction);
@@ -72,10 +79,14 @@ public void testCanConvertDecimal() {
7279
DecimalType smallerRangeType = new DecimalType(10, 3);
7380

7481
UpdatedDataFieldsProcessFunctionBase.ConvertAction convertAction = null;
75-
convertAction = UpdatedDataFieldsProcessFunctionBase.canConvert(oldType, biggerRangeType);
82+
convertAction =
83+
UpdatedDataFieldsProcessFunctionBase.canConvert(
84+
oldType, biggerRangeType, TypeMapping.defaultMapping());
7685
Assert.assertEquals(
7786
UpdatedDataFieldsProcessFunctionBase.ConvertAction.CONVERT, convertAction);
78-
convertAction = UpdatedDataFieldsProcessFunctionBase.canConvert(oldType, smallerRangeType);
87+
convertAction =
88+
UpdatedDataFieldsProcessFunctionBase.canConvert(
89+
oldType, smallerRangeType, TypeMapping.defaultMapping());
7990

8091
Assert.assertEquals(
8192
UpdatedDataFieldsProcessFunctionBase.ConvertAction.IGNORE, convertAction);
@@ -89,11 +100,13 @@ public void testCanConvertTimestamp() {
89100

90101
UpdatedDataFieldsProcessFunctionBase.ConvertAction convertAction = null;
91102
convertAction =
92-
UpdatedDataFieldsProcessFunctionBase.canConvert(oldType, biggerLengthTimestamp);
103+
UpdatedDataFieldsProcessFunctionBase.canConvert(
104+
oldType, biggerLengthTimestamp, TypeMapping.defaultMapping());
93105
Assert.assertEquals(
94106
UpdatedDataFieldsProcessFunctionBase.ConvertAction.CONVERT, convertAction);
95107
convertAction =
96-
UpdatedDataFieldsProcessFunctionBase.canConvert(oldType, smallerLengthTimestamp);
108+
UpdatedDataFieldsProcessFunctionBase.canConvert(
109+
oldType, smallerLengthTimestamp, TypeMapping.defaultMapping());
97110

98111
Assert.assertEquals(
99112
UpdatedDataFieldsProcessFunctionBase.ConvertAction.IGNORE, convertAction);

paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/schema/schemaevolution/debezium-data-5.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,4 +16,4 @@
1616
* limitations under the License.
1717
*/
1818

19-
{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"},{"type":"string","optional":true,"field":"age"}],"optional":true,"name":"dbserver1.inventory.products.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.products.Envelope"}, "payload":{"before": null, "after": {"id": 106, "name": "hammer", "description": "12oz carpenter's hammer", "weight": 0.75, "age": "24"}, "source": {"version": "1.9.7.Final", "connector": "mysql", "name": "mysql_binlog_source", "ts_ms": 1596684883000, "snapshot": "false", "db": "test", "sequence": null, "table": "product", "server_id": 0, "gtid": null, "file": "", "pos": 0, "row": 0, "thread": null, "query": null}, "op": "c", "ts_ms": 1596684883000, "transaction": null}}
19+
{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"},{"type":"string","optional":true,"field":"age"}],"optional":true,"name":"dbserver1.inventory.products.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.products.Envelope"}, "payload":{"before": null, "after": {"id": 105, "name": "hammer", "description": "12oz carpenter's hammer", "weight": 0.75, "age": "24"}, "source": {"version": "1.9.7.Final", "connector": "mysql", "name": "mysql_binlog_source", "ts_ms": 1596684883000, "snapshot": "false", "db": "test", "sequence": null, "table": "product", "server_id": 0, "gtid": null, "file": "", "pos": 0, "row": 0, "thread": null, "query": null}, "op": "c", "ts_ms": 1596684883000, "transaction": null}}

0 commit comments

Comments
 (0)