[Improve][connector-clickhouse] Clickhouse support parallelism reading schema #9446


Open: wants to merge 4 commits into base `dev`

Conversation

JeremyXin
Contributor

Purpose of this pull request

ClickHouse supports parallel schema reading.
Related PR: #9421

The ClickHouse source connector supports parallel reading of data. In table query mode, parallel reading is implemented based on the table's part files, which are obtained from the system.parts table.
The partition_list and filter_query parameters are used to filter data.
The batch_size parameter controls the amount of data read in each batch, to avoid OOM exceptions.
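A minimal source configuration exercising these options might look like the following sketch (host, credentials, and values are illustrative; the option names are those described in this PR):

```hocon
source {
  Clickhouse {
    host = "localhost:8123"
    database = "default"
    table = "t_user"
    username = "default"
    password = ""
    partition_list = ["2024-01", "2024-02"]  # only read parts from these partitions
    filter_query = "age > 18"                # row filter applied while reading
    batch_size = 1024                        # rows fetched per batch to avoid OOM
  }
}
```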

Does this PR introduce any user-facing change?

How was this patch tested?

Check list


JeremyXin added 2 commits June 16, 2025 20:18
@nielifeng nielifeng requested a review from Copilot June 17, 2025 01:56

@Copilot Copilot AI left a comment


Pull Request Overview

This PR adds support for parallel schema reading in the ClickHouse connector by leveraging the table part files from the system.parts table. Key changes include:

  • New configuration options (e.g., partition_list, filter_query, batch_size) and test cases to support parallel reading.
  • Updates to the core proxy, splitter, enumerator, source reader, and associated state management for splitting and reading parts concurrently.
  • Documentation updates explaining the new parallel reader features.

Reviewed Changes

Copilot reviewed 21 out of 21 changed files in this pull request and generated 1 comment.

Show a summary per file
File Description
seatunnel-e2e/connector-clickhouse-e2e/.../clickhouse_with_parallelism_read.conf Added test configuration for parallel read demonstration
ClickhouseIT.java Added new test methods and constants to verify parallel reading functionality
TablePartSplitterTest.java Introduced tests for generating splits, including duplicate parts handling
ClickhouseValueReaderTest.java Added tests to validate various batch reading scenarios
ClickhouseProxy.java Implemented methods to retrieve part lists and query data per part
ClickhouseSourceState.java Updated state object to include pending splits
TablePartSplitter.java Created new splitting logic for ClickHouse parts
ClickhouseSourceSplitEnumerator.java Added new split enumerator to support parallel splits assignment
ClickhouseSourceSplit.java Defined a split abstraction based on ClickHouse parts
ClickhouseValueReader.java Modified value reader to iteratively process splits and update part offsets
ClickhouseSourceTable.java Updated source table configuration to include new options
ClickhouseSourceReader.java Refactored source reader to integrate parallelism mode with split queue management
ClickhouseSourceFactory.java Enhanced factory to build source tables and incorporate new parallelism parameters
ClickhouseSource.java Updated the connector interface to support parallel reading with new enumerator and reader
ClickhousePart.java Introduced Comparable interface implementation (stubbed in current diff)
ClickhouseTable.java Added getter for local database name
ClickhouseConnectorErrorCode.java Added new error codes for part retrieval and query issues
ClickhouseSourceOptions.java Defined new options: part_size, partition_list, batch_size, and filter_query
ClickhouseBaseOptions.java Added table option to support table name configuration
docs/en/connector-v2/source/Clickhouse.md Updated documentation with instructions and tips for parallel reading
Comments suppressed due to low confidence (1)

seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhousePart.java:77

  • The compareTo method always returns 0, which effectively treats all instances as equal. Consider implementing a proper comparison (for example, based on the part name) or removing Comparable if natural ordering is not intended.
public int compareTo(ClickhousePart o) { return 0; }
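One way to address this, assuming ordering by part name is acceptable (the `name` field is an assumption about the class, not confirmed by the diff):

```java
// Sketch: give ClickhousePart a natural ordering based on the part name,
// instead of a compareTo that always returns 0.
class ClickhousePart implements Comparable<ClickhousePart> {
    private final String name;

    ClickhousePart(String name) {
        this.name = name;
    }

    String getName() {
        return name;
    }

    @Override
    public int compareTo(ClickhousePart o) {
        // Compare by part name so sorted collections and sorting
        // utilities order parts deterministically.
        return this.name.compareTo(o.name);
    }
}
```

Alternatively, if no natural ordering is intended, dropping `Comparable` entirely avoids the misleading contract.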

"select name from system.parts where database = '%s' and table = '%s'",
database, table);

if (partitionList != null && !partitionList.isEmpty()) {

Copilot AI Jun 17, 2025


The SQL query in getPartList is built by directly concatenating the partition list values. Consider using a parameterized query or properly escaping input values to mitigate the risk of SQL injection.
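A minimal way to reduce this risk is to escape single quotes (and backslashes) in each value before interpolating it into the query. The sketch below is illustrative; the class and method names are hypothetical, not part of the PR:

```java
import java.util.List;
import java.util.stream.Collectors;

// Sketch: build the system.parts lookup with escaped string literals,
// so a partition value cannot break out of its quoted literal.
class PartQueryBuilder {

    // Escape backslashes first, then single quotes, per ClickHouse
    // string-literal escaping rules.
    static String escape(String value) {
        return value.replace("\\", "\\\\").replace("'", "\\'");
    }

    static String buildPartQuery(String database, String table, List<String> partitionList) {
        String sql = String.format(
                "select name from system.parts where database = '%s' and table = '%s'",
                escape(database), escape(table));
        if (partitionList != null && !partitionList.isEmpty()) {
            String in = partitionList.stream()
                    .map(p -> "'" + escape(p) + "'")
                    .collect(Collectors.joining(", "));
            sql += " and partition in (" + in + ")";
        }
        return sql;
    }
}
```

A JDBC `PreparedStatement` with `?` placeholders would be stronger still, where the driver supports it for this query shape.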


| username | String | Yes | - | `ClickHouse` user username. |
| password | String | Yes | - | `ClickHouse` user password. |
| database | String | NO | - | The `ClickHouse` database. |
| table | String | NO | - | The `ClickHouse` table. If it is a distributed table, the cluster is obtained based on the table engine. If it is a local table, build the cluster based on the input `host` |
Member


Suggested change
| table | String | NO | - | The `ClickHouse` table. If it is a distributed table, the cluster is obtained based on the table engine. If it is a local table, build the cluster based on the input `host` |
| table_path | String | NO | - | The `ClickHouse` table. If it is a distributed table, the cluster is obtained based on the table engine. If it is a local table, build the cluster based on the input `host` |

Same as JDBC

Contributor Author


If the table_path parameter is used instead, should the database parameter also be removed, with both represented uniformly by table_path?

Member


Yes.

@@ -211,6 +213,17 @@ public void testClickHouseWithMultiTableSink(TestContainer container) throws Exc
}
}

@TestTemplate
public void testClickhouseWithParallelismRead(TestContainer testContainer)
Member


Could you add test cases to verify that filter_query and partition_list work properly?

Contributor Author


Ok. I will add more test cases.


String sql =
String.format(
"select * from %s.%s where %s limit %d, %d",
Member


Is this implemented this way because, for a single part, `limit m, n` can guarantee the order?

Contributor Author


This implementation reads each part in batches, to avoid loading large amounts of data during parallel reads. Each ClickhousePart object has an offset attribute recording how far into the current part reading has progressed, which keeps the batch reads ordered.
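Concretely, batched reading of one part amounts to a sequence of queries of this shape, with the recorded offset advancing by batch_size each time (the table name, part name, and `_part` predicate below are illustrative, not taken from the diff):

```sql
-- First batch for one part (offset 0, batch_size 1024)
select * from default.t_user where _part = 'all_1_1_0' limit 0, 1024;

-- Next batch: the reader advances this part's offset by batch_size
select * from default.t_user where _part = 'all_1_1_0' limit 1024, 1024;
```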

Member


[screenshot omitted] What I mean is: without an explicit sort, is it guaranteed that `limit m, n` won't return duplicate rows across batches?

Contributor Author


After reading the ClickHouse documentation, I found that ClickHouse supports a LIMIT ... WITH TIES form, which ensures that rows sharing the same value in the ORDER BY fields are returned in the same batch. Meanwhile, the table's ORDER BY fields can be used as the sorting key when querying a part. Can this solution solve the problem?
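For illustration, a query of the kind described might look like this (the table, part name, and sorting key are hypothetical; WITH TIES requires an ORDER BY clause):

```sql
-- Rows tied with the last row on the ORDER BY key are included in the
-- same batch, so equal-key rows are never split across two batches.
select * from default.t_user
where _part = 'all_1_1_0'
order by id
limit 1024 with ties;
```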

Member


I think it's good.

…er, add sql parallelism read strategy and fix other problem.
@JeremyXin
Contributor Author

I have made the following updates:

  1. Added new e2e test cases.
  2. Added the table_path parameter and made the corresponding configuration changes (including the e2e configuration).
  3. Added a sql parallelism read strategy: if the sql parameter is specified, parallel reading is implemented by executing local table-based queries on each shard of the cluster in parallel.
  4. Fixed the data duplication issue that could be caused by limit.
  5. Other newly added code and optimizations.

Thanks for helping with the review!
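Item 3 above suggests a configuration along these lines (a sketch only; the shard hosts and table names are illustrative, and the sql statement targets the local table so it can run on each shard in parallel):

```hocon
source {
  Clickhouse {
    host = "shard1:8123,shard2:8123"
    username = "default"
    password = ""
    sql = "select id, name from default.t_user_local where age > 18"
  }
}
```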

@@ -40,6 +40,13 @@ public class ClickhouseBaseOptions {
.noDefaultValue()
.withDescription("Clickhouse database name");

/** Clickhouse table name */
public static final Option<String> TABLE =
Options.key("table")
Member


Suggested change
Options.key("table")
Options.key("table_path")
