Skip to content

[Feature][Connector-V2] ClickHouse source support parallelism #9421

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 115 additions & 4 deletions docs/en/connector-v2/source/Clickhouse.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ import ChangeLog from '../changelog/connector-clickhouse.md';
- [ ] [stream](../../concept/connector-v2-features.md)
- [ ] [exactly-once](../../concept/connector-v2-features.md)
- [x] [column projection](../../concept/connector-v2-features.md)
- [ ] [parallelism](../../concept/connector-v2-features.md)
- [ ] [support user-defined split](../../concept/connector-v2-features.md)
- [x] [parallelism](../../concept/connector-v2-features.md)
- [x] [support user-defined split](../../concept/connector-v2-features.md)

> supports query SQL and can achieve projection effect.

Expand Down Expand Up @@ -60,6 +60,10 @@ They can be downloaded via install-plugin.sh or from the Maven central repositor
| password | String | Yes | - | `ClickHouse` user password. |
| clickhouse.config | Map | No | - | In addition to the above mandatory parameters that must be specified by `clickhouse-jdbc` , users can also specify multiple optional parameters, which cover all the [parameters](https://github.com/ClickHouse/clickhouse-jdbc/tree/master/clickhouse-client#configuration) provided by `clickhouse-jdbc`. |
| server_time_zone | String | No | ZoneId.systemDefault() | The session time zone in database server. If not set, then ZoneId.systemDefault() is used to determine the server time zone. |
| partition_column | String | No | | When performing parallel reads on a source table, the split field currently supports numeric, date, time, and string types. If left unspecified, data reads will default to using only one split, meaning parallel reads will not be enabled. In this case, all configurations related to parallel reads will not take effect. |
| partition_num | Integer | No | 10 | The number of splits when performing parallel reads on a source table. |
| partition_lower_bound | String | No | | The lower bound value for splitting during parallel reads. Depending on the splitting field's data type, enter the corresponding value. The splitting algorithm uses this as the lower limit of the splitting range. If `partition_upper_bound` is not specified, this parameter will be ignored. |
| partition_upper_bound | String | No | | The upper bound value for splitting during parallel reads. Depending on the sharding field's data type, enter the corresponding value. The splitting algorithm uses this as the upper limit of the splitting range. If `partition_lower_bound` is not specified, this parameter will be ignored. |
| common-options | | No | - | Source plugin common parameters, please refer to [Source Common Options](../source-common-options.md) for details. |

## How to Create a Clickhouse Data Synchronization Jobs
Expand Down Expand Up @@ -96,10 +100,117 @@ sink {
}
```

### Tips

> Tips
>
> 1.[SeaTunnel Deployment Document](../../start-v2/locally/deployment.md).

## Analysis of Key Features

### Parallel Reading

#### Splitting Algorithm

Parallel reading shard splitting strategy, mainly divided into two categories according to the type of partition field:

**1.Numeric types**

Numeric types include pure numeric types and date types:

(1) Pure numeric types

Calculate the partition size based on the lower and upper bounds, and split according to the number of partitions (the last partition may be smaller than the partition size).

(2) Time types

Time types mainly include two categories: Date and DateTime. Regardless of the category, they will first be converted to their numerical values, and then the splitting algorithm is the same as that for pure numeric types. After splitting into partitions, if the field is of type Date, ClickHouse's toDate() function will be used to convert the partition values. If it is of type DateTime, the toDateTime64() function will be used instead.

> Regardless of whether it is a pure numeric type or a time type, if the lower or upper bound is not specified, the database will be requested to obtain the maximum and minimum values.

**2.String types**
For strings, specifying upper and lower bounds is invalid. The splitting algorithm will take the modulus of the partition field according to the number of partitions to split the data.



After splitting the data using the split algorithm described above, the corresponding splits are evenly distributed to Readers with the specified parallelism. This enables parallel reads from the ClickHouse data table, significantly enhancing the efficiency of data retrieval from ClickHouse.

#### Configuration Examples

**1.Pure numeric types**

```
source {
Clickhouse {
host = "clickhouse:8123"
database = "default"
sql = "select * from parallel_source_table"
username = "default"
password = ""
plugin_output = "parallel_source_table"
partition_column = "id"
partition_num = 3
# partition_lower_bound = 1
# partition_upper_bound = 10
}
}
```

**2.Time types**

Date type:

```
source {
Clickhouse {
host = "clickhouse:8123"
database = "default"
sql = "select * from parallel_source_table"
username = "default"
password = ""
plugin_output = "parallel_source_table"
partition_column = "enrollment_date"
partition_num = 3
# partition_lower_bound = "2024-05-20"
# partition_upper_bound = "2024-06-20"
}
}
```

DateTime type:

```
source {
Clickhouse {
host = "clickhouse:8123"
database = "default"
sql = "select * from parallel_source_table"
username = "default"
password = ""
plugin_output = "parallel_source_table"
partition_column = "date"
partition_num = 3
# partition_lower_bound = "2024-05-20 08:30:00"
# partition_upper_bound = "2024-06-19 13:30:00"
}
}
```

**3.String types**

```
source {
Clickhouse {
host = "clickhouse:8123"
database = "default"
sql = "select * from parallel_source_table"
username = "default"
password = ""
plugin_output = "parallel_source_table"
partition_column = "email"
partition_num = 3
}
}
```

## Changelog

<ChangeLog />
120 changes: 116 additions & 4 deletions docs/zh/connector-v2/source/Clickhouse.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ import ChangeLog from '../changelog/connector-clickhouse.md';
- [ ] [流处理](../../concept/connector-v2-features.md)
- [ ] [精确一次](../../concept/connector-v2-features.md)
- [x] [列映射](../../concept/connector-v2-features.md)
- [ ] [并行度](../../concept/connector-v2-features.md)
- [ ] [支持用户自定义拆分](../../concept/connector-v2-features.md)
- [x] [并行度](../../concept/connector-v2-features.md)
- [x] [支持用户自定义拆分](../../concept/connector-v2-features.md)

> 支持查询SQL,可以实现投影效果。

Expand Down Expand Up @@ -59,6 +59,10 @@ import ChangeLog from '../changelog/connector-clickhouse.md';
| password | String | 是 | - | `ClickHouse` user 用户密码. |
| clickhouse.config | Map | 否 | - | 除了上述必须由 `clickhouse-jdbc` 指定的必填参数外,用户还可以指定多个可选参数,这些参数涵盖了 `clickhouse-jdbc` 提供的所有[参数](https://github.com/ClickHouse/clickhouse-jdbc/tree/master/clickhouse-client#configuration). |
| server_time_zone | String | 否 | ZoneId.systemDefault() | 数据库服务中的会话时区。如果未设置,则使用ZoneId.systemDefault()设置服务时区. |
| partition_column | String | 否 | | 并行读取数据表时的分片字段,目前支持数字、日期、时间和字符串类型,如果不填写,则数据读取默认只有1个分片,即不进行并行读取,此时跟并行读取的相关配置都不会生效 |
| partition_num | Integer | 否 | 10 | 并行读取数据表时的分片数量 |
| partition_lower_bound | String | 否 | | 并行读取进行分片时的下限值,根据分片字段填入对应数据类型,分片算法会以此作为分片的下限范围,如果partition_upper_bound没有填写,则不生效 |
| partition_upper_bound | String | 否 | | 并行读取进行分片时的上限值,根据分片字段填入对应数据类型,分片算法会以此作为分片的上限范围,如果partition_lower_bound没有填写,则不生效 |
| common-options | | 否 | - | 源插件常用参数,详见 [源通用选项](../source-common-options.md). |

## 如何创建Clickhouse数据同步作业
Expand Down Expand Up @@ -96,10 +100,118 @@ sink {
}
```

### 小提示

> 小提示
>
> 1.[SeaTunnel 部署文档](../../start-v2/locally/deployment.md).

## 关键特性解析

### 并行读取

#### 分片算法

并行读取分片切分策略,根据分区字段的类型不同,主要分为两大类:

**1.数字类**

数字类又包含纯数字类和日期类:

(1)纯数字类

基于下限和上限,计算出分区大小,并根据分区数进行切分(最后一个分区可能会小于分区大小)。

(2)时间类

时间类在主要是包括Date和DateTime两大类,但不管哪一类,都会先转换为其数值大小,然后切分算法跟纯数字类一样,切分出分区之后,如果字段是Date类型,则会使用ClickHouse的`toDate()`函数将分区数值进行转换,而如果是DateTime类型,则会使用ClickHouse的`toDateTime64()`函数将分区数值进行转换

> 无论是纯数值类型还是时间类型,如果未指定下限或上限,将会请求数据库获取最大值和最小值。

**2.字符串类**

对于字符串,指定上下限无效,切分算法会对分区字段根据分区数进行取模,以切分数据。



在根据以上分片算法对数据进行分片之后,相应分片会平均发送给对应并行度的Reader,进而实现并行读取ClickHouse数据表,从而极大提高ClickHouse数据读取效率。

#### 配置案例

**1.纯数字类**

```
source {
Clickhouse {
host = "clickhouse:8123"
database = "default"
sql = "select * from parallel_source_table"
username = "default"
password = ""
plugin_output = "parallel_source_table"
partition_column = "id"
partition_num = 3
# partition_lower_bound = 1
# partition_upper_bound = 10
}
}
```

**2.时间类**

Date类:

```
source {
Clickhouse {
host = "clickhouse:8123"
database = "default"
sql = "select * from parallel_source_table"
username = "default"
password = ""
plugin_output = "parallel_source_table"
partition_column = "enrollment_date"
partition_num = 3
# partition_lower_bound = "2024-05-20"
# partition_upper_bound = "2024-06-20"
}
}
```

DateTime类:

```
source {
Clickhouse {
host = "clickhouse:8123"
database = "default"
sql = "select * from parallel_source_table"
username = "default"
password = ""
plugin_output = "parallel_source_table"
partition_column = "date"
partition_num = 3
# partition_lower_bound = "2024-05-20 08:30:00"
# partition_upper_bound = "2024-06-19 13:30:00"
}
}
```

**3.字符串类**

```
source {
Clickhouse {
host = "clickhouse:8123"
database = "default"
sql = "select * from parallel_source_table"
username = "default"
password = ""
plugin_output = "parallel_source_table"
partition_column = "email"
partition_num = 3
}
}
```

## 变更日志

<ChangeLog />
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.clickhouse.config;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;

import com.clickhouse.client.ClickHouseNode;
import lombok.Builder;
import lombok.Data;

import java.io.Serializable;
import java.util.List;

@Data
@Builder(builderClassName = "Builder")
public class ClickhouseSourceConfig implements Serializable {

private String serverTimeZone;
private List<ClickHouseNode> nodes;
private String sql;
private String partitionColumn;
private String partitionUpperBound;
private String partitionLowerBound;
private Integer partitionNum;

public static ClickhouseSourceConfig of(ReadonlyConfig config) {
Builder builder = ClickhouseSourceConfig.builder();

builder.serverTimeZone(config.get(ClickhouseBaseOptions.SERVER_TIME_ZONE));
builder.sql(config.get(ClickhouseSourceOptions.SQL));
builder.partitionColumn(config.get(ClickhouseSourceOptions.PARTITION_COLUMN));
builder.partitionUpperBound(config.get(ClickhouseSourceOptions.PARTITION_UPPER_BOUND));
builder.partitionLowerBound(config.get(ClickhouseSourceOptions.PARTITION_LOWER_BOUND));
builder.partitionNum(config.get(ClickhouseSourceOptions.PARTITION_NUM));

return builder.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,35 @@
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;

public class ClickhouseSourceOptions {
public interface ClickhouseSourceOptions {

public static final Option<String> SQL =
Option<String> SQL =
Options.key("sql")
.stringType()
.noDefaultValue()
.withDescription("Clickhouse sql used to query data");

Option<String> PARTITION_COLUMN =
Options.key("partition_column")
.stringType()
.noDefaultValue()
.withDescription("partition column");

Option<Integer> PARTITION_NUM =
Options.key("partition_num")
.intType()
.defaultValue(10)
.withDescription("partition num");

Option<String> PARTITION_LOWER_BOUND =
Options.key("partition_lower_bound")
.stringType()
.noDefaultValue()
.withDescription("partition lower bound");

Option<String> PARTITION_UPPER_BOUND =
Options.key("partition_upper_bound")
.stringType()
.noDefaultValue()
.withDescription("partition upper bound");
}
Loading