
[Feature][Connector-V2] ClickHouse source support parallelism #9421


Open · wants to merge 4 commits into base: dev

Conversation

mrtisttt
Contributor

@mrtisttt mrtisttt commented Jun 10, 2025

Purpose of this pull request

closed #9338

Does this PR introduce any user-facing change?

Yes, it provides a new feature that enables parallel reading support for ClickHouse.

How was this patch tested?

Add connector unit tests and e2e tests.

Check list


mrtisttt added 3 commits June 11, 2025 03:23
@hailin0 hailin0 requested a review from Copilot June 11, 2025 01:44
Contributor

@Copilot Copilot AI left a comment


Pull Request Overview

This PR adds support for parallel reading to the ClickHouse V2 connector, enabling users to shard source queries and improve read throughput.

  • Introduces split enumeration and reader logic to dispatch multiple query “splits” to parallel subtasks
  • Adds unit tests (ClickhouseChunkSplitterTest) and E2E scenarios (ClickhouseIT) covering numeric, date/time, and string partitioning
  • Updates connector code (ClickhouseSourceConfig, factory, reader, enumerator) and configuration docs to expose partition_column, partition_num, and optional bounds
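
For orientation, here is a minimal sketch of how such options are typically declared with SeaTunnel's Options API. The option names come from this PR's description, but the type name, exact value types, defaults, and descriptions below are assumptions rather than the PR's actual ClickhouseSourceOptions code.

    import org.apache.seatunnel.api.configuration.Option;
    import org.apache.seatunnel.api.configuration.Options;

    // Hypothetical sketch of the new parallel-read options; not the PR's exact code.
    public interface ClickhouseParallelReadOptionsSketch {

        // Column used to shard the source query into parallel splits.
        Option<String> PARTITION_COLUMN =
                Options.key("partition_column")
                        .stringType()
                        .noDefaultValue()
                        .withDescription("Column used to partition the source query.");

        // Number of splits to generate for parallel reading.
        Option<Integer> PARTITION_NUM =
                Options.key("partition_num")
                        .intType()
                        .noDefaultValue()
                        .withDescription("Number of partitions (splits) to read in parallel.");

        // Optional bounds; if omitted, the connector could derive MIN/MAX itself.
        // Long is an assumption here; the real option may use a different type.
        Option<Long> PARTITION_LOWER_BOUND =
                Options.key("partition_lower_bound")
                        .longType()
                        .noDefaultValue()
                        .withDescription("Lower bound of the partition column.");

        Option<Long> PARTITION_UPPER_BOUND =
                Options.key("partition_upper_bound")
                        .longType()
                        .noDefaultValue()
                        .withDescription("Upper bound of the partition column.");
    }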

Reviewed Changes

Copilot reviewed 20 out of 20 changed files in this pull request and generated 3 comments.

Summary per file:

  • docs/zh/connector-v2/source/Clickhouse.md, docs/en/connector-v2/source/Clickhouse.md: Documented the new partition_column, partition_num, partition_lower_bound, and partition_upper_bound options
  • seatunnel-e2e/.../connector-clickhouse-e2e/.../parallel_read/*.conf: Added example configs for string, numeric, date, datetime, and single-shard batch tests
  • seatunnel-e2e/.../connector-clickhouse-e2e/.../ClickhouseIT.java: Extended integration tests to validate parallel reads
  • seatunnel-connectors-v2/.../ClickhouseChunkSplitterTest.java: New unit tests for the split-generation logic
  • seatunnel-connectors-v2/connector-clickhouse/src/main/java/.../ClickhouseSourceConfig.java: Added a builder-backed config object for the parallel-read options
  • seatunnel-connectors-v2/connector-clickhouse/src/main/java/.../ClickhouseSourceOptions.java: Defined the new connector options, including the partitioning keys
  • seatunnel-connectors-v2/connector-clickhouse/src/main/java/.../ClickhouseSourceFactory.java: Hooked up the new ClickhouseSourceConfig in the factory
  • seatunnel-connectors-v2/connector-clickhouse/src/main/java/.../ClickhouseSource.java: Implemented the SeaTunnelSource and SupportParallelism interfaces
  • seatunnel-connectors-v2/connector-clickhouse/src/main/java/.../ClickhouseSourceSplitEnumerator.java: Added logic to generate and assign splits to readers
  • seatunnel-connectors-v2/connector-clickhouse/src/main/java/.../ClickhouseSourceReader.java: Replaced the single-split reader with a split-aware SourceReader implementation
  • seatunnel-connectors-v2/connector-clickhouse/src/main/java/.../ClickHouseSourceSplit.java: Introduced the SourceSplit data holder
  • seatunnel-connectors-v2/connector-clickhouse/src/main/java/.../ClickhouseNumericBetweenParametersProvider.java: Helper for numeric/date range parameter generation
  • seatunnel-connectors-v2/connector-clickhouse/src/main/java/.../ClickhouseChunkSplitter.java: Core split-generation logic handling numeric, date/time, and string columns
  • seatunnel-connectors-v2/connector-clickhouse/src/main/java/.../ClickhouseSourceState.java: Added serialVersionUID for the state snapshot
Comments suppressed due to low confidence (2)

seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseIT.java:238

  • The test labeled testClickHouseWithParallelReadDateTimeCol is executing the date-based config instead of the datetime config. Update the path to clickhouse_to_clickhouse_with_parallel_read_datetime.conf.
container.executeJob("/parallel_read/clickhouse_to_clickhouse_with_parallel_read_date.conf");

seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceReader.java:46

  • The Context type is neither imported nor fully qualified. Either qualify it as SourceReader.Context or add import org.apache.seatunnel.api.source.SourceReader.Context; to resolve the compilation error.
private final Context context;

}

private static int getSplitOwner(String splitId, int numReaders) {
return splitId.hashCode() % numReaders;

Copilot AI Jun 11, 2025


Using hashCode() % numReaders can yield negative indices when hashCode() is negative. Consider using Math.floorMod(splitId.hashCode(), numReaders) to ensure non-negative reader assignment.

Suggested change
return splitId.hashCode() % numReaders;
return Math.floorMod(splitId.hashCode(), numReaders);
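
A standalone illustration of why the plain modulo is risky, using an arbitrary negative value as a stand-in for a negative hashCode():

    // Java's % keeps the sign of the dividend, so a negative hashCode()
    // yields a negative (invalid) reader index; Math.floorMod does not.
    public class SplitOwnerDemo {
        public static void main(String[] args) {
            int hash = -7; // stand-in for a negative splitId.hashCode()
            int numReaders = 4;
            System.out.println(hash % numReaders);               // prints -3
            System.out.println(Math.floorMod(hash, numReaders)); // prints 1
        }
    }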


public void close() throws IOException {}

@Override
public void addSplitsBack(List<ClickHouseSourceSplit> splits, int subtaskId) {}

Copilot AI Jun 11, 2025


The addSplitsBack method is left empty, so splits returned on failure won’t be reassigned. Implement logic to re-add these splits to pendingSplits to support recovery.

Suggested change
    public void addSplitsBack(List<ClickHouseSourceSplit> splits, int subtaskId) {}
    public void addSplitsBack(List<ClickHouseSourceSplit> splits, int subtaskId) {
        synchronized (stateLock) {
            log.info("Adding splits back for subtask {}: {}", subtaskId, splits);
            pendingSplits.computeIfAbsent(subtaskId, k -> new ArrayList<>()).addAll(splits);
        }
    }



@Override
public int currentUnassignedSplitSize() {
return 0;

Copilot AI Jun 11, 2025


currentUnassignedSplitSize always returns 0; consider returning the total count of unassigned splits (e.g., sum of sizes in pendingSplits) to accurately report backlog.

Suggested change
    return 0;
    synchronized (stateLock) {
        return pendingSplits.values().stream()
                .mapToInt(List::size)
                .sum();
    }


Member

@Hisoka-X Hisoka-X left a comment


I notice you use the chunk splitter of JDBC. It's not a bad choice, but using the DynamicChunkSplitter may be a better way. Then the connector can split partitions by itself without user config.

@mrtisttt
Contributor Author

mrtisttt commented Jun 11, 2025

I notice you use the chunk splitter of JDBC. It's not a bad choice, but using the DynamicChunkSplitter may be a better way. Then the connector can split partitions by itself without user config.

Hi @Hisoka-X, thank you sincerely for your review.

Indeed, the ClickHouse splitter follows the implementation of JDBC's FixedChunkSplitter, with adaptations, optimizations, and fixes tailored for ClickHouse, which makes it more stable and reliable.

I did notice the DynamicChunkSplitter and left room for extension when implementing the FixedChunkSplitter. However, since ClickHouse previously had no basic parallel-reading support at all, and implementing both the FixedChunkSplitter and the DynamicChunkSplitter at once would involve significant development and testing effort, I did not implement both at the same time.

The FixedChunkSplitter is stable and gives users a certain degree of flexibility. While keeping the FixedChunkSplitter, I will submit a follow-up PR to support the DynamicChunkSplitter. How about this?
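
For context on what the fixed-chunk approach does, here is a self-contained sketch of the underlying idea: carve the user-supplied bound range into partition_num contiguous BETWEEN ranges. This is an illustration of the technique, not the actual ClickhouseChunkSplitter or ClickhouseNumericBetweenParametersProvider code.

    import java.util.ArrayList;
    import java.util.List;

    // Sketch of fixed-chunk splitting over a numeric partition column.
    public class FixedChunkSketch {

        // Divide [lowerBound, upperBound] into partitionNum contiguous ranges.
        // Each {start, end} pair maps to "WHERE <partition_column> BETWEEN start AND end".
        static List<long[]> split(long lowerBound, long upperBound, int partitionNum) {
            long total = upperBound - lowerBound + 1;
            long chunk = total / partitionNum;
            long remainder = total % partitionNum;
            List<long[]> ranges = new ArrayList<>();
            long start = lowerBound;
            for (int i = 0; i < partitionNum && start <= upperBound; i++) {
                long end = start + chunk + (i < remainder ? 1 : 0) - 1;
                ranges.add(new long[] {start, Math.min(end, upperBound)});
                start = end + 1;
            }
            return ranges;
        }

        public static void main(String[] args) {
            // e.g. ids 1..100 read by 4 parallel splits -> 1-25, 26-50, 51-75, 76-100
            for (long[] r : split(1, 100, 4)) {
                System.out.printf("BETWEEN %d AND %d%n", r[0], r[1]);
            }
        }
    }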

@mrtisttt mrtisttt requested a review from Hisoka-X June 11, 2025 08:51
@mrtisttt
Contributor Author

@Hisoka-X Hi, would it be appropriate for me to submit a PR to support the DynamicChunkSplitter in the near future?

@Hisoka-X
Member

Yes, please use the DynamicChunkSplitter to reduce unnecessary configuration. In fact, the FixedChunkSplitter is legacy in JDBC and is not recommended.

@mrtisttt
Contributor Author

@Hisoka-X Since the ClickHouse source connector still uses a SQL query to read source tables and there are still many rough edges, in the subsequent implementation I will also implement:

  • query_table (just like JDBC or Doris, using a table_path option or similar)
  • where_condition (just like JDBC or Doris, using a filter.query option or similar)
  • read_field (just like Doris, using a read_field option or similar)

to better support the DynamicChunkSplitter. This also prepares for the subsequent implementation of multi-table reading. Perhaps in the future we can even remove the option of using a SQL query to read source tables.

How about this idea?

@Hisoka-X
Member

query_table (just like JDBC or Doris, using a table_path option or similar)

+1 for this; this way we can better identify the table's metadata and thereby improve read performance.

where_condition (just like JDBC or Doris, using a filter.query option or similar)
read_field (just like Doris, using a read_field option or similar)

These are not necessary at present; the filter conditions and field selection can already be expressed in the query.

Moreover, the query can preprocess data in ways the other options cannot, such as joins.

So let's support query with the DynamicChunkSplitter first, then support query_table.

@mrtisttt
Contributor Author

@Hisoka-X I have considered this model, but there is a problem: does the approach of query_table + query mean that the table name written in the user's query must be exactly the table in query_table? In this case, performing validation would be very difficult.

@JeremyXin
Contributor

@Hisoka-X Hi, I have thought of a reading method. For the query_table case, all the parts are obtained from the system table (system.parts), and the part data is then read directly from the table, similar to how Doris reads tablets. For filtering, it can be done through the partition list. This approach is similar to the Doris implementation and does not require writing SQL to read data tables concurrently.

Is this a feasible plan? I have already implemented the functionality described above and tested it in practical applications.
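
A rough JDBC sketch of the part-enumeration idea described above; the connection URL, table names, and the use of the MergeTree virtual column _part for per-part reads are illustrative assumptions, not the code referred to in this comment.

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.util.ArrayList;
    import java.util.List;

    // Sketch: enumerate the active data parts of a MergeTree table via system.parts.
    public class PartEnumerationSketch {

        static List<String> listActiveParts(Connection conn, String database, String table) throws Exception {
            String sql = "SELECT name FROM system.parts WHERE database = ? AND table = ? AND active = 1";
            List<String> parts = new ArrayList<>();
            try (PreparedStatement ps = conn.prepareStatement(sql)) {
                ps.setString(1, database);
                ps.setString(2, table);
                try (ResultSet rs = ps.executeQuery()) {
                    while (rs.next()) {
                        parts.add(rs.getString("name"));
                    }
                }
            }
            return parts;
        }

        public static void main(String[] args) throws Exception {
            // Hypothetical connection details for illustration only.
            try (Connection conn = DriverManager.getConnection(
                    "jdbc:clickhouse://localhost:8123/default", "default", "")) {
                for (String part : listActiveParts(conn, "default", "my_table")) {
                    // One split per part, e.g.:
                    // SELECT ... FROM default.my_table WHERE _part = '<part>'
                    System.out.println(part);
                }
            }
        }
    }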

@Carl-Zhou-CN
Member

@Hisoka-X Hi, I have thought of a reading method. For the query_table case, all the parts are obtained from the system table (system.parts), and the part data is then read directly from the table, similar to how Doris reads tablets. For filtering, it can be done through the partition list. This approach is similar to the Doris implementation and does not require writing SQL to read data tables concurrently.

Is this a feasible plan? I have already implemented the functionality described above and tested it in practical applications.

Good idea. Does it support distributed tables?

@Hisoka-X
Member

For the query_table case, all the parts are obtained from the system table (system.parts), and the part data is then read directly from the table, similar to how Doris reads tablets. For filtering, it can be done through the partition list.

@JeremyXin Yes, it's a good way for query_table.

I have already implemented the functionality described above and tested it in practical applications.

Welcome to contribute!

I have considered this model, but there is a problem: does the approach of query_table + query mean that the table name written in the user's query must be exactly the table in query_table?

@mrtisttt We have a processing priority: if query and table_path are configured at the same time, we get the data through query and the meta information through table_path. Please refer to:

    if (StringUtils.isNotEmpty(tableConfig.getTablePath())
            && StringUtils.isNotEmpty(tableConfig.getQuery())) {
        TablePath tablePath = jdbcDialect.parse(tableConfig.getTablePath());
        CatalogTable tableOfPath = null;
        try {
            tableOfPath = jdbcCatalog.getTable(tablePath);
        } catch (Exception e) {
            // ignore
            log.debug("User-defined table path: {}", tablePath);
        }
        CatalogTable tableOfQuery = jdbcCatalog.getTable(tableConfig.getQuery());
        if (tableOfPath == null) {
            String catalogName =
                    tableOfQuery.getTableId() == null
                            ? DEFAULT_CATALOG_NAME
                            : tableOfQuery.getTableId().getCatalogName();
            TableIdentifier tableIdentifier =
                    TableIdentifier.of(
                            catalogName,
                            tablePath.getDatabaseName(),
                            tablePath.getSchemaName(),
                            tablePath.getTableName());
            return CatalogTable.of(tableIdentifier, tableOfQuery);
        }
        return mergeCatalogTable(tableOfPath, tableOfQuery);
    }

@JeremyXin
Contributor

@Hisoka-X Hi, I have thought of a reading method. For the query_table case, all the parts are obtained from the system table (system.parts), and the part data is then read directly from the table, similar to how Doris reads tablets. For filtering, it can be done through the partition list. This approach is similar to the Doris implementation and does not require writing SQL to read data tables concurrently.
Is this a feasible plan? I have already implemented the functionality described above and tested it in practical applications.

Good idea. Does it support distributed tables?

Yes, for distributed tables, reading is actually still performed on the local tables. The parts under each shard are obtained and read concurrently.
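
A small sketch of that shard fan-out, assuming the cluster name and the underlying local table are already known (an illustration of the idea only, not the implementation mentioned above):

    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;

    // Sketch: list the shard replicas of a cluster from system.clusters; each
    // shard's local table would then be part-enumerated and read concurrently.
    public class ShardFanOutSketch {

        static void printShards(Connection conn, String cluster) throws Exception {
            String sql = "SELECT shard_num, host_name, port FROM system.clusters WHERE cluster = ?";
            try (PreparedStatement ps = conn.prepareStatement(sql)) {
                ps.setString(1, cluster);
                try (ResultSet rs = ps.executeQuery()) {
                    while (rs.next()) {
                        System.out.printf("shard %d -> %s:%d%n",
                                rs.getInt("shard_num"), rs.getString("host_name"), rs.getInt("port"));
                    }
                }
            }
        }
    }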

@mrtisttt
Contributor Author

@Hisoka-X Hi, I have thought of a reading method. For the query_table case, all the parts are obtained from the system table (system.parts), and the part data is then read directly from the table, similar to how Doris reads tablets. For filtering, it can be done through the partition list. This approach is similar to the Doris implementation and does not require writing SQL to read data tables concurrently.
Is this a feasible plan? I have already implemented the functionality described above and tested it in practical applications.

Good idea. Does it support distributed tables?

Yes, I've also thought about this issue. In ClickHouse, distributed tables pose a significant challenge, which is exactly why I mentioned the strong dependency on query_table.

@mrtisttt
Contributor Author

@Hisoka-X @Carl-Zhou-CN @JeremyXin So, how about we first implement the query_table approach? Or should we first implement the query-based DynamicChunkSplitter following the ClickHouse source connector's pattern?

@Hisoka-X
Member

@Hisoka-X @Carl-Zhou-CN @JeremyXin So, how about we first implement the query_table approach? Or should we first implement the query-based DynamicChunkSplitter following the ClickHouse source connector's pattern?

+1. Overall, table_path (let's align its name with the other connectors) is more important.

@JeremyXin
Contributor

@Hisoka-X @mrtisttt I will submit the preliminary code as soon as possible so we can see whether it is a suitable reading scheme. @mrtisttt
Or maybe you could continue developing based on the DynamicChunkSplitter? If there are any questions, feel free to discuss them together.

@Carl-Zhou-CN
Member

@Hisoka-X @mrtisttt I will submit the preliminary code as soon as possible so we can see whether it is a suitable reading scheme. @mrtisttt Or maybe you could continue developing based on the DynamicChunkSplitter? If there are any questions, feel free to discuss them together.

Looking forward to your submission. I think the two approaches are not in conflict, and users can have more choices.

@mrtisttt
Contributor Author

@Hisoka-X @mrtisttt I will submit the preliminary code as soon as possible so we can see whether it is a suitable reading scheme. @mrtisttt Or maybe you could continue developing based on the DynamicChunkSplitter? If there are any questions, feel free to discuss them together.

Looking forward to your submission. I think the two approaches are not in conflict, and users can have more choices.

Okay, I understand. Then I'll submit the implementation of the DynamicChunkSplitter first. This will indeed give users an additional option.

@Hisoka-X
Member

@Hisoka-X In this case, do we still need to implement the query-based DynamicChunkSplitter?

Yes please; query can cover many more scenarios than table_path. We can also use table_path's ability to read metadata to optimize split generation for query-based reads, which is also done in JDBC.

@Carl-Zhou-CN
Member

@Hisoka-X In this case, do we still need to implement the query-based DynamicChunkSplitter?

Yes please; query can cover many more scenarios than table_path. We can also use table_path's ability to read metadata to optimize split generation for query-based reads, which is also done in JDBC.

+1: in some scenarios, the SQL can only be executed on distributed tables and cannot be done on local tables.

@mrtisttt
Contributor Author

@Hisoka-X In this case, do we still need to implement the query-based DynamicChunkSplitter?

Yes please; query can cover many more scenarios than table_path. We can also use table_path's ability to read metadata to optimize split generation for query-based reads, which is also done in JDBC.

+1: in some scenarios, the SQL can only be executed on distributed tables and cannot be done on local tables.

OK, that makes the thinking very clear.
