Skip to content

Commit 926c381

Browse files
authored
[cdc] Support computed column referring to each other while sync_table (#5972)
1 parent c0f1ffb commit 926c381

File tree

12 files changed

+366
-290
lines changed

12 files changed

+366
-290
lines changed

paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/ComputedColumn.java

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -53,11 +53,6 @@ public String fieldReference() {
5353
return expression.fieldReference();
5454
}
5555

56-
@Nullable
57-
public DataType fieldReferenceType() {
58-
return expression.fieldReferenceType();
59-
}
60-
6156
/** Compute column's value from given argument. Return null if input is null. */
6257
@Nullable
6358
public String eval(@Nullable String input) {
@@ -66,13 +61,4 @@ public String eval(@Nullable String input) {
6661
}
6762
return expression.eval(input);
6863
}
69-
70-
/** Compute column's value from given argument. Return null if input is null. */
71-
@Nullable
72-
public String eval(@Nullable String input, DataType inputType) {
73-
if (fieldReference() != null && input == null) {
74-
return null;
75-
}
76-
return expression.eval(input, inputType);
77-
}
7864
}

paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/ComputedColumnUtils.java

Lines changed: 45 additions & 100 deletions
Original file line numberDiff line numberDiff line change
@@ -18,22 +18,19 @@
1818

1919
package org.apache.paimon.flink.action.cdc;
2020

21-
import org.apache.paimon.annotation.VisibleForTesting;
21+
import org.apache.paimon.flink.action.cdc.utils.DfsSort;
2222
import org.apache.paimon.types.DataField;
2323
import org.apache.paimon.types.DataType;
2424
import org.apache.paimon.utils.Preconditions;
2525

26+
import org.apache.flink.api.java.tuple.Tuple2;
27+
2628
import java.util.ArrayList;
27-
import java.util.Collections;
28-
import java.util.HashMap;
29-
import java.util.HashSet;
29+
import java.util.LinkedHashMap;
3030
import java.util.List;
3131
import java.util.Map;
32-
import java.util.Set;
3332
import java.util.stream.Collectors;
3433

