Skip to content

Commit f10a3f4

Browse files
committed
[flink] Bump Flink version to 2.1
1 parent 926c381 commit f10a3f4

File tree

13 files changed

+532
-10
lines changed

13 files changed

+532
-10
lines changed

.github/workflows/e2e-tests-flink-2.x-jdk11.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ jobs:
3838
fail-fast: true
3939
matrix:
4040
# Last element should be the current default flink version
41-
flink_version: [ '2.0' ]
41+
flink_version: [ '2.0', '2.1' ]
4242
steps:
4343
- name: Checkout code
4444
uses: actions/checkout@v4

.github/workflows/utitcase-flink-2.x-jdk11.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ jobs:
5555
jvm_timezone=$(random_timezone)
5656
echo "JVM timezone is set to $jvm_timezone"
5757
test_modules=""
58-
for suffix in 2.0 common; do
58+
for suffix in 2.0 2.1 common; do
5959
test_modules+="org.apache.paimon:paimon-flink-${suffix},"
6060
done
6161
test_modules="${test_modules%,}"
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.types.variant;
20+
21+
import org.apache.flink.annotation.PublicEvolving;
22+
23+
import java.io.Serializable;
24+
25+
/** Variant represent a semi-structured data. */
26+
@PublicEvolving
27+
public interface Variant extends Serializable {}

paimon-flink/paimon-flink-2.1/pom.xml

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<!--
3+
Licensed to the Apache Software Foundation (ASF) under one
4+
or more contributor license agreements. See the NOTICE file
5+
distributed with this work for additional information
6+
regarding copyright ownership. The ASF licenses this file
7+
to you under the Apache License, Version 2.0 (the
8+
"License"); you may not use this file except in compliance
9+
with the License. You may obtain a copy of the License at
10+
11+
http://www.apache.org/licenses/LICENSE-2.0
12+
13+
Unless required by applicable law or agreed to in writing,
14+
software distributed under the License is distributed on an
15+
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16+
KIND, either express or implied. See the License for the
17+
specific language governing permissions and limitations
18+
under the License.
19+
-->
20+
<project xmlns="http://maven.apache.org/POM/4.0.0"
21+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
22+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
23+
<modelVersion>4.0.0</modelVersion>
24+
25+
<parent>
26+
<groupId>org.apache.paimon</groupId>
27+
<artifactId>paimon-flink</artifactId>
28+
<version>1.3-SNAPSHOT</version>
29+
</parent>
30+
31+
<packaging>jar</packaging>
32+
33+
<artifactId>paimon-flink-2.1</artifactId>
34+
<name>Paimon : Flink : 2.1</name>
35+
36+
<properties>
37+
<flink.version>2.1.0</flink.version>
38+
</properties>
39+
40+
<dependencies>
41+
<dependency>
42+
<groupId>org.apache.paimon</groupId>
43+
<artifactId>paimon-flink-common</artifactId>
44+
<version>${project.version}</version>
45+
</dependency>
46+
47+
<dependency>
48+
<groupId>org.apache.paimon</groupId>
49+
<artifactId>paimon-flink2-common</artifactId>
50+
<version>${project.version}</version>
51+
</dependency>
52+
53+
<dependency>
54+
<groupId>org.apache.flink</groupId>
55+
<artifactId>flink-streaming-java</artifactId>
56+
<version>${flink.version}</version>
57+
<scope>provided</scope>
58+
</dependency>
59+
60+
<dependency>
61+
<groupId>org.apache.flink</groupId>
62+
<artifactId>flink-table-common</artifactId>
63+
<version>${flink.version}</version>
64+
<scope>provided</scope>
65+
</dependency>
66+
</dependencies>
67+
68+
<build>
69+
<plugins>
70+
<plugin>
71+
<groupId>org.apache.maven.plugins</groupId>
72+
<artifactId>maven-shade-plugin</artifactId>
73+
<executions>
74+
<execution>
75+
<id>shade-paimon</id>
76+
<phase>package</phase>
77+
<goals>
78+
<goal>shade</goal>
79+
</goals>
80+
<configuration>
81+
<artifactSet>
82+
<includes combine.children="append">
83+
<include>org.apache.paimon:paimon-flink-common</include>
84+
<include>org.apache.paimon:paimon-flink2-common</include>
85+
</includes>
86+
</artifactSet>
87+
</configuration>
88+
</execution>
89+
</executions>
90+
</plugin>
91+
</plugins>
92+
</build>
93+
</project>

