feat: add clickhouse data sink #4850
base: main
Conversation
Greptile Summary
This PR adds ClickHouse data sink functionality to Daft, enabling users to write DataFrames directly to ClickHouse tables. The implementation follows Daft's established DataSink pattern used by other database connectors like Lance and Turbopuffer.
The changes include:
- New dependency management: Added clickhouse-connect >= 0.8.18 as an optional dependency in pyproject.toml and a pinned version in requirements-dev.txt
- ClickHouse data sink implementation: Created a ClickHouseDataSink class in daft/io/clickhouse/clickhouse_data_sink.py that handles connection management, micropartition processing, and result aggregation
- DataFrame integration: Added a write_clickhouse() method to the DataFrame class with comprehensive parameter support for host, port, credentials, database, table, and custom client/write options
- Module structure: Created the daft/io/clickhouse/ package directory with an __init__.py
The implementation integrates with Daft's existing data sink architecture, using the three-phase pattern (start, write, finalize) for distributed execution. Users can now write DataFrames to ClickHouse using df.write_clickhouse(host="localhost", port=8123, table="my_table") and receive aggregated statistics about the write operation.
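As a concrete illustration, a minimal usage sketch based on the call shown above (connection values are placeholders; the stats column names follow the snippets later in this review):

import daft

df = daft.from_pydict({"id": [1, 2, 3], "name": ["a", "b", "c"]})

# Write to an existing ClickHouse table and collect the returned stats.
stats = df.write_clickhouse(
    host="localhost",
    port=8123,
    user="default",
    password="",
    database="default",
    table="my_table",
)
stats.show()  # one row with total_written_rows / total_written_bytes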
Confidence score: 2/5
- This PR has several critical issues that need to be addressed before it can be safely merged
- The main concerns are incomplete module setup, naming inconsistencies, and potential configuration security issues
- Files that need more attention: daft/io/clickhouse/__init__.py, daft/io/clickhouse/clickhouse_data_sink.py, and pyproject.toml
Critical Issues:
- Empty module interface: daft/io/clickhouse/__init__.py is empty, making the ClickHouse functionality inaccessible to users who try to import it
- Naming inconsistencies: The data sink implementation contains multiple references to "ByteHouse" instead of "ClickHouse" in comments and variable names, suggesting incomplete adaptation from another codebase
- Configuration security flaw: The client kwargs merging logic allows user-provided parameters to potentially override explicit connection parameters
- Incomplete dependency setup: The 'clickhouse' extra is not included in the 'all' extras list in pyproject.toml
- Unused code: The ClickHouseConfig class is defined but never used
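For the empty-module issue, a minimal sketch of the interface the package would need to expose (this matches the fix described in the updated review below):

# daft/io/clickhouse/__init__.py
from daft.io.clickhouse.clickhouse_data_sink import ClickHouseDataSink

__all__ = ["ClickHouseDataSink"]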
5 files reviewed, 6 comments
pyproject.toml
Outdated
@@ -25,6 +25,7 @@ requires-python = ">=3.9"
all = ["daft[aws, azure, gcp, ray, pandas, numpy, iceberg, deltalake, spark, sql, unity]"]
logic: The 'all' extra should include 'clickhouse' to ensure users who install all optional dependencies get ClickHouse support
all = ["daft[aws, azure, gcp, ray, pandas, numpy, iceberg, deltalake, spark, sql, unity]"] | |
all = ["daft[aws, azure, gcp, ray, pandas, numpy, iceberg, deltalake, spark, sql, unity, clickhouse]"] |
class ClickHouseConfig:
    def __init__(self, host: str, port: int, user: str, password: str, database: str, table: str) -> None:
        self.host = host
        self.port = port
        self.user = user
        self.password = password
        self.database = database
        self.table = table
style: This ClickHouseConfig class is defined but never used in the implementation. Consider removing it or integrating it into the main class design.
client_kwargs = client_kwargs or {}
self._client_kwargs = {**client_kwargs, **self._client_kwargs}
logic: The merging order allows client_kwargs to override explicit parameters. This could cause unexpected behavior if users pass conflicting values.
Suggested change:
  client_kwargs = client_kwargs or {}
- self._client_kwargs = {**client_kwargs, **self._client_kwargs}
+ self._client_kwargs = {**self._client_kwargs, **client_kwargs}
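A standalone snippet illustrating the precedence rule at stake; in a {**a, **b} merge, keys from b win:

explicit = {"host": "localhost", "port": 8123}       # stands in for self._client_kwargs
user_kwargs = {"host": "other-host", "timeout": 30}  # stands in for client_kwargs

# Current order: explicit connection parameters are applied last and win.
assert {**user_kwargs, **explicit} == {"host": "localhost", "timeout": 30, "port": 8123}

# Suggested order: user-provided kwargs are applied last and win.
assert {**explicit, **user_kwargs} == {"host": "other-host", "port": 8123, "timeout": 30}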
    return self._result_schema

def write(self, micropartitions: Iterator[MicroPartition]) -> Iterator[WriteResult[QuerySummary]]:
    """Writes to Bytehouse from the given micropartitions."""
syntax: Comment incorrectly references "Bytehouse" instead of "ClickHouse".
Suggested change:
- """Writes to Bytehouse from the given micropartitions."""
+ """Writes to ClickHouse from the given micropartitions."""
def write(self, micropartitions: Iterator[MicroPartition]) -> Iterator[WriteResult[QuerySummary]]:
    """Writes to Bytehouse from the given micropartitions."""
    # socket cannot be serialized, so we need to create a new client in write
    bh_client = get_client(**self._client_kwargs)
style: Variable name bh_client suggests ByteHouse instead of ClickHouse. Consider renaming for consistency.
Suggested change:
- bh_client = get_client(**self._client_kwargs)
+ ch_client = get_client(**self._client_kwargs)
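Putting the rename and the per-task client together, a sketch of the write phase under stated assumptions: clickhouse_connect.get_client and Client.insert_arrow are real clickhouse-connect APIs, while self._table and the WriteResult field names are assumed from the snippets in this review.

def write(self, micropartitions: Iterator[MicroPartition]) -> Iterator[WriteResult[QuerySummary]]:
    """Writes to ClickHouse from the given micropartitions."""
    # Sockets cannot be serialized, so each distributed task opens its own client.
    ch_client = get_client(**self._client_kwargs)
    try:
        for micropartition in micropartitions:
            arrow_table = micropartition.to_arrow()
            # insert_arrow returns a QuerySummary with written_rows/written_bytes.
            summary = ch_client.insert_arrow(self._table, arrow_table)
            yield WriteResult(
                result=summary,
                rows_written=summary.written_rows,    # field names assumed
                bytes_written=summary.written_bytes,
            )
    finally:
        ch_client.close()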
        bh_client.close()

def finalize(self, write_results: list[WriteResult[QuerySummary]]) -> MicroPartition:
    """Finish write to ByteHouse dataset. Returns a DataFrame with the stats of the dataset."""
syntax: Comment incorrectly references "ByteHouse" instead of "ClickHouse".
Suggested change:
- """Finish write to ByteHouse dataset. Returns a DataFrame with the stats of the dataset."""
+ """Finish write to ClickHouse dataset. Returns a DataFrame with the stats of the dataset."""
class ClickHouseDataSink(DataSink[QuerySummary]):
    def __init__(
The ClickHouseConfig has been defined. Why are the parameters still passed separately here?
ClickHouseConfig is not needed.
requirements-dev.txt
Outdated
# clickhouse
clickhouse-connect==0.8.18
Does this version have to be pinned? Why is it different from pyproject.toml?
tbl = MicroPartition.from_pydict(
    {
        "total_written_rows": pa.array([total_written_rows], pa.int64()),
        "total_written_bytes": pa.array([total_written_bytes], pa.int64()),
Does total_written_bytes exist only in the ClickHouse data sink and not in other data sinks? Could it be abstracted into the sink's metrics?
I think the total_written_bytes field is specific to ClickHouse. Different sinks report different metrics.
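To make the design question concrete, a purely hypothetical sketch of a shared metrics shape (none of these names exist in Daft; sinks whose backends report no byte counts would leave the field unset):

from dataclasses import dataclass
from typing import Optional

@dataclass
class SinkMetrics:
    """Hypothetical common metrics a sink could report after finalize."""
    rows_written: int
    bytes_written: Optional[int] = None  # ClickHouse reports this; other backends may not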
Codecov Report
❌ Patch coverage is
Additional details and impacted files:

@@            Coverage Diff             @@
##             main    #4850      +/-   ##
==========================================
+ Coverage   79.27%   79.28%   +0.01%
==========================================
  Files         908      910       +2
  Lines      125867   125920      +53
==========================================
+ Hits        99777    99833      +56
+ Misses      26090    26087       -3
Greptile Summary
This PR adds ClickHouse data sink functionality to Daft, enabling users to write DataFrames directly to ClickHouse tables. The implementation follows Daft's established DataSink pattern used by other database connectors like Lance and Turbopuffer.
The recent changes address several critical issues from the previous review:
- Fixed module interface: The daft/io/clickhouse/__init__.py file now properly imports and exposes the ClickHouseDataSink class through __all__
- Corrected dependency configuration: The 'clickhouse' extra is now properly included in the 'all' extras list in pyproject.toml
- Improved parameter handling: The ClickHouse data sink implementation properly merges connection parameters with user-provided client kwargs
- Proper development setup: Added clickhouse-connect>=0.8.18 to requirements-dev.txt for development environment consistency
The implementation integrates seamlessly with Daft's existing data sink architecture, using the three-phase pattern (start, write, finalize) for distributed execution. Users can now write DataFrames to ClickHouse using df.write_clickhouse(host="localhost", port=8123, table="my_table") and receive aggregated statistics about the write operation. The data sink handles Arrow table conversion, manages ClickHouse client connections appropriately for distributed environments, and provides proper resource cleanup.
Confidence score: 4/5
- This PR is now much safer to merge after addressing the critical issues from the previous review
- The implementation follows established patterns and includes proper error handling and resource management
- Files still needing attention: daft/io/clickhouse/clickhouse_data_sink.py for potential parameter override security considerations
5 files reviewed, 2 comments
daft/dataframe/dataframe.py
Outdated
user: Optional[str] = None,
password: Optional[str] = None,
database: Optional[str] = None,
table: Optional[str] = None,
logic: The table parameter should be required, since the ClickHouseDataSink constructor raises ValueError if table is None/empty.
requirements-dev.txt
Outdated
# clickhouse
clickhouse-connect>=0.8.18
style: Consider adding a comment explaining why version >=0.8.18 is required, similar to other pinned dependencies in this file.
password: str | None = None,
database: str | None = None,
table: str | None = None,
client_kwargs: dict[str, Any] | None = None,
Aren't host/port already covered by client_kwargs? What is usually passed in here?
Other params, e.g. timeout, access_token.
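A usage sketch of that split, with connection basics as explicit arguments and extras routed through client_kwargs (connect_timeout and send_receive_timeout are real clickhouse-connect get_client parameters; access_token is the reviewer's other example and depends on the server setup):

df.write_clickhouse(
    host="localhost",
    port=8123,
    table="events",
    client_kwargs={
        "connect_timeout": 10,        # seconds, passed through to get_client
        "send_receive_timeout": 300,  # for long-running inserts
    },
)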
@Jay-ju Can you help me review the code? Thanks.
Changes Made
Add the ClickHouse data sink and use it to write a DataFrame to a ClickHouse table.
Related Issues
Checklist
docs/mkdocs.yml navigation