35-
import static org.apache.paimon.utils.Preconditions.checkArgument;
36-
3734
/** Utility methods for {@link ComputedColumn}, such as build. */
3835
public class ComputedColumnUtils {
3936

@@ -50,16 +47,44 @@ public static List<ComputedColumn> buildComputedColumns(
5047
.collect(
5148
Collectors.toMap(DataField::name, DataField::type, (v1, v2) -> v2));
5249

50+
// sort computed column args by dependencies
51+
LinkedHashMap<String, Tuple2<String, String[]>> sortedArgs =
52+
sortComputedColumnArgs(computedColumnArgs, caseSensitive);
53+
5354
List<ComputedColumn> computedColumns = new ArrayList<>();
54-
for (String columnArg : computedColumnArgs) {
55-
String[] kv = columnArg.split("=");
55+
for (Map.Entry<String, Tuple2<String, String[]>> columnArg : sortedArgs.entrySet()) {
56+
String columnName = columnArg.getKey().trim();
57+
String exprName = columnArg.getValue().f0.trim();
58+
String[] args = columnArg.getValue().f1;
59+
60+
Expression expr = Expression.create(typeMapping, caseSensitive, exprName, args);
61+
ComputedColumn cmpColumn = new ComputedColumn(columnName, expr);
62+
computedColumns.add(new ComputedColumn(columnName, expr));
63+
64+
// remember the column type for later reference by other computed columns
65+
typeMapping.put(columnName, cmpColumn.columnType());
66+
}
67+
68+
return computedColumns;
69+
}
70+
71+
private static LinkedHashMap<String, Tuple2<String, String[]>> sortComputedColumnArgs(
72+
List<String> computedColumnArgs, boolean caseSensitive) {
73+
List<String> argList =
74+
computedColumnArgs.stream()
75+
.map(x -> caseSensitive ? x : x.toUpperCase())
76+
.collect(Collectors.toList());
77+
78+
LinkedHashMap<String, Tuple2<String, String[]>> eqMap = new LinkedHashMap<>();
79+
LinkedHashMap<String, String> refMap = new LinkedHashMap<>();
80+
for (String arg : argList) {
81+
String[] kv = arg.split("=");
5682
if (kv.length != 2) {
5783
throw new IllegalArgumentException(
5884
String.format(
5985
"Invalid computed column argument: %s. Please use format 'column-name=expr-name(args, ...)'.",
60-
columnArg));
86+
arg));
6187
}
62-
String columnName = kv[0].trim();
6388
String expression = kv[1].trim();
6489
// parse expression
6590
int left = expression.indexOf('(');
@@ -69,101 +94,21 @@ public static List<ComputedColumn> buildComputedColumns(
6994
String.format(
7095
"Invalid expression: %s. Please use format 'expr-name(args, ...)'.",
7196
expression));
72-
7397
String exprName = expression.substring(0, left);
7498
String[] args = expression.substring(left + 1, right).split(",");
75-
checkArgument(args.length >= 1, "Computed column needs at least one argument.");
76-
77-
computedColumns.add(
78-
new ComputedColumn(
79-
columnName,
80-
Expression.create(typeMapping, caseSensitive, exprName, args)));
81-
}
82-
83-
return sortComputedColumns(computedColumns);
84-
}
85-
86-
@VisibleForTesting
87-
public static List<ComputedColumn> sortComputedColumns(List<ComputedColumn> columns) {
88-
Set<String> columnNames = new HashSet<>();
89-
for (ComputedColumn col : columns) {
90-
columnNames.add(col.columnName());
91-
}
9299

93-
// For simple processing, no reference or referring to another computed column, means
94-
// independent
95-
List<ComputedColumn> independent = new ArrayList<>();
96-
List<ComputedColumn> dependent = new ArrayList<>();
97-
98-
for (ComputedColumn col : columns) {
99-
if (col.fieldReference() == null || !columnNames.contains(col.fieldReference())) {
100-
independent.add(col);
101-
} else {
102-
dependent.add(col);
103-
}
104-
}
105-
106-
// Sort dependent columns with topological sort
107-
Map<String, ComputedColumn> columnMap = new HashMap<>();
108-
Map<String, Set<String>> reverseDependencies = new HashMap<>();
109-
110-
for (ComputedColumn col : dependent) {
111-
columnMap.put(col.columnName(), col);
112-
reverseDependencies
113-
.computeIfAbsent(col.fieldReference(), k -> new HashSet<>())
114-
.add(col.columnName());
115-
}
116-
117-
List<ComputedColumn> sortedDependent = new ArrayList<>();
118-
Set<String> visited = new HashSet<>();
119-
Set<String> tempMark = new HashSet<>(); // For cycle detection
120-
121-
for (ComputedColumn col : dependent) {
122-
if (!visited.contains(col.columnName())) {
123-
dfs(
124-
col.columnName(),
125-
reverseDependencies,
126-
columnMap,
127-
sortedDependent,
128-
visited,
129-
tempMark);
130-
}
100+
// args[0] may be an empty string, e.g. "cal_col=now()"
101+
eqMap.put(kv[0].trim(), Tuple2.of(exprName, args));
102+
refMap.put(kv[0].trim(), args[0].trim());
131103
}
132104

133-
Collections.reverse(sortedDependent);
105+
List<String> sortedKeys = DfsSort.sortKeys(refMap);
134106

135-
// Independent should precede dependent
136-
List<ComputedColumn> result = new ArrayList<>();
137-
result.addAll(independent);
138-
result.addAll(sortedDependent);
139-
140-
return result;
141-
}
142-
143-
private static void dfs(
144-
String node,
145-
Map<String, Set<String>> reverseDependencies,
146-
Map<String, ComputedColumn> columnMap,
147-
List<ComputedColumn> sorted,
148-
Set<String> visited,
149-
Set<String> tempMark) {
150-
if (tempMark.contains(node)) {
151-
throw new IllegalArgumentException("Cycle detected: " + node);
152-
}
153-
if (visited.contains(node)) {
154-
return;
107+
LinkedHashMap<String, Tuple2<String, String[]>> sortedMap =
108+
new LinkedHashMap<>(refMap.size());
109+
for (String key : sortedKeys) {
110+
sortedMap.put(key, eqMap.get(key));
155111
}
156-
157-
tempMark.add(node);
158-
ComputedColumn current = columnMap.get(node);
159-
160-
// Process the dependencies
161-
for (String dependent : reverseDependencies.getOrDefault(node, Collections.emptySet())) {
162-
dfs(dependent, reverseDependencies, columnMap, sorted, visited, tempMark);
163-
}
164-
165-
tempMark.remove(node);
166-
visited.add(node);
167-
sorted.add(current);
112+
return sortedMap;
168113
}
169114
}

0 commit comments

Comments
 (0)