paimon-flink/paimon-flink-common/pom.xml

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,12 @@ under the License.
3838
</properties>
3939

4040
<dependencies>
41+
<dependency>
42+
<groupId>org.apache.paimon</groupId>
43+
<artifactId>${paimon-flinkx-common}</artifactId>
44+
<version>${project.version}</version>
45+
</dependency>
46+
4147
<dependency>
4248
<groupId>org.apache.flink</groupId>
4349
<artifactId>flink-core</artifactId>
@@ -81,12 +87,6 @@ under the License.
8187
<scope>test</scope>
8288
</dependency>
8389

84-
<dependency>
85-
<groupId>org.apache.paimon</groupId>
86-
<artifactId>${paimon-flinkx-common}</artifactId>
87-
<version>${project.version}</version>
88-
</dependency>
89-
9090
<dependency>
9191
<groupId>org.apache.paimon</groupId>
9292
<artifactId>${paimon-flinkx-common}</artifactId>

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkRowData.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@
3333
import org.apache.flink.table.data.StringData;
3434
import org.apache.flink.table.data.TimestampData;
3535
import org.apache.flink.types.RowKind;
36+
import org.apache.flink.types.variant.BinaryVariant;
37+
import org.apache.flink.types.variant.Variant;
3638

3739
import static org.apache.paimon.flink.FlinkRowWrapper.fromFlinkRowKind;
3840

@@ -145,6 +147,11 @@ public RowData getRow(int pos, int numFields) {
145147
return new FlinkRowData(row.getRow(pos, numFields));
146148
}
147149

150+
public Variant getVariant(int pos) {
151+
org.apache.paimon.data.variant.Variant variant = row.getVariant(pos);
152+
return new BinaryVariant(variant.value(), variant.metadata());
153+
}
154+
148155
private static class FlinkArrayData implements ArrayData {
149156

150157
private final InternalArray array;
@@ -218,6 +225,11 @@ public <T> RawValueData<T> getRawValue(int pos) {
218225
throw new UnsupportedOperationException();
219226
}
220227

228+
public Variant getVariant(int pos) {
229+
org.apache.paimon.data.variant.Variant variant = array.getVariant(pos);
230+
return new BinaryVariant(variant.value(), variant.metadata());
231+
}
232+
221233
@Override
222234
public byte[] getBinary(int pos) {
223235
return array.getBinary(pos);

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/NestedProjectedRowData.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.flink.table.data.TimestampData;
2929
import org.apache.flink.table.types.logical.RowType;
3030
import org.apache.flink.types.RowKind;
31+
import org.apache.flink.types.variant.Variant;
3132

3233
import javax.annotation.Nullable;
3334

@@ -195,6 +196,10 @@ public RowData getRow(int pos, int numFields) {
195196
return getFieldAs(pos, (rowData, internalPos) -> rowData.getRow(internalPos, numFields));
196197
}
197198

199+
public Variant getVariant(int pos) {
200+
return getFieldAs(pos, RowData::getVariant);
201+
}
202+
198203
private @Nullable RowData extractInternalRow(int pos) {
199204
int[] projectedField = projectedFields[pos];
200205
RowData rowData = this.row;

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/ProjectedRowData.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.flink.table.data.TimestampData;
2828
import org.apache.flink.table.types.DataType;
2929
import org.apache.flink.types.RowKind;
30+
import org.apache.flink.types.variant.Variant;
3031

3132
import java.util.Arrays;
3233

@@ -156,6 +157,10 @@ public RowData getRow(int pos, int numFields) {
156157
return row.getRow(indexMapping[pos], numFields);
157158
}
158159

160+
public Variant getVariant(int pos) {
161+
return row.getVariant(indexMapping[pos]);
162+
}
163+
159164
@Override
160165
public boolean equals(Object o) {
161166
throw new UnsupportedOperationException("Projected row data cannot be compared");

paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SerializableRowData.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.apache.flink.table.data.StringData;
3030
import org.apache.flink.table.data.TimestampData;
3131
import org.apache.flink.types.RowKind;
32+
import org.apache.flink.types.variant.Variant;
3233

3334
import java.io.IOException;
3435
import java.io.ObjectInputStream;
@@ -153,4 +154,9 @@ public MapData getMap(int i) {
153154
public RowData getRow(int i, int rowArity) {
154155
return row.getRow(i, rowArity);
155156
}
157+
158+
@Override
159+
public Variant getVariant(int i) {
160+
return row.getVariant(i);
161+
}
156162
}

0 commit comments

Comments
 (0)