[Feature][Connector-V2] ClickHouse source support parallelism #9421
Conversation
Pull Request Overview
This PR adds support for parallel reading to the ClickHouse V2 connector, enabling users to shard source queries and improve read throughput.
- Introduces split enumeration and reader logic to dispatch multiple query “splits” to parallel subtasks
- Adds unit tests (ClickhouseChunkSplitterTest) and E2E scenarios (ClickhouseIT) covering numeric, date/time, and string partitioning
- Updates connector code (ClickhouseSourceConfig, factory, reader, enumerator) and configuration docs to expose partition_column, partition_num, and optional bounds (see the example config sketch below)
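For illustration, a minimal batch job wiring up the new options might look like the sketch below. The option names (partition_column, partition_num, partition_lower_bound, partition_upper_bound) come from this PR's docs; the host, table, SQL, and bound values are placeholders, not taken from the PR.

```hocon
env {
  parallelism = 4
  job.mode = "BATCH"
}

source {
  Clickhouse {
    host = "localhost:8123"
    database = "default"
    sql = "select id, name from t_source"
    # Options introduced by this PR:
    partition_column = "id"        # column used to shard the source query
    partition_num = 4              # number of splits to generate
    partition_lower_bound = 1      # optional lower bound of the shard range
    partition_upper_bound = 100000 # optional upper bound of the shard range
  }
}

sink {
  Console {}
}
```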
Reviewed Changes
Copilot reviewed 20 out of 20 changed files in this pull request and generated 3 comments.
File | Description |
---|---|
docs/zh/connector-v2/source/Clickhouse.md, docs/en/connector-v2/source/Clickhouse.md | Documented new partition_column, partition_num, partition_lower_bound, partition_upper_bound options |
seatunnel-e2e/.../connector-clickhouse-e2e/.../parallel_read/*.conf | Added example configs for string, numeric, date, datetime, and single-shard batch tests |
seatunnel-e2e/.../connector-clickhouse-e2e/.../ClickhouseIT.java | Extended integration tests to validate parallel reads |
seatunnel-connectors-v2/.../ClickhouseChunkSplitterTest.java | New unit tests for split generation logic |
seatunnel-connectors-v2/connector-clickhouse/src/main/java/.../ClickhouseSourceConfig.java | Added builder-backed config object for parallel options |
seatunnel-connectors-v2/connector-clickhouse/src/main/java/.../ClickhouseSourceOptions.java | Defined new connector options interface including partitioning keys |
seatunnel-connectors-v2/connector-clickhouse/src/main/java/.../ClickhouseSourceFactory.java | Hooked up new ClickhouseSourceConfig in factory |
seatunnel-connectors-v2/connector-clickhouse/src/main/java/.../ClickhouseSource.java | Implemented SeaTunnelSource and SupportParallelism interfaces |
seatunnel-connectors-v2/connector-clickhouse/src/main/java/.../ClickhouseSourceSplitEnumerator.java | Added logic to generate and assign splits to readers |
seatunnel-connectors-v2/connector-clickhouse/src/main/java/.../ClickhouseSourceReader.java | Replaced single-split reader with split-aware SourceReader implementation |
seatunnel-connectors-v2/connector-clickhouse/src/main/java/.../ClickHouseSourceSplit.java | Introduced SourceSplit data holder |
seatunnel-connectors-v2/connector-clickhouse/src/main/java/.../ClickhouseNumericBetweenParametersProvider.java | Helper for numeric/date range parameter generation |
seatunnel-connectors-v2/connector-clickhouse/src/main/java/.../ClickhouseChunkSplitter.java | Core split-generation logic handling numeric, date/time, and string columns |
seatunnel-connectors-v2/connector-clickhouse/src/main/java/.../ClickhouseSourceState.java | Added serialVersionUID for state snapshot |
Comments suppressed due to low confidence (2)
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseIT.java:238
- The test labeled testClickHouseWithParallelReadDateTimeCol is executing the date-based config instead of the datetime config. Update the path to clickhouse_to_clickhouse_with_parallel_read_datetime.conf.

```java
container.executeJob("/parallel_read/clickhouse_to_clickhouse_with_parallel_read_date.conf");
```
seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceReader.java:46
- The Context type is not imported or fully qualified. It should be SourceReader.Context, or add import org.apache.seatunnel.api.source.SourceReader.Context; to resolve compilation errors.

```java
private final Context context;
```
```java
private static int getSplitOwner(String splitId, int numReaders) {
    return splitId.hashCode() % numReaders;
}
```
Using hashCode() % numReaders can yield negative indices when hashCode() is negative. Consider using Math.floorMod(splitId.hashCode(), numReaders) to ensure non-negative reader assignment.

```diff
- return splitId.hashCode() % numReaders;
+ return Math.floorMod(splitId.hashCode(), numReaders);
```
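A standalone sketch of why this matters (the split id here is a contrived example chosen because its hash is Integer.MIN_VALUE; this is not connector code):

```java
public class SplitOwnerDemo {
    public static void main(String[] args) {
        // "polygenelubricants" is a well-known string whose hashCode() is Integer.MIN_VALUE.
        String splitId = "polygenelubricants";
        int numReaders = 3;

        // Raw modulo keeps the dividend's sign, yielding an invalid (negative) reader index.
        System.out.println(splitId.hashCode() % numReaders);               // -2

        // Math.floorMod always returns a value in [0, numReaders).
        System.out.println(Math.floorMod(splitId.hashCode(), numReaders)); // 1
    }
}
```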
```java
public void close() throws IOException {}

@Override
public void addSplitsBack(List<ClickHouseSourceSplit> splits, int subtaskId) {}
```
The addSplitsBack method is left empty, so splits returned on failure won't be reassigned. Implement logic to re-add these splits to pendingSplits to support recovery.

```diff
- public void addSplitsBack(List<ClickHouseSourceSplit> splits, int subtaskId) {}
+ public void addSplitsBack(List<ClickHouseSourceSplit> splits, int subtaskId) {
+     synchronized (stateLock) {
+         log.info("Adding splits back for subtask {}: {}", subtaskId, splits);
+         pendingSplits.computeIfAbsent(subtaskId, k -> new ArrayList<>()).addAll(splits);
+     }
+ }
```
```java
@Override
public int currentUnassignedSplitSize() {
    return 0;
}
```
currentUnassignedSplitSize always returns 0; consider returning the total count of unassigned splits (e.g., the sum of list sizes in pendingSplits) to accurately report the backlog.

```diff
- return 0;
+ synchronized (stateLock) {
+     return pendingSplits.values().stream()
+             .mapToInt(List::size)
+             .sum();
+ }
```
I notice you use the chunk splitter from JDBC. It's not a bad choice, but maybe use DynamicChunkSplitter instead (Line 58 in f4178c7):

```java
public class DynamicChunkSplitter extends ChunkSplitter {
```
Hi @Hisoka-X, thank you sincerely for your review. Certainly, the splitter in ClickHouse refers to the implementation of JDBC's FixedChunkSplitter, and I have indeed noticed the DynamicChunkSplitter.
@Hisoka-X Hi, would it be appropriate for me to submit a PR to support the DynamicChunkSplitter in the upcoming period?
Yes, please use DynamicChunkSplitter to reduce unnecessary configuration. In fact, FixedChunkSplitter is a legacy implementation in JDBC and is not recommended.
@Hisoka-X Since the ClickHouse source connector still uses SQL queries to read source tables and there are still many imperfections, I will also implement further changes in subsequent work to better support DynamicChunkSplitter. This also prepares for the subsequent implementation of multi-table reading. At the same time, perhaps in the future we can remove the option of using SQL queries to read source data tables. How about this idea?
+1 for this; this way can better identify the metadata of the table, thereby improving read performance.
That way is not necessary at present, since the filter conditions and field selection can actually be expressed in the query. Moreover, a query can preprocess data in ways other approaches cannot, such as joins. So let's support query together with table_path.
@Hisoka-X I have considered this model, but there is a problem: does the approach of
@Hisoka-X Hi, I have thought of a reading method. For the query_table case, all the parts are obtained through the system table (system.parts), and then the part data is read directly from the table, similar to how Doris reads tablets. For filtering, it can be carried out through the partition list. This approach is similar to Doris's implementation and does not require writing SQL to achieve concurrent reading of data tables. Is this a feasible plan? At present, I have implemented the above-mentioned functionality and tested it in practical applications.
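A rough, hypothetical sketch of that enumeration step (the JDBC URL and the class/method names are placeholders, not the actual implementation): active parts are listed from system.parts, and each part could then be read back as its own split by filtering on the _part virtual column.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: enumerate active data parts of a MergeTree table so each
// part can become one source split, similar to how Doris enumerates tablets.
public class ClickhousePartEnumerationSketch {
    public static List<String> listActiveParts(String jdbcUrl, String database, String table)
            throws Exception {
        List<String> partNames = new ArrayList<>();
        String sql = "SELECT name FROM system.parts "
                + "WHERE database = ? AND table = ? AND active = 1";
        try (Connection conn = DriverManager.getConnection(jdbcUrl);
             PreparedStatement ps = conn.prepareStatement(sql)) {
            ps.setString(1, database);
            ps.setString(2, table);
            try (ResultSet rs = ps.executeQuery()) {
                while (rs.next()) {
                    // Each part name can later drive a per-split query such as
                    // "SELECT ... FROM <table> WHERE _part = '<name>'".
                    partNames.add(rs.getString("name"));
                }
            }
        }
        return partNames;
    }
}
```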
Good idea. Does it support distributed tables?
@JeremyXin Yes, it's a good way for query_table.
Welcome to contribute!
@mrtisttt We have a processing priority. If query and table_path are configured at the same time, we get data through query and get meta information through table_path. Please refer to Lines 175 to 199 in 8a9e8ad.
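For context, a hedged sketch of that priority rule as it appears in a SeaTunnel JDBC source config (the URL, driver, and table names are placeholders; the ClickHouse connector could mirror the same behavior):

```hocon
source {
  Jdbc {
    url = "jdbc:mysql://localhost:3306/test"
    driver = "com.mysql.cj.jdbc.Driver"
    # When both are set: data is read through `query`,
    # while `table_path` supplies the metadata used to optimize splitting.
    query = "select id, name from t_source where id > 100"
    table_path = "test.t_source"
  }
}
```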
Yes, for distributed tables, reading is actually still performed on the local tables. The parts under each shard will be obtained and read concurrently.
Yes, I've also thought about this issue. In ClickHouse, distributed tables pose a significant challenge, which is exactly why I mentioned the strong dependency on
@Hisoka-X @Carl-Zhou-CN @JeremyXin So, how about we first implement the query_table approach? Or should we first implement the query-based DynamicChunkSplitter following the ClickHouse source connector's pattern?
+1. Overall,
Looking forward to your submission. I think the two are not in conflict, and users can have more choices.
Okay, I understand. Then I'll submit the implementation of DynamicChunkSplitter first. This will indeed provide users with an additional option.
Yes please, query can cover many more scenarios than table_path. We can also use table_path's ability to read metadata to optimize query read sharding, which is also done in JDBC.
+1: In some scenarios, SQL can only be executed on distributed tables and cannot be done locally.
OK, that makes the thinking very clear.
Purpose of this pull request
Closes #9338
Does this PR introduce any user-facing change?
Yes, it provides a new feature that enables parallel reading support for ClickHouse.
How was this patch tested?
Added connector unit tests and e2e tests.