From d73c622055e5fb9c2f8530381b58d2a074959c33 Mon Sep 17 00:00:00 2001 From: mrtisttt Date: Wed, 11 Jun 2025 03:09:38 +0800 Subject: [PATCH 1/4] [Feature][Connector-V2] ClickHouse source support parallelism --- docs/en/connector-v2/source/Clickhouse.md | 119 +++++- docs/zh/connector-v2/source/Clickhouse.md | 120 +++++- .../config/ClickhouseSourceConfig.java | 52 +++ .../config/ClickhouseSourceOptions.java | 28 +- .../source/ClickHouseSourceSplit.java | 39 ++ .../source/ClickhouseChunkSplitter.java | 350 ++++++++++++++++++ ...houseNumericBetweenParametersProvider.java | 107 ++++++ .../clickhouse/source/ClickhouseSource.java | 45 ++- .../source/ClickhouseSourceFactory.java | 5 +- .../source/ClickhouseSourceReader.java | 86 +++-- .../ClickhouseSourceSplitEnumerator.java | 138 +++++++ .../source/ClickhouseChunkSplitterTest.java | 305 +++++++++++++++ .../seatunnel/clickhouse/ClickhouseIT.java | 72 +++- .../test/resources/init/clickhouse_init.conf | 25 ++ ...to_clickhouse_with_parallel_read_date.conf | 49 +++ ...lickhouse_with_parallel_read_datetime.conf | 51 +++ ..._clickhouse_with_parallel_read_number.conf | 51 +++ ..._clickhouse_with_parallel_read_single.conf | 47 +++ ..._clickhouse_with_parallel_read_string.conf | 49 +++ 19 files changed, 1682 insertions(+), 56 deletions(-) create mode 100644 seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseSourceConfig.java create mode 100644 seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickHouseSourceSplit.java create mode 100644 seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseChunkSplitter.java create mode 100644 seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseNumericBetweenParametersProvider.java create mode 100644 
seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceSplitEnumerator.java create mode 100644 seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseChunkSplitterTest.java create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/parallel_read/clickhouse_to_clickhouse_with_parallel_read_date.conf create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/parallel_read/clickhouse_to_clickhouse_with_parallel_read_datetime.conf create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/parallel_read/clickhouse_to_clickhouse_with_parallel_read_number.conf create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/parallel_read/clickhouse_to_clickhouse_with_parallel_read_single.conf create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/parallel_read/clickhouse_to_clickhouse_with_parallel_read_string.conf diff --git a/docs/en/connector-v2/source/Clickhouse.md b/docs/en/connector-v2/source/Clickhouse.md index 3581d8bf46d..93ebccc1e3a 100644 --- a/docs/en/connector-v2/source/Clickhouse.md +++ b/docs/en/connector-v2/source/Clickhouse.md @@ -16,8 +16,8 @@ import ChangeLog from '../changelog/connector-clickhouse.md'; - [ ] [stream](../../concept/connector-v2-features.md) - [ ] [exactly-once](../../concept/connector-v2-features.md) - [x] [column projection](../../concept/connector-v2-features.md) -- [ ] [parallelism](../../concept/connector-v2-features.md) -- [ ] [support user-defined split](../../concept/connector-v2-features.md) +- [x] [parallelism](../../concept/connector-v2-features.md) +- [x] [support user-defined split](../../concept/connector-v2-features.md) > supports query SQL and can 
achieve projection effect. @@ -60,6 +60,10 @@ They can be downloaded via install-plugin.sh or from the Maven central repositor | password | String | Yes | - | `ClickHouse` user password. | | clickhouse.config | Map | No | - | In addition to the above mandatory parameters that must be specified by `clickhouse-jdbc` , users can also specify multiple optional parameters, which cover all the [parameters](https://github.com/ClickHouse/clickhouse-jdbc/tree/master/clickhouse-client#configuration) provided by `clickhouse-jdbc`. | | server_time_zone | String | No | ZoneId.systemDefault() | The session time zone in database server. If not set, then ZoneId.systemDefault() is used to determine the server time zone. | +| partition_column | String | No | | When performing parallel reads on a source table, the split field currently supports numeric, date, time, and string types. If left unspecified, data reads will default to using only one split, meaning parallel reads will not be enabled. In this case, all configurations related to parallel reads will not take effect. | +| partition_num | Integer | No | 10 | The number of splits when performing parallel reads on a source table. | +| partition_lower_bound | String | No | | The lower bound value for splitting during parallel reads. Depending on the splitting field's data type, enter the corresponding value. The splitting algorithm uses this as the lower limit of the splitting range. If `partition_upper_bound` is not specified, this parameter will be ignored. | +| partition_upper_bound | String | No | | The upper bound value for splitting during parallel reads. Depending on the sharding field's data type, enter the corresponding value. The splitting algorithm uses this as the upper limit of the splitting range. If `partition_lower_bound` is not specified, this parameter will be ignored. | | common-options | | No | - | Source plugin common parameters, please refer to [Source Common Options](../source-common-options.md) for details. 
| ## How to Create a Clickhouse Data Synchronization Jobs @@ -96,10 +100,117 @@ sink { } ``` -### Tips - +> Tips +> > 1.[SeaTunnel Deployment Document](../../start-v2/locally/deployment.md). +## Analysis of Key Features + +### Parallel Reading + +#### Splitting Algorithm + +Parallel reading shard splitting strategy, mainly divided into two categories according to the type of partition field: + +**1.Numeric types** + +Numeric types include pure numeric types and date types: + +(1) Pure numeric types + +Calculate the partition size based on the lower and upper bounds, and split according to the number of partitions (the last partition may be smaller than the partition size). + +(2) Time types + +Time types mainly include two categories: Date and DateTime. Regardless of the category, they will first be converted to their numerical values, and then the splitting algorithm is the same as that for pure numeric types. After splitting into partitions, if the field is of type Date, ClickHouse's toDate() function will be used to convert the partition values. If it is of type DateTime, the toDateTime64() function will be used instead. + +> Regardless of whether it is a pure numeric type or a time type, if the lower or upper bound is not specified, the database will be requested to obtain the maximum and minimum values. + +**2.String types** +For strings, specifying upper and lower bounds is invalid. The splitting algorithm will take the modulus of the partition field according to the number of partitions to split the data. + + + +After splitting the data using the split algorithm described above, the corresponding splits are evenly distributed to Readers with the specified parallelism. This enables parallel reads from the ClickHouse data table, significantly enhancing the efficiency of data retrieval from ClickHouse. 
+ +#### Configuration Examples + +**1.Pure numeric types** + +``` +source { + Clickhouse { + host = "clickhouse:8123" + database = "default" + sql = "select * from parallel_source_table" + username = "default" + password = "" + plugin_output = "parallel_source_table" + partition_column = "id" + partition_num = 3 + # partition_lower_bound = 1 + # partition_upper_bound = 10 + } +} +``` + +**2.Time types** + +Date type: + +``` +source { + Clickhouse { + host = "clickhouse:8123" + database = "default" + sql = "select * from parallel_source_table" + username = "default" + password = "" + plugin_output = "parallel_source_table" + partition_column = "enrollment_date" + partition_num = 3 + # partition_lower_bound = "2024-05-20" + # partition_upper_bound = "2024-06-20" + } +} +``` + +DateTime type: + +``` +source { + Clickhouse { + host = "clickhouse:8123" + database = "default" + sql = "select * from parallel_source_table" + username = "default" + password = "" + plugin_output = "parallel_source_table" + partition_column = "date" + partition_num = 3 + # partition_lower_bound = "2024-05-20 08:30:00" + # partition_upper_bound = "2024-06-19 13:30:00" + } +} +``` + +**3.String types** + +``` +source { + Clickhouse { + host = "clickhouse:8123" + database = "default" + sql = "select * from parallel_source_table" + username = "default" + password = "" + plugin_output = "parallel_source_table" + partition_column = "email" + partition_num = 3 + } +} +``` + ## Changelog diff --git a/docs/zh/connector-v2/source/Clickhouse.md b/docs/zh/connector-v2/source/Clickhouse.md index f261a861c7a..544e6e57113 100644 --- a/docs/zh/connector-v2/source/Clickhouse.md +++ b/docs/zh/connector-v2/source/Clickhouse.md @@ -16,8 +16,8 @@ import ChangeLog from '../changelog/connector-clickhouse.md'; - [ ] [流处理](../../concept/connector-v2-features.md) - [ ] [精确一次](../../concept/connector-v2-features.md) - [x] [列映射](../../concept/connector-v2-features.md) -- [ ] [并行度](../../concept/connector-v2-features.md) 
-- [ ] [支持用户自定义拆分](../../concept/connector-v2-features.md) +- [x] [并行度](../../concept/connector-v2-features.md) +- [x] [支持用户自定义拆分](../../concept/connector-v2-features.md) > 支持查询SQL,可以实现投影效果。 @@ -59,6 +59,10 @@ import ChangeLog from '../changelog/connector-clickhouse.md'; | password | String | 是 | - | `ClickHouse` user 用户密码. | | clickhouse.config | Map | 否 | - | 除了上述必须由 `clickhouse-jdbc` 指定的必填参数外,用户还可以指定多个可选参数,这些参数涵盖了 `clickhouse-jdbc` 提供的所有[参数](https://github.com/ClickHouse/clickhouse-jdbc/tree/master/clickhouse-client#configuration). | | server_time_zone | String | 否 | ZoneId.systemDefault() | 数据库服务中的会话时区。如果未设置,则使用ZoneId.systemDefault()设置服务时区. | +| partition_column | String | 否 | | 并行读取数据表时的分片字段,目前支持数字、日期、时间和字符串类型,如果不填写,则数据读取默认只有1个分片,即不进行并行读取,此时跟并行读取的相关配置都不会生效 | +| partition_num | Integer | 否 | 10 | 并行读取数据表时的分片数量 | +| partition_lower_bound | String | 否 | | 并行读取进行分片时的下限值,根据分片字段填入对应数据类型,分片算法会以此作为分片的下限范围,如果partition_upper_bound没有填写,则不生效 | +| partition_upper_bound | String | 否 | | 并行读取进行分片时的上限值,根据分片字段填入对应数据类型,分片算法会以此作为分片的上限范围,如果partition_lower_bound没有填写,则不生效 | | common-options | | 否 | - | 源插件常用参数,详见 [源通用选项](../source-common-options.md). | ## 如何创建Clickhouse数据同步作业 @@ -96,10 +100,118 @@ sink { } ``` -### 小提示 - +> 小提示 +> > 1.[SeaTunnel 部署文档](../../start-v2/locally/deployment.md). 
+## 关键特性解析 + +### 并行读取 + +#### 分片算法 + +并行读取分片切分策略,根据分区字段的类型不同,主要分为两大类: + +**1.数字类** + +数字类又包含纯数字类和日期类: + +(1)纯数字类 + +基于下限和上限,计算出分区大小,并根据分区数进行切分(最后一个分区可能会小于分区大小)。 + +(2)时间类 + +时间类在主要是包括Date和DateTime两大类,但不管哪一类,都会先转换为其数值大小,然后切分算法跟纯数字类一样,切分出分区之后,如果字段是Date类型,则会使用ClickHouse的`toDate()`函数将分区数值进行转换,而如果是DateTime类型,则会使用ClickHouse的`toDateTime64()`函数将分区数值进行转换 + +> 无论是纯数值类型还是时间类型,如果未指定下限或上限,将会请求数据库获取最大值和最小值。 + +**2.字符串类** + +对于字符串,指定上下限无效,切分算法会对分区字段根据分区数进行取模,以切分数据。 + + + +在根据以上分片算法对数据进行分片之后,相应分片会平均发送给对应并行度的Reader,进而实现并行读取ClickHouse数据表,从而极大提高ClickHouse数据读取效率。 + +#### 配置案例 + +**1.纯数字类** + +``` +source { + Clickhouse { + host = "clickhouse:8123" + database = "default" + sql = "select * from parallel_source_table" + username = "default" + password = "" + plugin_output = "parallel_source_table" + partition_column = "id" + partition_num = 3 + # partition_lower_bound = 1 + # partition_upper_bound = 10 + } +} +``` + +**2.时间类** + +Date类: + +``` +source { + Clickhouse { + host = "clickhouse:8123" + database = "default" + sql = "select * from parallel_source_table" + username = "default" + password = "" + plugin_output = "parallel_source_table" + partition_column = "enrollment_date" + partition_num = 3 + # partition_lower_bound = "2024-05-20" + # partition_upper_bound = "2024-06-20" + } +} +``` + +DateTime类: + +``` +source { + Clickhouse { + host = "clickhouse:8123" + database = "default" + sql = "select * from parallel_source_table" + username = "default" + password = "" + plugin_output = "parallel_source_table" + partition_column = "date" + partition_num = 3 + # partition_lower_bound = "2024-05-20 08:30:00" + # partition_upper_bound = "2024-06-19 13:30:00" + } +} +``` + +**3.字符串类** + +``` +source { + Clickhouse { + host = "clickhouse:8123" + database = "default" + sql = "select * from parallel_source_table" + username = "default" + password = "" + plugin_output = "parallel_source_table" + partition_column = "email" + partition_num = 3 + } +} +``` + ## 变更日志 \ No newline at end of file 
diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseSourceConfig.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseSourceConfig.java new file mode 100644 index 00000000000..0ef7e16d4c8 --- /dev/null +++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseSourceConfig.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.seatunnel.clickhouse.config; + +import com.clickhouse.client.ClickHouseNode; +import lombok.Builder; +import lombok.Data; +import org.apache.seatunnel.api.configuration.ReadonlyConfig; + +import java.io.Serializable; +import java.util.List; + +@Data +@Builder(builderClassName = "Builder") +public class ClickhouseSourceConfig implements Serializable { + + private String serverTimeZone; + private List nodes; + private String sql; + private String partitionColumn; + private String partitionUpperBound; + private String partitionLowerBound; + private Integer partitionNum; + + public static ClickhouseSourceConfig of(ReadonlyConfig config) { + Builder builder = ClickhouseSourceConfig.builder(); + + builder.serverTimeZone(config.get(ClickhouseBaseOptions.SERVER_TIME_ZONE)); + builder.sql(config.get(ClickhouseSourceOptions.SQL)); + builder.partitionColumn(config.get(ClickhouseSourceOptions.PARTITION_COLUMN)); + builder.partitionUpperBound(config.get(ClickhouseSourceOptions.PARTITION_UPPER_BOUND)); + builder.partitionLowerBound(config.get(ClickhouseSourceOptions.PARTITION_LOWER_BOUND)); + builder.partitionNum(config.get(ClickhouseSourceOptions.PARTITION_NUM)); + + return builder.build(); + } +} diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseSourceOptions.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseSourceOptions.java index 6354cf22a39..790216f9658 100644 --- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseSourceOptions.java +++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseSourceOptions.java @@ -20,11 +20,35 @@ import org.apache.seatunnel.api.configuration.Option; import 
org.apache.seatunnel.api.configuration.Options; -public class ClickhouseSourceOptions { +public interface ClickhouseSourceOptions { - public static final Option SQL = + Option SQL = Options.key("sql") .stringType() .noDefaultValue() .withDescription("Clickhouse sql used to query data"); + + Option PARTITION_COLUMN = + Options.key("partition_column") + .stringType() + .noDefaultValue() + .withDescription("partition column"); + + Option PARTITION_NUM = + Options.key("partition_num") + .intType() + .defaultValue(10) + .withDescription("partition num"); + + Option PARTITION_LOWER_BOUND = + Options.key("partition_lower_bound") + .stringType() + .noDefaultValue() + .withDescription("partition lower bound"); + + Option PARTITION_UPPER_BOUND = + Options.key("partition_upper_bound") + .stringType() + .noDefaultValue() + .withDescription("partition upper bound"); } diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickHouseSourceSplit.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickHouseSourceSplit.java new file mode 100644 index 00000000000..ed05c5d5520 --- /dev/null +++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickHouseSourceSplit.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.clickhouse.source; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.ToString; +import org.apache.seatunnel.api.source.SourceSplit; +import org.apache.seatunnel.api.table.catalog.TablePath; + +@Data +@ToString +@AllArgsConstructor +public class ClickHouseSourceSplit implements SourceSplit { + private static final long serialVersionUID = -815542654355310611L; + private final TablePath tablePath; + private final String splitId; + private final String splitQuery; + + @Override + public String splitId() { + return splitId; + } +} diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseChunkSplitter.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseChunkSplitter.java new file mode 100644 index 00000000000..cd99f4fec9b --- /dev/null +++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseChunkSplitter.java @@ -0,0 +1,350 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.clickhouse.source; + +import com.clickhouse.client.*; +import com.clickhouse.client.data.ClickHouseDateTimeValue; +import com.clickhouse.client.data.ClickHouseDateValue; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.catalog.Column; +import org.apache.seatunnel.api.table.catalog.TablePath; +import org.apache.seatunnel.api.table.catalog.TableSchema; +import org.apache.seatunnel.api.table.type.LocalTimeType; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.api.table.type.SqlType; +import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated; +import org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseSourceConfig; +import org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorException; + +import java.io.Serializable; +import java.math.BigDecimal; +import java.time.LocalDate; +import java.util.*; +import java.util.concurrent.ThreadLocalRandom; +import java.util.function.Function; +import java.util.stream.Collectors; + + +/** + * Parallel reading shard splitting strategy, mainly divided into two categories according to the type of partition field: + * + *

1. Numeric types + * + *

Numeric types include pure numeric types and date types: + * + *

(1) Pure numeric types + *

Calculate the partition size based on the lower and upper bounds, and split according to the number of partitions + * (the last partition may be smaller than the partition size). + * + *

(2) Time types + *

Time types mainly include two categories: Date and DateTime. Regardless of the category, they will first be + * converted to their numerical values, and then the splitting algorithm is the same as that for pure numeric types. + * After splitting into partitions, if the field is of type Date, ClickHouse's toDate() function will be used to + * convert the partition values. If it is of type DateTime, the toDateTime64() function will be used instead. + * + *

Regardless of whether it is a pure numeric type or a time type, if the lower or upper bound is not specified, + * the database will be requested to obtain the maximum and minimum values. + * + *

2. String types + *

For strings, specifying upper and lower bounds is invalid. The splitting algorithm will take the modulus of + * the partition field according to the number of partitions to split the data. + */ +@Slf4j +public class ClickhouseChunkSplitter { + + public List generateSplits( + ClickhouseSourceConfig sourceConfig, CatalogTable table) throws Exception { + log.info("Start splitting table {} into chunks...", table.getTablePath()); + long start = System.currentTimeMillis(); + + List splits; + Optional splitKeyOptional = findSplitKey(sourceConfig, table); + if (!splitKeyOptional.isPresent()) { + ClickHouseSourceSplit split = createSingleSplit(sourceConfig, table); + splits = Collections.singletonList(split); + } else { + if (splitKeyOptional.get().getTotalFields() != 1) { + throw new UnsupportedOperationException("Currently, only support one split key"); + } + splits = createSplits(sourceConfig, table, splitKeyOptional.get()); + } + + long end = System.currentTimeMillis(); + log.info( + "Split table {} into {} chunks, time cost: {}ms.", + table.getTablePath(), + splits.size(), + end - start); + return splits; + } + + private List createSplits( + ClickhouseSourceConfig sourceConfig, CatalogTable table, SeaTunnelRowType splitKey) + throws ClickHouseException { + String splitKeyName = splitKey.getFieldNames()[0]; + SeaTunnelDataType splitKeyType = splitKey.getFieldType(0); + + if (SqlType.STRING == splitKeyType.getSqlType()) { + return createStringColumnSplits(sourceConfig, table, splitKeyName); + } + return getNumberColumnSplits(sourceConfig, table, splitKeyType, splitKeyName); + } + + private List getNumberColumnSplits( + ClickhouseSourceConfig sourceConfig, + CatalogTable table, + SeaTunnelDataType splitKeyType, + String splitKeyName) + throws ClickHouseException { + Pair partitionBoundValue = + getPartitionBoundValue(sourceConfig, splitKeyType); + BigDecimal partitionStart = partitionBoundValue.getLeft(); + BigDecimal partitionEnd = partitionBoundValue.getRight(); + if 
(partitionStart == null || partitionEnd == null) { + Pair range = queryMinMax(sourceConfig, splitKeyName); + partitionStart = range.getLeft(); + partitionEnd = range.getRight(); + } + if (partitionStart == null || partitionEnd == null) { + ClickHouseSourceSplit split = createSingleSplit(sourceConfig, table); + return Collections.singletonList(split); + } + + return createNumberColumnSplits( + sourceConfig, table, splitKeyType, splitKeyName, partitionStart, partitionEnd); + } + + private List createNumberColumnSplits( + ClickhouseSourceConfig sourceConfig, + CatalogTable table, + SeaTunnelDataType splitKeyType, + String splitKeyName, + BigDecimal partitionStart, + BigDecimal partitionEnd) { + ClickhouseNumericBetweenParametersProvider numericBetweenParametersProvider = + new ClickhouseNumericBetweenParametersProvider(partitionStart, partitionEnd) + .ofBatchNum(sourceConfig.getPartitionNum()); + Serializable[][] parameterValues = numericBetweenParametersProvider.getParameterValues(); + List splits = new ArrayList<>(sourceConfig.getPartitionNum()); + for (int i = 0; i < parameterValues.length; i++) { + Serializable splitStart = parameterValues[i][0]; + Serializable splitEnd = parameterValues[i][1]; + Pair formattedSplitRange = + formatSplitRange(splitStart, splitEnd, splitKeyType); + + String splitQuery = + String.format( + "SELECT * FROM (%s) st_clickhouse_splitter WHERE %s BETWEEN %s and %s", + sourceConfig.getSql(), + splitKeyName, + formattedSplitRange.getLeft(), + formattedSplitRange.getRight()); + + ClickHouseSourceSplit split = + new ClickHouseSourceSplit( + table.getTablePath(), + createSplitId(table.getTablePath(), i), + splitQuery); + splits.add(split); + } + return splits; + } + + private Pair formatSplitRange( + Serializable splitStart, Serializable splitEnd, SeaTunnelDataType splitKeyType) { + if (splitKeyType instanceof LocalTimeType) { + if (SqlType.DATE == splitKeyType.getSqlType()) { + Serializable dateSplitStart = String.format("toDate(%s)", 
splitStart); + Serializable dateSplitEnd = String.format("toDate(%s)", splitEnd); + return Pair.of(dateSplitStart, dateSplitEnd); + } else { + Serializable dateSplitStart = String.format("toDateTime64(%s, 3)", splitStart); + Serializable dateSplitEnd = String.format("toDateTime64(%s, 3)", splitEnd); + return Pair.of(dateSplitStart, dateSplitEnd); + } + } + return Pair.of(splitStart, splitEnd); + } + + protected Pair queryMinMax( + ClickhouseSourceConfig sourceConfig, String columnName) throws ClickHouseException { + String sqlQuery = + String.format( + "SELECT MIN(%s), MAX(%s) FROM (%s) tmp", + columnName, columnName, sourceConfig.getSql()); + log.info("Split table, query min max: {}", sqlQuery); + + List nodes = sourceConfig.getNodes(); + ClickHouseNode currentServer = nodes.get(ThreadLocalRandom.current().nextInt(nodes.size())); + try (ClickHouseClient client = ClickHouseClient.newInstance(currentServer.getProtocol()); + ClickHouseResponse response = + client.connect(currentServer) + .format(ClickHouseFormat.RowBinaryWithNamesAndTypes) + .query(sqlQuery) + .executeAndWait()) { + + List records = response.stream().collect(Collectors.toList()); + if (records.isEmpty()) { + return Pair.of(null, null); + } else { + ClickHouseRecord values = records.get(0); + return Pair.of( + values.getValue(0).asBigDecimal(), values.getValue(1).asBigDecimal()); + } + } + } + + private Pair getPartitionBoundValue( + ClickhouseSourceConfig sourceConfig, SeaTunnelDataType splitKeyType) { + Function dateTimeTranslator = + value -> ClickHouseDateTimeValue.of(value, 3, + TimeZone.getTimeZone(sourceConfig.getServerTimeZone())).asBigDecimal(); + Map> timeTranslatorMap = + new HashMap>() { + { + // Clickhouse Type: Date + put(SqlType.DATE, value -> + ClickHouseDateValue.of(LocalDate.parse(value)).asBigDecimal()); + // Clickhouse Type: DateTime + put(SqlType.TIME, dateTimeTranslator); + put(SqlType.TIMESTAMP, dateTimeTranslator); + put(SqlType.TIMESTAMP_TZ, dateTimeTranslator); + } + }; + + 
BigDecimal partitionStart = null; + BigDecimal partitionEnd = null; + try { + if (sourceConfig.getPartitionLowerBound() != null) { + partitionStart = + timeTranslatorMap + .getOrDefault(splitKeyType.getSqlType(), BigDecimal::new) + .apply(sourceConfig.getPartitionLowerBound()); + } + if (sourceConfig.getPartitionUpperBound() != null) { + partitionEnd = + timeTranslatorMap + .getOrDefault(splitKeyType.getSqlType(), BigDecimal::new) + .apply(sourceConfig.getPartitionUpperBound()); + } + } catch (Exception e) { + throw new ClickhouseConnectorException( + CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT, + "Translate partition bound value failed.", e); + } + + return Pair.of(partitionStart, partitionEnd); + } + + private List createStringColumnSplits( + ClickhouseSourceConfig sourceConfig, CatalogTable table, String splitKeyName) { + + List splits = new ArrayList<>(sourceConfig.getPartitionNum()); + for (int i = 0; i < sourceConfig.getPartitionNum(); i++) { + String splitQuery = + String.format( + "SELECT * FROM (%s) st_clickhouse_splitter WHERE %s", + sourceConfig.getSql(), + getSplitHashClause(splitKeyName, sourceConfig.getPartitionNum(), i)); + ClickHouseSourceSplit split = + new ClickHouseSourceSplit( + table.getTablePath(), + createSplitId(table.getTablePath(), i), + splitQuery); + + splits.add(split); + } + return splits; + } + + private Optional findSplitKey( + ClickhouseSourceConfig sourceConfig, CatalogTable table) { + TableSchema schema = table.getTableSchema(); + List columns = schema.getColumns(); + Map columnMap = + columns.stream() + .collect( + Collectors.toMap( + Column::getName, column -> column, (c1, c2) -> c1)); + if (sourceConfig.getPartitionColumn() != null) { + String partitionColumn = sourceConfig.getPartitionColumn(); + Column column = columnMap.get(partitionColumn); + if (column == null) { + throw new ClickhouseConnectorException( + CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT, + String.format( + "Partitioned column(%s) don't exist in the table 
columns", + partitionColumn)); + } + if (!isSupportSplitColumn(column)) { + throw new ClickhouseConnectorException( + CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT, + String.format("%s is not numeric/string type", partitionColumn)); + } + return Optional.of( + new SeaTunnelRowType( + new String[] {partitionColumn}, + new SeaTunnelDataType[] {column.getDataType()})); + } + + log.warn("No split key found for table {}", table.getTablePath()); + return Optional.empty(); + } + + private boolean isSupportSplitColumn(Column splitColumn) { + SeaTunnelDataType dataType = splitColumn.getDataType(); + // currently, we only support these types. + switch (dataType.getSqlType()) { + case TINYINT: + case SMALLINT: + case INT: + case BIGINT: + case DOUBLE: + case FLOAT: + case DECIMAL: + case STRING: + case DATE: + case TIME: + case TIMESTAMP: + case TIMESTAMP_TZ: + return true; + default: + return false; + } + } + + private ClickHouseSourceSplit createSingleSplit( + ClickhouseSourceConfig sourceConfig, CatalogTable table) { + return new ClickHouseSourceSplit( + table.getTablePath(), + createSplitId(table.getTablePath(), 0), + sourceConfig.getSql()); + } + + private String createSplitId(TablePath tablePath, int index) { + return String.format("%s-%s", tablePath, index); + } + + private String getSplitHashClause(String fieldName, int partitionNum, int index) { + return String.format( + "xxHash32(coalesce(`%s`, '')) %% %s = %s", fieldName, partitionNum, index); + } +} diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseNumericBetweenParametersProvider.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseNumericBetweenParametersProvider.java new file mode 100644 index 00000000000..b34dcab60c7 --- /dev/null +++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseNumericBetweenParametersProvider.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.clickhouse.source; + +import java.io.Serializable; +import java.math.BigDecimal; +import java.math.RoundingMode; + +import static org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkArgument; +import static org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkState; + +/** + * This query parameters generator is a helper class to parameterize from/to queries on a numeric + * column. The generated array of from/to values will be equally sized to fetchSize (apart from the + * last one), ranging from minVal up to maxVal. + * + *

For example, if there's a table BOOKS with a numeric PK id, using a + * query like: + * + *

+ * SELECT * FROM BOOKS WHERE id BETWEEN ? AND ?
+ * 
+ * + *

You can take advantage of this class to automatically generate the parameters of the BETWEEN + * clause, based on the passed constructor parameters. + */ +public class ClickhouseNumericBetweenParametersProvider { + + private final BigDecimal minVal; + private final BigDecimal maxVal; + + private long batchSize; + private int batchNum; + + /** + * NumericBetweenParametersProviderJdbc constructor. + * + * @param minVal the lower bound of the produced "from" values + * @param maxVal the upper bound of the produced "to" values + */ + public ClickhouseNumericBetweenParametersProvider(BigDecimal minVal, BigDecimal maxVal) { + checkArgument(minVal.compareTo(maxVal) <= 0, "minVal must not be larger than maxVal"); + this.minVal = minVal; + this.maxVal = maxVal; + } + + public ClickhouseNumericBetweenParametersProvider ofBatchNum(int batchNum) { + checkArgument(batchNum > 0, "Batch number must be positive"); + + BigDecimal maxElemCount = (maxVal.subtract(minVal)).add(BigDecimal.valueOf(1)); + if (BigDecimal.valueOf(batchNum).compareTo(maxElemCount) > 0) { + batchNum = maxElemCount.intValue(); + } + this.batchNum = batchNum; + // For the presence of a decimal we take the integer up + this.batchSize = + (maxElemCount.divide(BigDecimal.valueOf(batchNum), 2, RoundingMode.HALF_UP)) + .setScale(0, RoundingMode.CEILING) + .longValue(); + return this; + } + + public Serializable[][] getParameterValues() { + checkState( + batchSize > 0, + "Batch size and batch number must be positive. 
Have you called `ofBatchSize` or `ofBatchNum`?"); + + BigDecimal maxElemCount = (maxVal.subtract(minVal)).add(BigDecimal.valueOf(1)); + BigDecimal bigBatchNum = + maxElemCount + .subtract(BigDecimal.valueOf(batchSize - 1)) + .multiply(BigDecimal.valueOf(batchNum)); + + Serializable[][] parameters = new Serializable[batchNum][2]; + BigDecimal start = minVal; + for (int i = 0; i < batchNum; i++) { + BigDecimal end = + start.add(BigDecimal.valueOf(batchSize)) + .subtract(BigDecimal.valueOf(1)) + .subtract( + BigDecimal.valueOf(i).compareTo(bigBatchNum) >= 0 + ? BigDecimal.ONE + : BigDecimal.ZERO); + if (i == batchNum - 1 && end.compareTo(maxVal) > 0) { + end = maxVal; + } + parameters[i] = new BigDecimal[] {start, end}; + start = end.add(BigDecimal.valueOf(1)); + } + return parameters; + } +} diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSource.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSource.java index fcb94c47acf..de999ef216d 100644 --- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSource.java +++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSource.java @@ -18,29 +18,39 @@ package org.apache.seatunnel.connectors.seatunnel.clickhouse.source; import org.apache.seatunnel.api.source.Boundedness; +import org.apache.seatunnel.api.source.SeaTunnelSource; +import org.apache.seatunnel.api.source.SourceReader; +import org.apache.seatunnel.api.source.SourceReader.Context; +import org.apache.seatunnel.api.source.SourceSplitEnumerator; +import org.apache.seatunnel.api.source.SupportParallelism; import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import 
org.apache.seatunnel.api.table.type.SeaTunnelRowType; -import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader; -import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitSource; -import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext; +import org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseSourceConfig; +import org.apache.seatunnel.connectors.seatunnel.clickhouse.state.ClickhouseSourceState; import com.clickhouse.client.ClickHouseNode; import java.util.Collections; import java.util.List; -public class ClickhouseSource extends AbstractSingleSplitSource { +public class ClickhouseSource + implements SeaTunnelSource, + SupportParallelism { + private final ClickhouseSourceConfig sourceConfig; private final List servers; private final CatalogTable catalogTable; - private final String sql; private final SeaTunnelRowType rowTypeInfo; - public ClickhouseSource(List servers, CatalogTable catalogTable, String sql) { + public ClickhouseSource( + ClickhouseSourceConfig sourceConfig, + List servers, + CatalogTable catalogTable, + String sql) { + this.sourceConfig = sourceConfig; this.servers = servers; this.catalogTable = catalogTable; - this.sql = sql; this.rowTypeInfo = catalogTable.getSeaTunnelRowType(); } @@ -60,8 +70,23 @@ public List getProducedCatalogTables() { } @Override - public AbstractSingleSplitReader createReader( - SingleSplitReaderContext readerContext) { - return new ClickhouseSourceReader(servers, readerContext, sql, rowTypeInfo); + public SourceReader createReader(Context readerContext) + throws Exception { + return new ClickhouseSourceReader(servers, readerContext, rowTypeInfo); + } + + @Override + public SourceSplitEnumerator createEnumerator( + SourceSplitEnumerator.Context enumeratorContext) + throws Exception { + return new ClickhouseSourceSplitEnumerator(enumeratorContext, sourceConfig, catalogTable); + } + + @Override + public 
SourceSplitEnumerator restoreEnumerator( + SourceSplitEnumerator.Context enumeratorContext, + ClickhouseSourceState checkpointState) + throws Exception { + return new ClickhouseSourceSplitEnumerator(enumeratorContext, sourceConfig, catalogTable); } } diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceFactory.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceFactory.java index a9e217e2d55..eb275e3d6c6 100644 --- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceFactory.java +++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceFactory.java @@ -31,6 +31,7 @@ import org.apache.seatunnel.api.table.factory.TableSourceFactory; import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext; import org.apache.seatunnel.common.constants.PluginType; +import org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseSourceConfig; import org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorException; import org.apache.seatunnel.connectors.seatunnel.clickhouse.util.ClickhouseUtil; import org.apache.seatunnel.connectors.seatunnel.clickhouse.util.TypeConvertUtil; @@ -101,9 +102,11 @@ TableSource createSource(TableSourceFactoryContext context) { Collections.emptyList(), "", catalogName); + ClickhouseSourceConfig sourceConfig = ClickhouseSourceConfig.of(readonlyConfig); + sourceConfig.setNodes(nodes); return () -> (SeaTunnelSource) - new ClickhouseSource(nodes, catalogTable, sql); + new ClickhouseSource(sourceConfig, nodes, catalogTable, sql); } catch (ClickHouseException e) { throw new ClickhouseConnectorException( SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED, diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceReader.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceReader.java index b75d71141cb..9b0c41720c9 100644 --- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceReader.java +++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceReader.java @@ -18,11 +18,10 @@ package org.apache.seatunnel.connectors.seatunnel.clickhouse.source; import org.apache.seatunnel.api.source.Collector; +import org.apache.seatunnel.api.source.SourceReader; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.connectors.seatunnel.clickhouse.util.TypeConvertUtil; -import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader; -import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext; import com.clickhouse.client.ClickHouseClient; import com.clickhouse.client.ClickHouseFormat; @@ -32,27 +31,27 @@ import lombok.extern.slf4j.Slf4j; import java.io.IOException; +import java.util.ArrayList; +import java.util.Deque; import java.util.List; import java.util.Random; +import java.util.concurrent.ConcurrentLinkedDeque; @Slf4j -public class ClickhouseSourceReader extends AbstractSingleSplitReader { +public class ClickhouseSourceReader implements SourceReader { private final List servers; private ClickHouseClient client; private final SeaTunnelRowType rowTypeInfo; - private final SingleSplitReaderContext readerContext; + private final Context context; private ClickHouseRequest request; - private final String sql; + private final Deque splits = new 
ConcurrentLinkedDeque<>(); + private volatile boolean noMoreSplit; ClickhouseSourceReader( - List servers, - SingleSplitReaderContext readerContext, - String sql, - SeaTunnelRowType rowTypeInfo) { + List servers, Context context, SeaTunnelRowType rowTypeInfo) { this.servers = servers; - this.readerContext = readerContext; - this.sql = sql; + this.context = context; this.rowTypeInfo = rowTypeInfo; } @@ -74,31 +73,54 @@ public void close() throws IOException { @Override public void pollNext(Collector output) throws Exception { synchronized (output.getCheckpointLock()) { - try (ClickHouseResponse response = this.request.query(sql).executeAndWait()) { - response.stream() - .forEach( - record -> { - Object[] values = - new Object[this.rowTypeInfo.getFieldNames().length]; - for (int i = 0; i < record.size(); i++) { - if (record.getValue(i).isNullOrEmpty()) { - values[i] = null; - } else { - values[i] = - TypeConvertUtil.valueUnwrap( - this.rowTypeInfo.getFieldType(i), - record.getValue(i)); + ClickHouseSourceSplit split = splits.poll(); + if (null != split) { + String query = split.getSplitQuery(); + try (ClickHouseResponse response = this.request.query(query).executeAndWait()) { + response.stream() + .forEach( + record -> { + Object[] values = + new Object[this.rowTypeInfo.getFieldNames().length]; + for (int i = 0; i < record.size(); i++) { + if (record.getValue(i).isNullOrEmpty()) { + values[i] = null; + } else { + values[i] = + TypeConvertUtil.valueUnwrap( + this.rowTypeInfo.getFieldType(i), + record.getValue(i)); + } } - } - output.collect(new SeaTunnelRow(values)); - }); + output.collect(new SeaTunnelRow(values)); + }); + } + } else if (noMoreSplit && splits.isEmpty()) { + // signal to the source that we have reached the end of the data. 
+ log.info("Closed the bounded ClickHouse source"); + context.signalNoMoreElement(); + } else { + // just wait for split or NoMoreSplits signal + Thread.sleep(1000L); } - signalNoMoreElement(); } } - private void signalNoMoreElement() { - log.info("Closed the bounded ClickHouse source"); - this.readerContext.signalNoMoreElement(); + @Override + public List snapshotState(long checkpointId) throws Exception { + return new ArrayList<>(splits); } + + @Override + public void addSplits(List splits) { + this.splits.addAll(splits); + } + + @Override + public void handleNoMoreSplits() { + noMoreSplit = true; + } + + @Override + public void notifyCheckpointComplete(long checkpointId) throws Exception {} } diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceSplitEnumerator.java new file mode 100644 index 00000000000..4c104d19412 --- /dev/null +++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceSplitEnumerator.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.clickhouse.source; + +import lombok.extern.slf4j.Slf4j; +import org.apache.seatunnel.api.source.SourceSplitEnumerator; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.common.exception.CommonErrorCode; +import org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseSourceConfig; +import org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorException; +import org.apache.seatunnel.connectors.seatunnel.clickhouse.state.ClickhouseSourceState; + +import java.io.IOException; +import java.util.*; +import java.util.stream.Collectors; + +@Slf4j +public class ClickhouseSourceSplitEnumerator + implements SourceSplitEnumerator { + + private final Context context; + private final ClickhouseSourceConfig sourceConfig; + private final CatalogTable catalogTable; + private final ClickhouseChunkSplitter splitter; + private final Map> pendingSplits; + private final Object stateLock = new Object(); + + public ClickhouseSourceSplitEnumerator( + Context context, + ClickhouseSourceConfig sourceConfig, + CatalogTable catalogTable) { + this.context = context; + this.sourceConfig = sourceConfig; + this.catalogTable = catalogTable; + this.splitter = new ClickhouseChunkSplitter(); + this.pendingSplits = new HashMap<>(); + } + + @Override + public void open() {} + + @Override + public void run() throws Exception { + log.info("Starting split enumerator."); + + Set readers = context.registeredReaders(); + + synchronized (stateLock) { + Collection splits = + splitter.generateSplits(sourceConfig, catalogTable); + addPendingSplit(splits); + } + + synchronized (stateLock) { + assignSplit(readers); + } + + log.info("No more splits to assign." 
+ " Sending NoMoreSplitsEvent to reader {}.", readers); + readers.forEach(context::signalNoMoreSplits); + } + + private void assignSplit(Collection readers) { + log.info("Assign pendingSplits to readers {}", readers); + + for (int reader : readers) { + List splits = pendingSplits.remove(reader); + if (splits != null && !splits.isEmpty()) { + String splitIds = + splits.stream() + .map(ClickHouseSourceSplit::getSplitId) + .collect(Collectors.joining(", ")); + log.info("Assign splits {} to reader {}", splitIds, reader); + context.assignSplit(reader, splits); + } + } + } + + @Override + public void close() throws IOException {} + + @Override + public void addSplitsBack(List splits, int subtaskId) { // re-enqueue and re-assign splits returned by a failed/restarted reader so no data is lost + if (splits != null && !splits.isEmpty()) { synchronized (stateLock) { addPendingSplit(splits); assignSplit(Collections.singletonList(subtaskId)); } } } + + @Override + public int currentUnassignedSplitSize() { + return pendingSplits.values().stream().mapToInt(List::size).sum(); + } + + @Override + public void handleSplitRequest(int subtaskId) { + throw new ClickhouseConnectorException( + CommonErrorCode.UNSUPPORTED_METHOD, + String.format("Unsupported handleSplitRequest: %d", subtaskId)); + } + + @Override + public void registerReader(int subtaskId) { + log.info("Register reader {} to ClickhouseSourceSplitEnumerator.", subtaskId); + } + + @Override + public ClickhouseSourceState snapshotState(long checkpointId) throws Exception { + synchronized (stateLock) { + return new ClickhouseSourceState(); + } + } + + @Override + public void notifyCheckpointComplete(long checkpointId) throws Exception {} + + private void addPendingSplit(Collection splits) { + int readerCount = context.currentParallelism(); + for (ClickHouseSourceSplit split : splits) { + int ownerReader = getSplitOwner(split.splitId(), readerCount); + + pendingSplits.computeIfAbsent(ownerReader, r -> new ArrayList<>()).add(split); + } + } + + private static int getSplitOwner(String splitId, int numReaders) { + // String.hashCode() may be negative; floorMod keeps the owner index in [0, numReaders) + return Math.floorMod(splitId.hashCode(), numReaders); + } +} diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseChunkSplitterTest.java 
b/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseChunkSplitterTest.java new file mode 100644 index 00000000000..3d8ccd4b852 --- /dev/null +++ b/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseChunkSplitterTest.java @@ -0,0 +1,305 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.seatunnel.clickhouse.source; + +import com.google.common.collect.ImmutableList; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.seatunnel.api.table.catalog.*; +import org.apache.seatunnel.api.table.type.BasicType; +import org.apache.seatunnel.api.table.type.LocalTimeType; +import org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseSourceConfig; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.Mockito; + +import java.math.BigDecimal; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.time.temporal.ChronoUnit; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +public class ClickhouseChunkSplitterTest { + + private static final String QUERY_SQL = "select * from student"; + + private final ClickhouseChunkSplitter splitter = new ClickhouseChunkSplitter(); + + private final CatalogTable catalogTable = getCatalogTable(); + + @Test + public void testSplitWithoutPartitionColumn() throws Exception { + ClickhouseSourceConfig sourceConfig = getSourceConfig(null, null, null, null); + CatalogTable catalogTable = getCatalogTable(); + + Collection splits = + splitter.generateSplits(sourceConfig, catalogTable); + Assert.assertEquals(1, splits.size()); + + ClickHouseSourceSplit split = splits.iterator().next(); + Assert.assertEquals("select * from student", split.getSplitQuery()); + } + + @Test + public void testSplitWithPartitionColumnAndWithBoundNumberColumn() throws Exception { + // 1 partition test + ClickhouseSourceConfig sourceConfig = getSourceConfig("id", "1", "30", 1); + + List splits = splitter.generateSplits(sourceConfig, catalogTable); + Assert.assertEquals(1, splits.size()); + String expectedQuery = + String.format( + "SELECT * FROM (%s) st_clickhouse_splitter WHERE id BETWEEN %s and %s", + QUERY_SQL, 1, 30); + Assert.assertEquals(expectedQuery, 
splits.get(0).getSplitQuery()); + + // 3 partitions test + sourceConfig = getSourceConfig("id", "1", "30", 3); + + splits = splitter.generateSplits(sourceConfig, catalogTable); + Assert.assertEquals(3, splits.size()); + + List> boundValues = + ImmutableList.of(Pair.of(1, 10), Pair.of(11, 20), Pair.of(21, 30)); + for (int i = 0; i < splits.size(); i++) { + Pair boundValue = boundValues.get(i); + expectedQuery = + String.format( + "SELECT * FROM (%s) st_clickhouse_splitter WHERE id BETWEEN %s and %s", + QUERY_SQL, boundValue.getLeft(), boundValue.getRight()); + String splitQuery = splits.get(i).getSplitQuery(); + Assert.assertEquals(expectedQuery, splitQuery); + } + } + + @Test + public void testSplitWithPartitionColumnAndWithBoundDateColumn() throws Exception { + // 1 partition test + ClickhouseSourceConfig sourceConfig = getSourceConfig( + "enrollment_date", + "2025-05-01", + "2025-06-08", + 1); + + List splits = splitter.generateSplits(sourceConfig, catalogTable); + Assert.assertEquals(1, splits.size()); + + LocalDate baseDate = LocalDate.of(1970, 1, 1); + LocalDate lowerBoundDate = LocalDate.of(2025, 5, 1); + LocalDate upperBoundDate = LocalDate.of(2025, 6, 8); + String expectedQuery = + String.format( + "SELECT * FROM (%s) st_clickhouse_splitter WHERE enrollment_date BETWEEN toDate(%s) and toDate(%s)", + QUERY_SQL, + ChronoUnit.DAYS.between(baseDate, lowerBoundDate), + ChronoUnit.DAYS.between(baseDate, upperBoundDate)); + Assert.assertEquals(expectedQuery, splits.get(0).getSplitQuery()); + + // 3 partition test + sourceConfig = getSourceConfig( + "enrollment_date", + "2025-05-01", + "2025-06-06", + 3); + splits = splitter.generateSplits(sourceConfig, catalogTable); + Assert.assertEquals(3, splits.size()); + + List> boundValues = ImmutableList.of( + Pair.of(LocalDate.of(2025, 5, 1), LocalDate.of(2025, 5, 13)), + Pair.of(LocalDate.of(2025, 5, 14), LocalDate.of(2025, 5, 26)), + Pair.of(LocalDate.of(2025, 5, 27), LocalDate.of(2025, 6, 6))); + for (int i = 0; i < 
splits.size(); i++) { + Pair boundValue = boundValues.get(i); + expectedQuery = + String.format( + "SELECT * FROM (%s) st_clickhouse_splitter WHERE enrollment_date BETWEEN toDate(%s) and toDate(%s)", + QUERY_SQL, + ChronoUnit.DAYS.between(baseDate, boundValue.getLeft()), + ChronoUnit.DAYS.between(baseDate, boundValue.getRight())); + String splitQuery = splits.get(i).getSplitQuery(); + Assert.assertEquals(expectedQuery, splitQuery); + } + } + + @Test + public void testSplitWithPartitionColumnAndWithBoundDateTimeColumn() throws Exception { + // 1 partition test + ClickhouseSourceConfig sourceConfig = getSourceConfig( + "created_at", + "2025-05-01 12:30:00", + "2025-06-06 15:30:00", + 1); + + List splits = splitter.generateSplits(sourceConfig, catalogTable); + Assert.assertEquals(1, splits.size()); + + LocalDateTime lowerBoundDateTime = LocalDateTime.of(2025, 5, 1, 12, 30, 0); + LocalDateTime upperBoundDateTime = LocalDateTime.of(2025, 6, 6, 15, 30, 0); + String expectedQuery = + String.format( + "SELECT * FROM (%s) st_clickhouse_splitter WHERE created_at BETWEEN toDateTime64(%s, 3) and toDateTime64(%s, 3)", + QUERY_SQL, + lowerBoundDateTime.atZone(ZoneId.systemDefault()).toEpochSecond(), + upperBoundDateTime.atZone(ZoneId.systemDefault()).toEpochSecond()); + Assert.assertEquals(expectedQuery, splits.get(0).getSplitQuery()); + + // 3 partition test + sourceConfig = getSourceConfig( + "created_at", + "2025-05-01 12:30:00", + "2025-06-06 15:30:00", + 3); + splits = splitter.generateSplits(sourceConfig, catalogTable); + Assert.assertEquals(3, splits.size()); + + List> boundValues = ImmutableList.of( + Pair.of( + LocalDateTime.of(2025, 5, 1, 12, 30, 0), + LocalDateTime.of(2025, 5, 13, 13, 30, 0)), + Pair.of( + LocalDateTime.of(2025, 5, 13, 13, 30, 1), + LocalDateTime.of(2025, 5, 25, 14, 30, 1)), + Pair.of( + LocalDateTime.of(2025, 5, 25, 14, 30, 2), + LocalDateTime.of(2025, 6, 6, 15, 30, 0))); + for (int i = 0; i < splits.size(); i++) { + Pair boundValue = 
boundValues.get(i); + expectedQuery = + String.format( + "SELECT * FROM (%s) st_clickhouse_splitter WHERE created_at BETWEEN toDateTime64(%s, 3) and toDateTime64(%s, 3)", + QUERY_SQL, + boundValue.getLeft().atZone(ZoneId.systemDefault()).toEpochSecond(), + boundValue.getRight().atZone(ZoneId.systemDefault()).toEpochSecond()); + String splitQuery = splits.get(i).getSplitQuery(); + Assert.assertEquals(expectedQuery, splitQuery); + } + } + + @Test + public void testSplitWithPartitionColumnAndWithBoundStringColumn() throws Exception { + // 1 partition test + ClickhouseSourceConfig sourceConfig = getSourceConfig( + "name", null, null, 1); + + List splits = splitter.generateSplits(sourceConfig, catalogTable); + Assert.assertEquals(1, splits.size()); + + String expectedQuery = String.format( + "SELECT * FROM (%s) st_clickhouse_splitter WHERE xxHash32(coalesce(`name`, '')) %% 1 = 0", + QUERY_SQL); + Assert.assertEquals(expectedQuery, splits.get(0).getSplitQuery()); + + // 3 partition test + sourceConfig = getSourceConfig( + "name", null, null, 3); + + splits = splitter.generateSplits(sourceConfig, catalogTable); + Assert.assertEquals(3, splits.size()); + + List boundValues = ImmutableList.of(0, 1, 2); + for (int i = 0; i < splits.size(); i++) { + expectedQuery = String.format( + "SELECT * FROM (%s) st_clickhouse_splitter WHERE xxHash32(coalesce(`name`, '')) %% 3 = %s", + QUERY_SQL, + boundValues.get(i)); + String splitQuery = splits.get(i).getSplitQuery(); + Assert.assertEquals(expectedQuery, splitQuery); + } + } + + // This test make sure queryMinMax method will work as expected, + // and no need to add all column type test. + // Note that there is no equal division here, and negative numbers are also included, + // but the split algorithm is still effective. 
+ @Test + public void testSplitWithPartitionColumnAndWithoutBound() throws Exception { + ClickhouseChunkSplitter spySplitter = Mockito.spy(splitter); + ClickhouseSourceConfig sourceConfig = getSourceConfig("id", null, null, 3); + // mock the queryMinMax method + Pair queryBoundValues = Pair.of( + new BigDecimal(-3), + new BigDecimal(31)); + Mockito.doReturn(queryBoundValues).when(spySplitter).queryMinMax(sourceConfig, "id"); + + List splits = spySplitter.generateSplits(sourceConfig, catalogTable); + Assert.assertEquals(3, splits.size()); + + List> boundValues = + ImmutableList.of(Pair.of(-3, 8), Pair.of(9, 20), Pair.of(21, 31)); + for (int i = 0; i < splits.size(); i++) { + Pair boundValue = boundValues.get(i); + String expectedQuery = + String.format( + "SELECT * FROM (%s) st_clickhouse_splitter WHERE id BETWEEN %s and %s", + QUERY_SQL, boundValue.getLeft(), boundValue.getRight()); + String splitQuery = splits.get(i).getSplitQuery(); + Assert.assertEquals(expectedQuery, splitQuery); + } + } + + private ClickhouseSourceConfig getSourceConfig( + String partitionColumn, + String partitionLowerBound, + String partitionUpperBound, + Integer partitionNum) { + return ClickhouseSourceConfig.builder() + .serverTimeZone(ZoneId.systemDefault().getId()) + .sql(QUERY_SQL) + .partitionColumn(partitionColumn) + .partitionLowerBound(partitionLowerBound) + .partitionUpperBound(partitionUpperBound) + .partitionNum(partitionNum) + .build(); + } + + private CatalogTable getCatalogTable() { + TableIdentifier tableId = + new TableIdentifier("clickhouse_catalog", "test", null, "student"); + TableSchema tableSchema = getTableSchema(); + + return CatalogTable.of( + tableId, + tableSchema, + Collections.emptyMap(), + Collections.emptyList(), + "", + "clickhouse_catalog"); + } + + private TableSchema getTableSchema() { + PhysicalColumn id = new PhysicalColumn("id", BasicType.LONG_TYPE, 4L, 0, false, null, null); + PhysicalColumn name = + new PhysicalColumn("name", BasicType.STRING_TYPE, 1L, 
0, false, null, null); + PhysicalColumn enrollmentDate = + new PhysicalColumn( + "enrollment_date", LocalTimeType.LOCAL_DATE_TYPE, 2L, 0, false, null, null); + PhysicalColumn createdAt = + new PhysicalColumn( + "created_at", LocalTimeType.LOCAL_DATE_TIME_TYPE, 2L, 0, false, null, null); + List columns = new ArrayList<>(); + columns.add(id); + columns.add(name); + columns.add(enrollmentDate); + columns.add(createdAt); + + return TableSchema.builder().columns(columns).build(); + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseIT.java index 98f22eeeb52..09df72e46d0 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseIT.java @@ -86,6 +86,8 @@ public class ClickhouseIT extends TestSuiteBase implements TestResource { private static final String SINK_TABLE = "sink_table"; private static final List MULTI_SINK_TABLES = Arrays.asList("multi_sink_table1", "multi_sink_table2"); + private static final List PARALLEL_TABLES = + Arrays.asList("parallel_source_table", "parallel_sink_table"); private static final String INSERT_SQL = "insert_sql"; private static final String COMPARE_SQL = "compare_sql"; private static final Pair> TEST_DATASET = @@ -211,6 +213,54 @@ public void testClickHouseWithMultiTableSink(TestContainer container) throws Exc } } + @TestTemplate + public void testClickHouseWithParallelRead(TestContainer container) throws Exception { + // testClickHouseWithParallelReadNumberCol + assertParallelTableSetupStatus(); + Container.ExecResult execResult = 
+ container.executeJob("/parallel_read/clickhouse_to_clickhouse_with_parallel_read_number.conf"); + assertParallelTableSinkStatus(execResult); + clearTable("parallel_sink_table"); + + // testClickHouseWithParallelReadDateCol(Intentionally remove bound value in config file) + assertParallelTableSetupStatus(); + execResult = + container.executeJob("/parallel_read/clickhouse_to_clickhouse_with_parallel_read_date.conf"); + assertParallelTableSinkStatus(execResult); + clearTable("parallel_sink_table"); + + // testClickHouseWithParallelReadDateTimeCol + assertParallelTableSetupStatus(); + execResult = + container.executeJob("/parallel_read/clickhouse_to_clickhouse_with_parallel_read_datetime.conf"); + assertParallelTableSinkStatus(execResult); + clearTable("parallel_sink_table"); + + // testClickHouseWithParallelReadStringCol + assertParallelTableSetupStatus(); + execResult = + container.executeJob("/parallel_read/clickhouse_to_clickhouse_with_parallel_read_string.conf"); + assertParallelTableSinkStatus(execResult); + clearTable("parallel_sink_table"); + + // other necessary test + assertParallelTableSetupStatus(); + execResult = + container.executeJob("/parallel_read/clickhouse_to_clickhouse_with_parallel_read_single.conf"); + assertParallelTableSinkStatus(execResult); + clearTable("parallel_sink_table"); + } + + private void assertParallelTableSetupStatus() { + Assertions.assertEquals(10, countData("parallel_source_table")); + Assertions.assertEquals(0, countData("parallel_sink_table")); + } + + private void assertParallelTableSinkStatus(Container.ExecResult execResult) { + Assertions.assertEquals(0, execResult.getExitCode()); + Assertions.assertEquals(10, countData("parallel_sink_table")); + } + @BeforeAll @Override public void startUp() throws Exception { @@ -237,8 +287,11 @@ private void initializeClickhouseTable() { Statement statement = this.connection.createStatement(); statement.execute(CONFIG.getString(SOURCE_TABLE)); 
statement.execute(CONFIG.getString(SINK_TABLE)); - // table for multi-table sink test - for (String tableName : MULTI_SINK_TABLES) { + // for other usage tables + List tables = Stream + .concat(MULTI_SINK_TABLES.stream(), PARALLEL_TABLES.stream()) + .collect(Collectors.toList()); + for (String tableName : tables) { statement.execute(CONFIG.getString(tableName)); } } catch (SQLException e) { @@ -324,8 +377,21 @@ private void dropTable(String tableName) { throw new RuntimeException("Drop table failed!", e); } } - private void batchInsertData() { + this.batchInsertCommonUsageData(); + this.batchInsertParallelReadUsageData(); + } + + private void batchInsertParallelReadUsageData() { + String sql = CONFIG.getString("insert_sql_for_parallel_table"); + try(Statement statement = connection.createStatement();) { + statement.execute(sql); + } catch (SQLException e) { + throw new RuntimeException("Batch insert data failed!", e); + } + } + + private void batchInsertCommonUsageData() { String sql = CONFIG.getString(INSERT_SQL); PreparedStatement preparedStatement = null; try { diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/init/clickhouse_init.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/init/clickhouse_init.conf index e5a52fec46a..c52eb5a4002 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/init/clickhouse_init.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/init/clickhouse_init.conf @@ -114,6 +114,31 @@ comment '''N''-N'; """ multi_sink_table2 = "create table if not exists `default`.multi_sink_table2 as `default`.multi_sink_table1" +parallel_source_table = """ +create table if not exists `default`.parallel_source_table( + `id` UInt32, + `email` String, + `enrollment_date` Date, + `created_at` DateTime default now() +)engine=Memory +comment '''N''-N'; +""" +parallel_sink_table = 
"create table if not exists `default`.parallel_sink_table as `default`.parallel_source_table" + +insert_sql_for_parallel_table = """ +INSERT INTO default.parallel_source_table (id, email, enrollment_date, created_at) VALUES +(1, 'user01@data.com', '2024-05-20', '2024-05-20 08:30:00'), +(2, 'user02@data.com', '2024-05-23', '2024-05-23 13:15:00'), +(3, 'user03@data.com', '2024-05-27', '2024-05-27 16:40:00'), +(4, 'user04@data.com', '2024-05-31', '2024-05-31 09:20:00'), +(5, 'user05@data.com', '2024-06-03', '2024-06-03 11:50:00'), +(6, 'user06@data.com', '2024-06-07', '2024-06-07 14:05:00'), +(7, 'user07@data.com', '2024-06-10', '2024-06-10 07:30:00'), +(8, 'user08@data.com', '2024-06-14', '2024-06-14 15:25:00'), +(9, 'user09@data.com', '2024-06-17', '2024-06-17 10:45:00'), +(10, 'user10@data.com', '2024-06-19', '2024-06-19 13:30:00'); +""" + insert_sql = """ insert into `default`.source_table ( diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/parallel_read/clickhouse_to_clickhouse_with_parallel_read_date.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/parallel_read/clickhouse_to_clickhouse_with_parallel_read_date.conf new file mode 100644 index 00000000000..f6a5ceca2e0 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/parallel_read/clickhouse_to_clickhouse_with_parallel_read_date.conf @@ -0,0 +1,49 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +###### +###### This config file is a demonstration of batch processing in seatunnel config +###### + +env { + parallelism = 3 + job.mode = "BATCH" + job.name = "clickhouse_to_clickhouse_with_parallel_read" +} + +source { + Clickhouse { + host = "clickhouse:8123" + database = "default" + sql = "select * from parallel_source_table" + username = "default" + password = "" + plugin_output = "parallel_source_table" + partition_column = "enrollment_date" + partition_num = 3 + } +} + +sink { + Clickhouse { + plugin_input = "parallel_source_table" + host = "clickhouse:8123" + database = "default" + table = "parallel_sink_table" + username = "default" + password = "" + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/parallel_read/clickhouse_to_clickhouse_with_parallel_read_datetime.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/parallel_read/clickhouse_to_clickhouse_with_parallel_read_datetime.conf new file mode 100644 index 00000000000..f71b9f7c203 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/parallel_read/clickhouse_to_clickhouse_with_parallel_read_datetime.conf @@ -0,0 +1,51 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. 
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in seatunnel config
+######
+
+env {
+  parallelism = 3
+  job.mode = "BATCH"
+  job.name = "clickhouse_to_clickhouse_with_parallel_read"
+}
+
+source {
+  Clickhouse {
+    host = "clickhouse:8123"
+    database = "default"
+    sql = "select * from parallel_source_table"
+    username = "default"
+    password = ""
+    plugin_output = "parallel_source_table"
+    partition_column = "created_at"
+    partition_num = 3
+    partition_lower_bound = "2024-05-20 08:30:00"
+    partition_upper_bound = "2024-06-19 13:30:00"
+  }
+}
+
+sink {
+  Clickhouse {
+    plugin_input = "parallel_source_table"
+    host = "clickhouse:8123"
+    database = "default"
+    table = "parallel_sink_table"
+    username = "default"
+    password = ""
+  }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/parallel_read/clickhouse_to_clickhouse_with_parallel_read_number.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/parallel_read/clickhouse_to_clickhouse_with_parallel_read_number.conf
new file mode 100644
index 00000000000..e5c86de9369
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/parallel_read/clickhouse_to_clickhouse_with_parallel_read_number.conf
@@ -0,0 +1,51 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor
license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +###### +###### This config file is a demonstration of batch processing in seatunnel config +###### + +env { + parallelism = 3 + job.mode = "BATCH" + job.name = "clickhouse_to_clickhouse_with_parallel_read" +} + +source { + Clickhouse { + host = "clickhouse:8123" + database = "default" + sql = "select * from parallel_source_table" + username = "default" + password = "" + plugin_output = "parallel_source_table" + partition_column = "id" + partition_num = 3 + partition_lower_bound = 1 + partition_upper_bound = 10 + } +} + +sink { + Clickhouse { + plugin_input = "parallel_source_table" + host = "clickhouse:8123" + database = "default" + table = "parallel_sink_table" + username = "default" + password = "" + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/parallel_read/clickhouse_to_clickhouse_with_parallel_read_single.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/parallel_read/clickhouse_to_clickhouse_with_parallel_read_single.conf new file mode 100644 index 00000000000..42b1c448d23 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/parallel_read/clickhouse_to_clickhouse_with_parallel_read_single.conf @@ -0,0 +1,47 @@ +# +# 
Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +###### +###### This config file is a demonstration of batch processing in seatunnel config +###### + +env { + parallelism = 3 + job.mode = "BATCH" + job.name = "clickhouse_to_clickhouse_with_parallel_read" +} + +source { + Clickhouse { + host = "clickhouse:8123" + database = "default" + sql = "select * from parallel_source_table" + username = "default" + password = "" + plugin_output = "parallel_source_table" + } +} + +sink { + Clickhouse { + plugin_input = "parallel_source_table" + host = "clickhouse:8123" + database = "default" + table = "parallel_sink_table" + username = "default" + password = "" + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/parallel_read/clickhouse_to_clickhouse_with_parallel_read_string.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/parallel_read/clickhouse_to_clickhouse_with_parallel_read_string.conf new file mode 100644 index 00000000000..1544c811935 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/parallel_read/clickhouse_to_clickhouse_with_parallel_read_string.conf @@ -0,0 +1,49 @@ +# +# Licensed to the 
Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +###### +###### This config file is a demonstration of batch processing in seatunnel config +###### + +env { + parallelism = 3 + job.mode = "BATCH" + job.name = "clickhouse_to_clickhouse_with_parallel_read" +} + +source { + Clickhouse { + host = "clickhouse:8123" + database = "default" + sql = "select * from parallel_source_table" + username = "default" + password = "" + plugin_output = "parallel_source_table" + partition_column = "email" + partition_num = 3 + } +} + +sink { + Clickhouse { + plugin_input = "parallel_source_table" + host = "clickhouse:8123" + database = "default" + table = "parallel_sink_table" + username = "default" + password = "" + } +} From 7831b37ba3296289c5ea22e0497d12857411615d Mon Sep 17 00:00:00 2001 From: mrtisttt Date: Wed, 11 Jun 2025 03:23:44 +0800 Subject: [PATCH 2/4] [Feature][Connector-V2] Fix code style --- .../config/ClickhouseSourceConfig.java | 3 +- .../source/ClickHouseSourceSplit.java | 5 +- .../source/ClickhouseChunkSplitter.java | 56 +++++++----- .../clickhouse/source/ClickhouseSource.java | 2 +- .../ClickhouseSourceSplitEnumerator.java | 3 +- .../source/ClickhouseChunkSplitterTest.java | 90 +++++++++---------- .../seatunnel/clickhouse/ClickhouseIT.java | 24 +++-- 7 
files changed, 98 insertions(+), 85 deletions(-) diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseSourceConfig.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseSourceConfig.java index 0ef7e16d4c8..6e6ac369162 100644 --- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseSourceConfig.java +++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseSourceConfig.java @@ -17,10 +17,11 @@ package org.apache.seatunnel.connectors.seatunnel.clickhouse.config; +import org.apache.seatunnel.api.configuration.ReadonlyConfig; + import com.clickhouse.client.ClickHouseNode; import lombok.Builder; import lombok.Data; -import org.apache.seatunnel.api.configuration.ReadonlyConfig; import java.io.Serializable; import java.util.List; diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickHouseSourceSplit.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickHouseSourceSplit.java index ed05c5d5520..0d4a80deec1 100644 --- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickHouseSourceSplit.java +++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickHouseSourceSplit.java @@ -17,11 +17,12 @@ package org.apache.seatunnel.connectors.seatunnel.clickhouse.source; +import org.apache.seatunnel.api.source.SourceSplit; +import org.apache.seatunnel.api.table.catalog.TablePath; + import lombok.AllArgsConstructor; import lombok.Data; import lombok.ToString; -import 
org.apache.seatunnel.api.source.SourceSplit; -import org.apache.seatunnel.api.table.catalog.TablePath; @Data @ToString diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseChunkSplitter.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseChunkSplitter.java index cd99f4fec9b..5b94590666a 100644 --- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseChunkSplitter.java +++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseChunkSplitter.java @@ -18,10 +18,6 @@ package org.apache.seatunnel.connectors.seatunnel.clickhouse.source; import com.clickhouse.client.*; -import com.clickhouse.client.data.ClickHouseDateTimeValue; -import com.clickhouse.client.data.ClickHouseDateValue; -import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang3.tuple.Pair; import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.catalog.Column; import org.apache.seatunnel.api.table.catalog.TablePath; @@ -34,6 +30,12 @@ import org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseSourceConfig; import org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorException; +import org.apache.commons.lang3.tuple.Pair; + +import com.clickhouse.client.data.ClickHouseDateTimeValue; +import com.clickhouse.client.data.ClickHouseDateValue; +import lombok.extern.slf4j.Slf4j; + import java.io.Serializable; import java.math.BigDecimal; import java.time.LocalDate; @@ -42,30 +44,34 @@ import java.util.function.Function; import java.util.stream.Collectors; - /** - * Parallel reading shard splitting strategy, mainly divided into two categories according to the type of partition field: + * Parallel 
reading shard splitting strategy, mainly divided into two categories according to the + * type of partition field: * *

1. Numeric types * *

Numeric types include pure numeric types and date types: * *

(1) Pure numeric types - *

Calculate the partition size based on the lower and upper bounds, and split according to the number of partitions - * (the last partition may be smaller than the partition size). + * + *

Calculate the partition size based on the lower and upper bounds, and split according to the + * number of partitions (the last partition may be smaller than the partition size). * *

(2) Time types - *

Time types mainly include two categories: Date and DateTime. Regardless of the category, they will first be - * converted to their numerical values, and then the splitting algorithm is the same as that for pure numeric types. - * After splitting into partitions, if the field is of type Date, ClickHouse's toDate() function will be used to - * convert the partition values. If it is of type DateTime, the toDateTime64() function will be used instead. * - *

Regardless of whether it is a pure numeric type or a time type, if the lower or upper bound is not specified, - * the database will be requested to obtain the maximum and minimum values. + *

Time types mainly include two categories: Date and DateTime. Regardless of the category, they + * will first be converted to their numerical values, and then the splitting algorithm is the same + * as that for pure numeric types. After splitting into partitions, if the field is of type Date, + * ClickHouse's toDate() function will be used to convert the partition values. If it is of type + * DateTime, the toDateTime64() function will be used instead. + * + *

Regardless of whether it is a pure numeric type or a time type, if the lower or upper bound is + * not specified, the database will be requested to obtain the maximum and minimum values. * *

2. String types - *

For strings, specifying upper and lower bounds is invalid. The splitting algorithm will take the modulus of - * the partition field according to the number of partitions to split the data. + * + *

For strings, specifying upper and lower bounds is invalid. The splitting algorithm will take + * the modulus of the partition field according to the number of partitions to split the data. */ @Slf4j public class ClickhouseChunkSplitter { @@ -215,14 +221,21 @@ protected Pair queryMinMax( private Pair getPartitionBoundValue( ClickhouseSourceConfig sourceConfig, SeaTunnelDataType splitKeyType) { Function dateTimeTranslator = - value -> ClickHouseDateTimeValue.of(value, 3, - TimeZone.getTimeZone(sourceConfig.getServerTimeZone())).asBigDecimal(); + value -> + ClickHouseDateTimeValue.of( + value, + 3, + TimeZone.getTimeZone(sourceConfig.getServerTimeZone())) + .asBigDecimal(); Map> timeTranslatorMap = new HashMap>() { { // Clickhouse Type: Date - put(SqlType.DATE, value -> - ClickHouseDateValue.of(LocalDate.parse(value)).asBigDecimal()); + put( + SqlType.DATE, + value -> + ClickHouseDateValue.of(LocalDate.parse(value)) + .asBigDecimal()); // Clickhouse Type: DateTime put(SqlType.TIME, dateTimeTranslator); put(SqlType.TIMESTAMP, dateTimeTranslator); @@ -248,7 +261,8 @@ private Pair getPartitionBoundValue( } catch (Exception e) { throw new ClickhouseConnectorException( CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT, - "Translate partition bound value failed.", e); + "Translate partition bound value failed.", + e); } return Pair.of(partitionStart, partitionEnd); diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSource.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSource.java index de999ef216d..0804f02c9dc 100644 --- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSource.java +++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSource.java @@ -36,7 +36,7 @@ public class ClickhouseSource implements SeaTunnelSource, - SupportParallelism { + SupportParallelism { private final ClickhouseSourceConfig sourceConfig; private final List servers; diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceSplitEnumerator.java index 4c104d19412..98806ea94ca 100644 --- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceSplitEnumerator.java +++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceSplitEnumerator.java @@ -17,7 +17,6 @@ package org.apache.seatunnel.connectors.seatunnel.clickhouse.source; -import lombok.extern.slf4j.Slf4j; import org.apache.seatunnel.api.source.SourceSplitEnumerator; import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.common.exception.CommonErrorCode; @@ -25,6 +24,8 @@ import org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorException; import org.apache.seatunnel.connectors.seatunnel.clickhouse.state.ClickhouseSourceState; +import lombok.extern.slf4j.Slf4j; + import java.io.IOException; import java.util.*; import java.util.stream.Collectors; diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseChunkSplitterTest.java b/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseChunkSplitterTest.java index 
3d8ccd4b852..b3e8906072f 100644 --- a/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseChunkSplitterTest.java +++ b/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseChunkSplitterTest.java @@ -17,16 +17,19 @@ package org.apache.seatunnel.connectors.seatunnel.clickhouse.source; -import com.google.common.collect.ImmutableList; -import org.apache.commons.lang3.tuple.Pair; import org.apache.seatunnel.api.table.catalog.*; import org.apache.seatunnel.api.table.type.BasicType; import org.apache.seatunnel.api.table.type.LocalTimeType; import org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseSourceConfig; + +import org.apache.commons.lang3.tuple.Pair; + import org.junit.Assert; import org.junit.Test; import org.mockito.Mockito; +import com.google.common.collect.ImmutableList; + import java.math.BigDecimal; import java.time.LocalDate; import java.time.LocalDateTime; @@ -93,11 +96,8 @@ public void testSplitWithPartitionColumnAndWithBoundNumberColumn() throws Except @Test public void testSplitWithPartitionColumnAndWithBoundDateColumn() throws Exception { // 1 partition test - ClickhouseSourceConfig sourceConfig = getSourceConfig( - "enrollment_date", - "2025-05-01", - "2025-06-08", - 1); + ClickhouseSourceConfig sourceConfig = + getSourceConfig("enrollment_date", "2025-05-01", "2025-06-08", 1); List splits = splitter.generateSplits(sourceConfig, catalogTable); Assert.assertEquals(1, splits.size()); @@ -114,18 +114,15 @@ public void testSplitWithPartitionColumnAndWithBoundDateColumn() throws Exceptio Assert.assertEquals(expectedQuery, splits.get(0).getSplitQuery()); // 3 partition test - sourceConfig = getSourceConfig( - "enrollment_date", - "2025-05-01", - "2025-06-06", - 3); + sourceConfig = getSourceConfig("enrollment_date", "2025-05-01", "2025-06-06", 3); splits = 
splitter.generateSplits(sourceConfig, catalogTable); Assert.assertEquals(3, splits.size()); - List> boundValues = ImmutableList.of( - Pair.of(LocalDate.of(2025, 5, 1), LocalDate.of(2025, 5, 13)), - Pair.of(LocalDate.of(2025, 5, 14), LocalDate.of(2025, 5, 26)), - Pair.of(LocalDate.of(2025, 5, 27), LocalDate.of(2025, 6, 6))); + List> boundValues = + ImmutableList.of( + Pair.of(LocalDate.of(2025, 5, 1), LocalDate.of(2025, 5, 13)), + Pair.of(LocalDate.of(2025, 5, 14), LocalDate.of(2025, 5, 26)), + Pair.of(LocalDate.of(2025, 5, 27), LocalDate.of(2025, 6, 6))); for (int i = 0; i < splits.size(); i++) { Pair boundValue = boundValues.get(i); expectedQuery = @@ -142,11 +139,8 @@ public void testSplitWithPartitionColumnAndWithBoundDateColumn() throws Exceptio @Test public void testSplitWithPartitionColumnAndWithBoundDateTimeColumn() throws Exception { // 1 partition test - ClickhouseSourceConfig sourceConfig = getSourceConfig( - "created_at", - "2025-05-01 12:30:00", - "2025-06-06 15:30:00", - 1); + ClickhouseSourceConfig sourceConfig = + getSourceConfig("created_at", "2025-05-01 12:30:00", "2025-06-06 15:30:00", 1); List splits = splitter.generateSplits(sourceConfig, catalogTable); Assert.assertEquals(1, splits.size()); @@ -162,24 +156,22 @@ public void testSplitWithPartitionColumnAndWithBoundDateTimeColumn() throws Exce Assert.assertEquals(expectedQuery, splits.get(0).getSplitQuery()); // 3 partition test - sourceConfig = getSourceConfig( - "created_at", - "2025-05-01 12:30:00", - "2025-06-06 15:30:00", - 3); + sourceConfig = + getSourceConfig("created_at", "2025-05-01 12:30:00", "2025-06-06 15:30:00", 3); splits = splitter.generateSplits(sourceConfig, catalogTable); Assert.assertEquals(3, splits.size()); - List> boundValues = ImmutableList.of( - Pair.of( - LocalDateTime.of(2025, 5, 1, 12, 30, 0), - LocalDateTime.of(2025, 5, 13, 13, 30, 0)), - Pair.of( - LocalDateTime.of(2025, 5, 13, 13, 30, 1), - LocalDateTime.of(2025, 5, 25, 14, 30, 1)), - Pair.of( - 
LocalDateTime.of(2025, 5, 25, 14, 30, 2), - LocalDateTime.of(2025, 6, 6, 15, 30, 0))); + List> boundValues = + ImmutableList.of( + Pair.of( + LocalDateTime.of(2025, 5, 1, 12, 30, 0), + LocalDateTime.of(2025, 5, 13, 13, 30, 0)), + Pair.of( + LocalDateTime.of(2025, 5, 13, 13, 30, 1), + LocalDateTime.of(2025, 5, 25, 14, 30, 1)), + Pair.of( + LocalDateTime.of(2025, 5, 25, 14, 30, 2), + LocalDateTime.of(2025, 6, 6, 15, 30, 0))); for (int i = 0; i < splits.size(); i++) { Pair boundValue = boundValues.get(i); expectedQuery = @@ -196,30 +188,29 @@ public void testSplitWithPartitionColumnAndWithBoundDateTimeColumn() throws Exce @Test public void testSplitWithPartitionColumnAndWithBoundStringColumn() throws Exception { // 1 partition test - ClickhouseSourceConfig sourceConfig = getSourceConfig( - "name", null, null, 1); + ClickhouseSourceConfig sourceConfig = getSourceConfig("name", null, null, 1); List splits = splitter.generateSplits(sourceConfig, catalogTable); Assert.assertEquals(1, splits.size()); - String expectedQuery = String.format( - "SELECT * FROM (%s) st_clickhouse_splitter WHERE xxHash32(coalesce(`name`, '')) %% 1 = 0", - QUERY_SQL); + String expectedQuery = + String.format( + "SELECT * FROM (%s) st_clickhouse_splitter WHERE xxHash32(coalesce(`name`, '')) %% 1 = 0", + QUERY_SQL); Assert.assertEquals(expectedQuery, splits.get(0).getSplitQuery()); // 3 partition test - sourceConfig = getSourceConfig( - "name", null, null, 3); + sourceConfig = getSourceConfig("name", null, null, 3); splits = splitter.generateSplits(sourceConfig, catalogTable); Assert.assertEquals(3, splits.size()); List boundValues = ImmutableList.of(0, 1, 2); for (int i = 0; i < splits.size(); i++) { - expectedQuery = String.format( - "SELECT * FROM (%s) st_clickhouse_splitter WHERE xxHash32(coalesce(`name`, '')) %% 3 = %s", - QUERY_SQL, - boundValues.get(i)); + expectedQuery = + String.format( + "SELECT * FROM (%s) st_clickhouse_splitter WHERE xxHash32(coalesce(`name`, '')) %% 3 = %s", + 
QUERY_SQL, boundValues.get(i)); String splitQuery = splits.get(i).getSplitQuery(); Assert.assertEquals(expectedQuery, splitQuery); } @@ -234,9 +225,8 @@ public void testSplitWithPartitionColumnAndWithoutBound() throws Exception { ClickhouseChunkSplitter spySplitter = Mockito.spy(splitter); ClickhouseSourceConfig sourceConfig = getSourceConfig("id", null, null, 3); // mock the queryMinMax method - Pair queryBoundValues = Pair.of( - new BigDecimal(-3), - new BigDecimal(31)); + Pair queryBoundValues = + Pair.of(new BigDecimal(-3), new BigDecimal(31)); Mockito.doReturn(queryBoundValues).when(spySplitter).queryMinMax(sourceConfig, "id"); List splits = spySplitter.generateSplits(sourceConfig, catalogTable); diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseIT.java index 09df72e46d0..19bb9fbd3f2 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseIT.java @@ -218,35 +218,40 @@ public void testClickHouseWithParallelRead(TestContainer container) throws Excep // testClickHouseWithParallelReadNumberCol assertParallelTableSetupStatus(); Container.ExecResult execResult = - container.executeJob("/parallel_read/clickhouse_to_clickhouse_with_parallel_read_number.conf"); + container.executeJob( + "/parallel_read/clickhouse_to_clickhouse_with_parallel_read_number.conf"); assertParallelTableSinkStatus(execResult); clearTable("parallel_sink_table"); // testClickHouseWithParallelReadDateCol(Intentionally remove bound value in config file) assertParallelTableSetupStatus(); 
         execResult =
-                container.executeJob("/parallel_read/clickhouse_to_clickhouse_with_parallel_read_date.conf");
+                container.executeJob(
+                        "/parallel_read/clickhouse_to_clickhouse_with_parallel_read_date.conf");
         assertParallelTableSinkStatus(execResult);
         clearTable("parallel_sink_table");
 
         // testClickHouseWithParallelReadDateTimeCol
         assertParallelTableSetupStatus();
         execResult =
-                container.executeJob("/parallel_read/clickhouse_to_clickhouse_with_parallel_read_date.conf");
+                container.executeJob(
+                        "/parallel_read/clickhouse_to_clickhouse_with_parallel_read_datetime.conf");
         assertParallelTableSinkStatus(execResult);
         clearTable("parallel_sink_table");
 
         // testClickHouseWithParallelReadStringCol
         assertParallelTableSetupStatus();
         execResult =
-                container.executeJob("/parallel_read/clickhouse_to_clickhouse_with_parallel_read_string.conf");
+                container.executeJob(
+                        "/parallel_read/clickhouse_to_clickhouse_with_parallel_read_string.conf");
         assertParallelTableSinkStatus(execResult);
         clearTable("parallel_sink_table");
 
         // other necessary test
         assertParallelTableSetupStatus();
         execResult =
-                container.executeJob("/parallel_read/clickhouse_to_clickhouse_with_parallel_read_single.conf");
+                container.executeJob(
+                        "/parallel_read/clickhouse_to_clickhouse_with_parallel_read_single.conf");
         assertParallelTableSinkStatus(execResult);
         clearTable("parallel_sink_table");
     }
@@ -288,9 +293,9 @@ private void initializeClickhouseTable() {
             statement.execute(CONFIG.getString(SOURCE_TABLE));
             statement.execute(CONFIG.getString(SINK_TABLE));
             // for other usage tables
-            List<String> tables = Stream
-                    .concat(MULTI_SINK_TABLES.stream(), PARALLEL_TABLES.stream())
-                    .collect(Collectors.toList());
+            List<String> tables =
+                    Stream.concat(MULTI_SINK_TABLES.stream(), PARALLEL_TABLES.stream())
+                            .collect(Collectors.toList());
             for (String tableName : tables) {
                 statement.execute(CONFIG.getString(tableName));
             }
@@ -377,6 +382,7 @@ private void dropTable(String tableName) {
             throw new RuntimeException("Drop table failed!", e);
         }
     }
+ private void batchInsertData() { this.batchInsertCommonUsageData(); this.batchInsertParallelReadUsageData(); @@ -384,7 +390,7 @@ private void batchInsertData() { private void batchInsertParallelReadUsageData() { String sql = CONFIG.getString("insert_sql_for_parallel_table"); - try(Statement statement = connection.createStatement();) { + try (Statement statement = connection.createStatement(); ) { statement.execute(sql); } catch (SQLException e) { throw new RuntimeException("Batch insert data failed!", e); From ae56c86704b936a70fa3fbd8dd30a23eb0b985af Mon Sep 17 00:00:00 2001 From: mrtisttt Date: Wed, 11 Jun 2025 03:41:18 +0800 Subject: [PATCH 3/4] [Feature][Connector-V2] Fix code style --- .../source/ClickhouseChunkSplitter.java | 17 +++++-- .../ClickhouseSourceSplitEnumerator.java | 7 ++- .../source/ClickhouseChunkSplitterTest.java | 50 ++++++++++--------- 3 files changed, 47 insertions(+), 27 deletions(-) diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseChunkSplitter.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseChunkSplitter.java index 5b94590666a..18fe3128e09 100644 --- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseChunkSplitter.java +++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseChunkSplitter.java @@ -17,7 +17,6 @@ package org.apache.seatunnel.connectors.seatunnel.clickhouse.source; -import com.clickhouse.client.*; import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.catalog.Column; import org.apache.seatunnel.api.table.catalog.TablePath; @@ -32,6 +31,12 @@ import org.apache.commons.lang3.tuple.Pair; +import com.clickhouse.client.ClickHouseClient; +import 
com.clickhouse.client.ClickHouseException; +import com.clickhouse.client.ClickHouseFormat; +import com.clickhouse.client.ClickHouseNode; +import com.clickhouse.client.ClickHouseRecord; +import com.clickhouse.client.ClickHouseResponse; import com.clickhouse.client.data.ClickHouseDateTimeValue; import com.clickhouse.client.data.ClickHouseDateValue; import lombok.extern.slf4j.Slf4j; @@ -39,7 +44,13 @@ import java.io.Serializable; import java.math.BigDecimal; import java.time.LocalDate; -import java.util.*; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.TimeZone; import java.util.concurrent.ThreadLocalRandom; import java.util.function.Function; import java.util.stream.Collectors; @@ -201,7 +212,7 @@ protected Pair queryMinMax( List nodes = sourceConfig.getNodes(); ClickHouseNode currentServer = nodes.get(ThreadLocalRandom.current().nextInt(nodes.size())); try (ClickHouseClient client = ClickHouseClient.newInstance(currentServer.getProtocol()); - ClickHouseResponse response = + ClickHouseResponse response = client.connect(currentServer) .format(ClickHouseFormat.RowBinaryWithNamesAndTypes) .query(sqlQuery) diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceSplitEnumerator.java index 98806ea94ca..84616583b71 100644 --- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceSplitEnumerator.java +++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceSplitEnumerator.java @@ -27,7 +27,12 @@ import lombok.extern.slf4j.Slf4j; import 
java.io.IOException; -import java.util.*; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.stream.Collectors; @Slf4j diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseChunkSplitterTest.java b/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseChunkSplitterTest.java index b3e8906072f..d7970eeb642 100644 --- a/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseChunkSplitterTest.java +++ b/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseChunkSplitterTest.java @@ -17,15 +17,19 @@ package org.apache.seatunnel.connectors.seatunnel.clickhouse.source; -import org.apache.seatunnel.api.table.catalog.*; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.catalog.Column; +import org.apache.seatunnel.api.table.catalog.PhysicalColumn; +import org.apache.seatunnel.api.table.catalog.TableIdentifier; +import org.apache.seatunnel.api.table.catalog.TableSchema; import org.apache.seatunnel.api.table.type.BasicType; import org.apache.seatunnel.api.table.type.LocalTimeType; import org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseSourceConfig; import org.apache.commons.lang3.tuple.Pair; -import org.junit.Assert; -import org.junit.Test; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; import org.mockito.Mockito; import com.google.common.collect.ImmutableList; @@ -55,10 +59,10 @@ public void testSplitWithoutPartitionColumn() throws Exception { Collection splits = splitter.generateSplits(sourceConfig, catalogTable); - Assert.assertEquals(1, 
splits.size()); + Assertions.assertEquals(1, splits.size()); ClickHouseSourceSplit split = splits.iterator().next(); - Assert.assertEquals("select * from student", split.getSplitQuery()); + Assertions.assertEquals("select * from student", split.getSplitQuery()); } @Test @@ -67,18 +71,18 @@ public void testSplitWithPartitionColumnAndWithBoundNumberColumn() throws Except ClickhouseSourceConfig sourceConfig = getSourceConfig("id", "1", "30", 1); List splits = splitter.generateSplits(sourceConfig, catalogTable); - Assert.assertEquals(1, splits.size()); + Assertions.assertEquals(1, splits.size()); String expectedQuery = String.format( "SELECT * FROM (%s) st_clickhouse_splitter WHERE id BETWEEN %s and %s", QUERY_SQL, 1, 30); - Assert.assertEquals(expectedQuery, splits.get(0).getSplitQuery()); + Assertions.assertEquals(expectedQuery, splits.get(0).getSplitQuery()); // 3 partitions test sourceConfig = getSourceConfig("id", "1", "30", 3); splits = splitter.generateSplits(sourceConfig, catalogTable); - Assert.assertEquals(3, splits.size()); + Assertions.assertEquals(3, splits.size()); List> boundValues = ImmutableList.of(Pair.of(1, 10), Pair.of(11, 20), Pair.of(21, 30)); @@ -89,7 +93,7 @@ public void testSplitWithPartitionColumnAndWithBoundNumberColumn() throws Except "SELECT * FROM (%s) st_clickhouse_splitter WHERE id BETWEEN %s and %s", QUERY_SQL, boundValue.getLeft(), boundValue.getRight()); String splitQuery = splits.get(i).getSplitQuery(); - Assert.assertEquals(expectedQuery, splitQuery); + Assertions.assertEquals(expectedQuery, splitQuery); } } @@ -100,7 +104,7 @@ public void testSplitWithPartitionColumnAndWithBoundDateColumn() throws Exceptio getSourceConfig("enrollment_date", "2025-05-01", "2025-06-08", 1); List splits = splitter.generateSplits(sourceConfig, catalogTable); - Assert.assertEquals(1, splits.size()); + Assertions.assertEquals(1, splits.size()); LocalDate baseDate = LocalDate.of(1970, 1, 1); LocalDate lowerBoundDate = LocalDate.of(2025, 5, 1); @@ -111,12 
+115,12 @@ public void testSplitWithPartitionColumnAndWithBoundDateColumn() throws Exceptio QUERY_SQL, ChronoUnit.DAYS.between(baseDate, lowerBoundDate), ChronoUnit.DAYS.between(baseDate, upperBoundDate)); - Assert.assertEquals(expectedQuery, splits.get(0).getSplitQuery()); + Assertions.assertEquals(expectedQuery, splits.get(0).getSplitQuery()); // 3 partition test sourceConfig = getSourceConfig("enrollment_date", "2025-05-01", "2025-06-06", 3); splits = splitter.generateSplits(sourceConfig, catalogTable); - Assert.assertEquals(3, splits.size()); + Assertions.assertEquals(3, splits.size()); List> boundValues = ImmutableList.of( @@ -132,7 +136,7 @@ public void testSplitWithPartitionColumnAndWithBoundDateColumn() throws Exceptio ChronoUnit.DAYS.between(baseDate, boundValue.getLeft()), ChronoUnit.DAYS.between(baseDate, boundValue.getRight())); String splitQuery = splits.get(i).getSplitQuery(); - Assert.assertEquals(expectedQuery, splitQuery); + Assertions.assertEquals(expectedQuery, splitQuery); } } @@ -143,7 +147,7 @@ public void testSplitWithPartitionColumnAndWithBoundDateTimeColumn() throws Exce getSourceConfig("created_at", "2025-05-01 12:30:00", "2025-06-06 15:30:00", 1); List splits = splitter.generateSplits(sourceConfig, catalogTable); - Assert.assertEquals(1, splits.size()); + Assertions.assertEquals(1, splits.size()); LocalDateTime lowerBoundDateTime = LocalDateTime.of(2025, 5, 1, 12, 30, 0); LocalDateTime upperBoundDateTime = LocalDateTime.of(2025, 6, 6, 15, 30, 0); @@ -153,13 +157,13 @@ public void testSplitWithPartitionColumnAndWithBoundDateTimeColumn() throws Exce QUERY_SQL, lowerBoundDateTime.atZone(ZoneId.systemDefault()).toEpochSecond(), upperBoundDateTime.atZone(ZoneId.systemDefault()).toEpochSecond()); - Assert.assertEquals(expectedQuery, splits.get(0).getSplitQuery()); + Assertions.assertEquals(expectedQuery, splits.get(0).getSplitQuery()); // 3 partition test sourceConfig = getSourceConfig("created_at", "2025-05-01 12:30:00", "2025-06-06 15:30:00", 
3); splits = splitter.generateSplits(sourceConfig, catalogTable); - Assert.assertEquals(3, splits.size()); + Assertions.assertEquals(3, splits.size()); List> boundValues = ImmutableList.of( @@ -181,7 +185,7 @@ public void testSplitWithPartitionColumnAndWithBoundDateTimeColumn() throws Exce boundValue.getLeft().atZone(ZoneId.systemDefault()).toEpochSecond(), boundValue.getRight().atZone(ZoneId.systemDefault()).toEpochSecond()); String splitQuery = splits.get(i).getSplitQuery(); - Assert.assertEquals(expectedQuery, splitQuery); + Assertions.assertEquals(expectedQuery, splitQuery); } } @@ -191,19 +195,19 @@ public void testSplitWithPartitionColumnAndWithBoundStringColumn() throws Except ClickhouseSourceConfig sourceConfig = getSourceConfig("name", null, null, 1); List splits = splitter.generateSplits(sourceConfig, catalogTable); - Assert.assertEquals(1, splits.size()); + Assertions.assertEquals(1, splits.size()); String expectedQuery = String.format( "SELECT * FROM (%s) st_clickhouse_splitter WHERE xxHash32(coalesce(`name`, '')) %% 1 = 0", QUERY_SQL); - Assert.assertEquals(expectedQuery, splits.get(0).getSplitQuery()); + Assertions.assertEquals(expectedQuery, splits.get(0).getSplitQuery()); // 3 partition test sourceConfig = getSourceConfig("name", null, null, 3); splits = splitter.generateSplits(sourceConfig, catalogTable); - Assert.assertEquals(3, splits.size()); + Assertions.assertEquals(3, splits.size()); List boundValues = ImmutableList.of(0, 1, 2); for (int i = 0; i < splits.size(); i++) { @@ -212,7 +216,7 @@ public void testSplitWithPartitionColumnAndWithBoundStringColumn() throws Except "SELECT * FROM (%s) st_clickhouse_splitter WHERE xxHash32(coalesce(`name`, '')) %% 3 = %s", QUERY_SQL, boundValues.get(i)); String splitQuery = splits.get(i).getSplitQuery(); - Assert.assertEquals(expectedQuery, splitQuery); + Assertions.assertEquals(expectedQuery, splitQuery); } } @@ -230,7 +234,7 @@ public void testSplitWithPartitionColumnAndWithoutBound() throws Exception { 
Mockito.doReturn(queryBoundValues).when(spySplitter).queryMinMax(sourceConfig, "id"); List splits = spySplitter.generateSplits(sourceConfig, catalogTable); - Assert.assertEquals(3, splits.size()); + Assertions.assertEquals(3, splits.size()); List> boundValues = ImmutableList.of(Pair.of(-3, 8), Pair.of(9, 20), Pair.of(21, 31)); @@ -241,7 +245,7 @@ public void testSplitWithPartitionColumnAndWithoutBound() throws Exception { "SELECT * FROM (%s) st_clickhouse_splitter WHERE id BETWEEN %s and %s", QUERY_SQL, boundValue.getLeft(), boundValue.getRight()); String splitQuery = splits.get(i).getSplitQuery(); - Assert.assertEquals(expectedQuery, splitQuery); + Assertions.assertEquals(expectedQuery, splitQuery); } } From a4da59bbcf405800268da0fad192f3134a3bca34 Mon Sep 17 00:00:00 2001 From: mrtisttt Date: Wed, 11 Jun 2025 04:02:56 +0800 Subject: [PATCH 4/4] [Feature][Connector-V2] Fix code specification --- .../clickhouse/state/ClickhouseSourceState.java | 4 +++- .../source/ClickhouseChunkSplitterTest.java | 13 ++++++------- 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/state/ClickhouseSourceState.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/state/ClickhouseSourceState.java index 3050436e7ee..82c862174b5 100644 --- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/state/ClickhouseSourceState.java +++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/state/ClickhouseSourceState.java @@ -19,4 +19,6 @@ import java.io.Serializable; -public class ClickhouseSourceState implements Serializable {} +public class ClickhouseSourceState implements Serializable { + private static final long serialVersionUID = 3721458963214587L; +} diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseChunkSplitterTest.java b/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseChunkSplitterTest.java index d7970eeb642..5e75d775b5f 100644 --- a/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseChunkSplitterTest.java +++ b/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseChunkSplitterTest.java @@ -32,14 +32,13 @@ import org.junit.jupiter.api.Test; import org.mockito.Mockito; -import com.google.common.collect.ImmutableList; - import java.math.BigDecimal; import java.time.LocalDate; import java.time.LocalDateTime; import java.time.ZoneId; import java.time.temporal.ChronoUnit; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.List; @@ -85,7 +84,7 @@ public void testSplitWithPartitionColumnAndWithBoundNumberColumn() throws Except Assertions.assertEquals(3, splits.size()); List> boundValues = - ImmutableList.of(Pair.of(1, 10), Pair.of(11, 20), Pair.of(21, 30)); + Arrays.asList(Pair.of(1, 10), Pair.of(11, 20), Pair.of(21, 30)); for (int i = 0; i < splits.size(); i++) { Pair boundValue = boundValues.get(i); expectedQuery = @@ -123,7 +122,7 @@ public void testSplitWithPartitionColumnAndWithBoundDateColumn() throws Exceptio Assertions.assertEquals(3, splits.size()); List> boundValues = - ImmutableList.of( + Arrays.asList( Pair.of(LocalDate.of(2025, 5, 1), LocalDate.of(2025, 5, 13)), Pair.of(LocalDate.of(2025, 5, 14), LocalDate.of(2025, 5, 26)), Pair.of(LocalDate.of(2025, 5, 27), LocalDate.of(2025, 6, 6))); @@ -166,7 +165,7 @@ public void testSplitWithPartitionColumnAndWithBoundDateTimeColumn() throws Exce Assertions.assertEquals(3, 
splits.size()); List> boundValues = - ImmutableList.of( + Arrays.asList( Pair.of( LocalDateTime.of(2025, 5, 1, 12, 30, 0), LocalDateTime.of(2025, 5, 13, 13, 30, 0)), @@ -209,7 +208,7 @@ public void testSplitWithPartitionColumnAndWithBoundStringColumn() throws Except splits = splitter.generateSplits(sourceConfig, catalogTable); Assertions.assertEquals(3, splits.size()); - List boundValues = ImmutableList.of(0, 1, 2); + List boundValues = Arrays.asList(0, 1, 2); for (int i = 0; i < splits.size(); i++) { expectedQuery = String.format( @@ -237,7 +236,7 @@ public void testSplitWithPartitionColumnAndWithoutBound() throws Exception { Assertions.assertEquals(3, splits.size()); List> boundValues = - ImmutableList.of(Pair.of(-3, 8), Pair.of(9, 20), Pair.of(21, 31)); + Arrays.asList(Pair.of(-3, 8), Pair.of(9, 20), Pair.of(21, 31)); for (int i = 0; i < splits.size(); i++) { Pair boundValue = boundValues.get(i); String expectedQuery =