[Improve][connector-clickhouse] Clickhouse support parallelism reading schema #9446
base: dev
Conversation
Pull Request Overview
This PR adds support for parallel schema reading in the ClickHouse connector by leveraging the table part files from the system.parts table. Key changes include:
- New configuration options (e.g., partition_list, filter_query, batch_size) and test cases to support parallel reading.
- Updates to the core proxy, splitter, enumerator, source reader, and associated state management for splitting and reading parts concurrently.
- Documentation updates explaining the new parallel reader features.
Reviewed Changes
Copilot reviewed 21 out of 21 changed files in this pull request and generated 1 comment.
| File | Description |
| --- | --- |
| seatunnel-e2e/connector-clickhouse-e2e/.../clickhouse_with_parallelism_read.conf | Added test configuration for parallel read demonstration |
| ClickhouseIT.java | Added new test methods and constants to verify parallel reading functionality |
| TablePartSplitterTest.java | Introduced tests for generating splits, including duplicate parts handling |
| ClickhouseValueReaderTest.java | Added tests to validate various batch reading scenarios |
| ClickhouseProxy.java | Implemented methods to retrieve part lists and query data per part |
| ClickhouseSourceState.java | Updated state object to include pending splits |
| TablePartSplitter.java | Created new splitting logic for ClickHouse parts |
| ClickhouseSourceSplitEnumerator.java | Added new split enumerator to support parallel split assignment |
| ClickhouseSourceSplit.java | Defined a split abstraction based on ClickHouse parts |
| ClickhouseValueReader.java | Modified value reader to iteratively process splits and update part offsets |
| ClickhouseSourceTable.java | Updated source table configuration to include new options |
| ClickhouseSourceReader.java | Refactored source reader to integrate parallelism mode with split queue management |
| ClickhouseSourceFactory.java | Enhanced factory to build source tables and incorporate new parallelism parameters |
| ClickhouseSource.java | Updated the connector interface to support parallel reading with new enumerator and reader |
| ClickhousePart.java | Introduced Comparable interface implementation (stubbed in current diff) |
| ClickhouseTable.java | Added getter for local database name |
| ClickhouseConnectorErrorCode.java | Added new error codes for part retrieval and query issues |
| ClickhouseSourceOptions.java | Defined new options: part_size, partition_list, batch_size, and filter_query |
| ClickhouseBaseOptions.java | Added table option to support table name configuration |
| docs/en/connector-v2/source/Clickhouse.md | Updated documentation with instructions and tips for parallel reading |
Comments suppressed due to low confidence (1)
seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhousePart.java:77
The `compareTo` method always returns 0, which effectively treats all instances as equal. Consider implementing a proper comparison (for example, based on the part name) or removing `Comparable` if natural ordering is not intended.

```java
public int compareTo(ClickhousePart o) { return 0; }
```
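A name-based natural ordering, as the reviewer suggests, might look like the following. This is a minimal sketch: the `name` field is an assumption, since the actual `ClickhousePart` fields are not visible in this diff.

```java
// Sketch of a ClickhousePart with a name-based natural ordering.
// The `name` field is an assumption; the real class may order on other fields.
class ClickhousePart implements Comparable<ClickhousePart> {
    private final String name;

    ClickhousePart(String name) {
        this.name = name;
    }

    public String getName() {
        return name;
    }

    @Override
    public int compareTo(ClickhousePart o) {
        // Compare by part name instead of always returning 0, so that sorted
        // collections and Collections.sort() order parts deterministically.
        return this.name.compareTo(o.name);
    }
}
```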
```java
"select name from system.parts where database = '%s' and table = '%s'",
database, table);

if (partitionList != null && !partitionList.isEmpty()) {
```
The SQL query in getPartList is built by directly concatenating the partition list values. Consider using a parameterized query or properly escaping input values to mitigate the risk of SQL injection.
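One way to reduce that risk is to escape single quotes and backslashes in each value before concatenating, as sketched below. The helper names `escape` and `buildPartListQuery` are hypothetical, not code from this PR; where the ClickHouse JDBC driver allows it, a `PreparedStatement` with bound parameters would be preferable.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper illustrating the reviewer's point; not code from this PR.
class PartListQueryBuilder {

    // Escape characters that are special inside a ClickHouse single-quoted string.
    static String escape(String value) {
        return value.replace("\\", "\\\\").replace("'", "\\'");
    }

    // Build the system.parts query with escaped database, table, and partition values.
    static String buildPartListQuery(String database, String table, List<String> partitionList) {
        StringBuilder sql = new StringBuilder(
                String.format(
                        "select name from system.parts where database = '%s' and table = '%s'",
                        escape(database), escape(table)));
        if (partitionList != null && !partitionList.isEmpty()) {
            String in = partitionList.stream()
                    .map(p -> "'" + escape(p) + "'")
                    .collect(Collectors.joining(", "));
            sql.append(" and partition in (").append(in).append(")");
        }
        return sql.toString();
    }
}
```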
| username | String | Yes | - | `ClickHouse` user username. |
| password | String | Yes | - | `ClickHouse` user password. |
| database | String | No | - | The `ClickHouse` database. |
| table | String | No | - | The `ClickHouse` table. If it is a distributed table, the cluster is obtained based on the table engine. If it is a local table, the cluster is built based on the input `host`. |
Suggested change:

```diff
-| table | String | NO | - | The `ClickHouse` table. If it is a distributed table, the cluster is obtained based on the table engine. If it is a local table, build the cluster based on the input `host` |
+| table_path | String | NO | - | The `ClickHouse` table. If it is a distributed table, the cluster is obtained based on the table engine. If it is a local table, build the cluster based on the input `host` |
```
Same as JDBC
If the `table_path` parameter is used instead, should the `database` parameter also be removed, so that both are uniformly represented by `table_path`?
Yes.
```java
@@ -211,6 +213,17 @@ public void testClickHouseWithMultiTableSink(TestContainer container) throws Exc
        }
    }

    @TestTemplate
    public void testClickhouseWithParallelismRead(TestContainer testContainer)
```
Could you add test cases to verify that `filter_query` and `partition_list` work properly?
Ok. I will add more test cases.
```java
String sql =
        String.format(
                "select * from %s.%s where %s limit %d, %d",
```
Is this implementation used because, for a single part, `limit m, n` can guarantee the order?
This implementation is designed to read parts in batches, to avoid pulling large amounts of data when reading in parallel. Each `ClickhousePart` object has an `offset` attribute that records how far into the current part reading has progressed, thereby ensuring the order of batch reading.
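As a rough illustration of that batching scheme (the class and method names below are assumptions, not the PR's actual code): each part keeps an offset, and every fetch builds a `limit offset, batch_size` query and then advances the offset. The `_part` virtual column is used here as the per-part filter condition.

```java
// Illustrative sketch of per-part batched reads; names are assumptions.
class PartBatchReader {
    private final String database;
    private final String table;
    private final String partName;
    private final int batchSize;
    private long offset; // rows already read from this part

    PartBatchReader(String database, String table, String partName, int batchSize) {
        this.database = database;
        this.table = table;
        this.partName = partName;
        this.batchSize = batchSize;
    }

    // Build the query for the next batch, then advance the recorded offset
    // so consecutive calls walk through the part in order.
    String nextBatchQuery() {
        String sql = String.format(
                "select * from %s.%s where _part = '%s' limit %d, %d",
                database, table, partName, offset, batchSize);
        offset += batchSize;
        return sql;
    }
}
```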
After reading the ClickHouse documentation, I found that ClickHouse supports a `LIMIT ... WITH TIES` clause, which ensures that rows with the same value in the `ORDER BY` field are returned in the same batch. Meanwhile, the table's `ORDER BY` field is used to define the sorting key when querying a part. Can this solution solve the problem?
I think it's good.
…er, add sql parallelism read strategy and fix other problem.
I have made the following updates:
Thanks for helping with the review!
```java
@@ -40,6 +40,13 @@ public class ClickhouseBaseOptions {
                    .noDefaultValue()
                    .withDescription("Clickhouse database name");

    /** Clickhouse database name */
    public static final Option<String> TABLE =
            Options.key("table")
```
Suggested change:

```diff
-            Options.key("table")
+            Options.key("table_path")
```
Purpose of this pull request
Add support for a parallelism reading scheme in the ClickHouse connector.

Related PR: #9421

The ClickHouse source connector supports parallel reading of data. For query-table mode, parallel reading is implemented based on the part files of the table, which are obtained from the `system.parts` table.

The `partition_list` and `filter_query` parameters are used to filter data. The `batch_size` parameter is used to control the amount of data read in each batch, to avoid OOM exceptions.

Does this PR introduce any user-facing change?
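The new source options are the user-facing change. A source block using them might look like the following sketch; the option names come from this PR's description, while the host, credentials, table, and filter values are placeholders:

```hocon
source {
  Clickhouse {
    host = "localhost:8123"
    database = "default"
    table = "t_order"
    username = "default"
    password = ""
    # Restrict which partitions are read in parallel (placeholder values)
    partition_list = ["2024-01", "2024-02"]
    # Extra row-level filter pushed into the per-part queries
    filter_query = "status = 1"
    # Rows fetched per batch from each part, to avoid OOM
    batch_size = 1024
  }
}
```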
How was this patch tested?
Check list
New License Guide