
feat: add clickhouse data sink #4850

Open

wants to merge 11 commits into main

Conversation

huleilei
Contributor

Changes Made

Add a ClickHouse data sink and use it to write a DataFrame to a ClickHouse table.

Related Issues

Checklist

  • Documented in API Docs (if applicable)
  • Documented in User Guide (if applicable)
  • If adding a new documentation page, doc is added to docs/mkdocs.yml navigation
  • Documentation builds and is formatted properly (tag @/ccmao1130 for docs review)

github-actions bot added the feat label Jul 25, 2025

greptile-apps bot left a comment


Greptile Summary

This PR adds ClickHouse data sink functionality to Daft, enabling users to write DataFrames directly to ClickHouse tables. The implementation follows Daft's established DataSink pattern used by other database connectors like Lance and Turbopuffer.

The changes include:

  • New dependency management: Added clickhouse-connect >= 0.8.18 as an optional dependency in pyproject.toml and pinned version in requirements-dev.txt
  • ClickHouse data sink implementation: Created ClickHouseDataSink class in daft/io/clickhouse/clickhouse_data_sink.py that handles connection management, micropartition processing, and result aggregation
  • DataFrame integration: Added write_clickhouse() method to the DataFrame class with comprehensive parameter support for host, port, credentials, database, table, and custom client/write options
  • Module structure: Created the daft/io/clickhouse/ package directory with __init__.py

The implementation integrates with Daft's existing data sink architecture, using the write/finalize pattern for distributed execution. Users can now write DataFrames to ClickHouse via df.write_clickhouse(host="localhost", port=8123, table="my_table") and receive aggregated statistics about the write operation.
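A minimal usage sketch of the API as described above; exact parameter defaults and the shape of the returned statistics may differ from the final implementation:

```python
import daft

# Illustrative data.
df = daft.from_pydict({"id": [1, 2, 3], "name": ["a", "b", "c"]})

# host/port/table match the example in the summary above; the remaining
# keyword arguments are assumptions based on the parameter list described
# in this PR.
stats = df.write_clickhouse(
    host="localhost",
    port=8123,
    user="default",
    password="",
    database="default",
    table="my_table",
)

# The sink is described as returning aggregated statistics about the write.
stats.show()
```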

Confidence score: 2/5

  • This PR has several critical issues that need to be addressed before it can be safely merged
  • The main concerns are incomplete module setup, naming inconsistencies, and potential configuration security issues
  • Files that need more attention: daft/io/clickhouse/__init__.py, daft/io/clickhouse/clickhouse_data_sink.py, and pyproject.toml

Critical Issues:

  1. Empty module interface: daft/io/clickhouse/__init__.py is empty, making the ClickHouse functionality inaccessible to users who try to import it (a minimal export sketch follows this list)
  2. Naming inconsistencies: The data sink implementation contains multiple references to "ByteHouse" instead of "ClickHouse" in comments and variable names, suggesting incomplete adaptation from another codebase
  3. Configuration security flaw: The client kwargs merging logic allows user-provided parameters to potentially override explicit connection parameters
  4. Incomplete dependency setup: The 'clickhouse' extra is not included in the 'all' extras list in pyproject.toml
  5. Unused code: The ClickHouseConfig class is defined but never used
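A minimal sketch of the missing export, assuming the module path and class name shown in this review; the exact public interface is for the PR author to decide:

```python
# daft/io/clickhouse/__init__.py
from daft.io.clickhouse.clickhouse_data_sink import ClickHouseDataSink

__all__ = ["ClickHouseDataSink"]
```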

5 files reviewed, 6 comments


pyproject.toml Outdated
@@ -25,6 +25,7 @@ requires-python = ">=3.9"
all = ["daft[aws, azure, gcp, ray, pandas, numpy, iceberg, deltalake, spark, sql, unity]"]
Contributor


logic: The 'all' extra should include 'clickhouse' to ensure users who install all optional dependencies get ClickHouse support

Suggested change
all = ["daft[aws, azure, gcp, ray, pandas, numpy, iceberg, deltalake, spark, sql, unity]"]
all = ["daft[aws, azure, gcp, ray, pandas, numpy, iceberg, deltalake, spark, sql, unity, clickhouse]"]

Comment on lines 18 to 25
class ClickHouseConfig:
    def __init__(self, host: str, port: int, user: str, password: str, database: str, table: str) -> None:
        self.host = host
        self.port = port
        self.user = user
        self.password = password
        self.database = database
        self.table = table
Contributor


style: This ClickHouseConfig class is defined but never used in the implementation. Consider removing it or integrating it into the main class design.

Comment on lines 49 to 50
client_kwargs = client_kwargs or {}
self._client_kwargs = {**client_kwargs, **self._client_kwargs}
Contributor


logic: The merging order allows client_kwargs to override explicit parameters. This could cause unexpected behavior if users pass conflicting values.

Suggested change
client_kwargs = client_kwargs or {}
self._client_kwargs = {**client_kwargs, **self._client_kwargs}
self._client_kwargs = {**self._client_kwargs, **client_kwargs}
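For context on the merge semantics this comment refers to: in `{**a, **b}` the dict unpacked last wins, so which variant is safer depends on which of the two dicts holds the explicit connection parameters. A standalone illustration (variable names here are illustrative, not from the PR):

```python
explicit = {"host": "localhost", "port": 8123}       # e.g. built from explicit arguments
user_kwargs = {"port": 9000, "connect_timeout": 10}  # e.g. caller-supplied client_kwargs

# In {**a, **b}, keys from b (the dict unpacked last) override duplicates in a.
explicit_wins = {**user_kwargs, **explicit}
user_wins = {**explicit, **user_kwargs}

print(explicit_wins["port"])  # 8123 -> explicit parameters win
print(user_wins["port"])      # 9000 -> caller kwargs win
```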

return self._result_schema

def write(self, micropartitions: Iterator[MicroPartition]) -> Iterator[WriteResult[QuerySummary]]:
    """Writes to Bytehouse from the given micropartitions."""
Contributor


syntax: Comment incorrectly references "Bytehouse" instead of "ClickHouse".

Suggested change
"""Writes to Bytehouse from the given micropartitions."""
"""Writes to ClickHouse from the given micropartitions."""

def write(self, micropartitions: Iterator[MicroPartition]) -> Iterator[WriteResult[QuerySummary]]:
    """Writes to Bytehouse from the given micropartitions."""
    # socket cannot be serialized, so we need to create a new client in write
    bh_client = get_client(**self._client_kwargs)
Contributor


style: Variable name bh_client suggests ByteHouse instead of ClickHouse. Consider renaming for consistency.

Suggested change
bh_client = get_client(**self._client_kwargs)
ch_client = get_client(**self._client_kwargs)

bh_client.close()

def finalize(self, write_results: list[WriteResult[QuerySummary]]) -> MicroPartition:
    """Finish write to ByteHouse dataset. Returns a DataFrame with the stats of the dataset."""
Contributor


syntax: Comment incorrectly references "ByteHouse" instead of "ClickHouse".

Suggested change
"""Finish write to ByteHouse dataset. Returns a DataFrame with the stats of the dataset."""
"""Finish write to ClickHouse dataset. Returns a DataFrame with the stats of the dataset."""



class ClickHouseDataSink(DataSink[QuerySummary]):
    def __init__(
Contributor


ClickHouseConfig has been defined. Why are the parameters still passed separately here?

Contributor Author


ClickHouseConfig is not needed.



#clickhouse
clickhouse-connect==0.8.18
Contributor

@Jay-ju Jul 25, 2025


Does this version have to be pinned? Why is it different from pyproject.toml?

tbl = MicroPartition.from_pydict(
    {
        "total_written_rows": pa.array([total_written_rows], pa.int64()),
        "total_written_bytes": pa.array([total_written_bytes], pa.int64()),
Contributor


Does total_written_bytes only exist in the ClickHouse data sink and not in other data sinks? Could it be abstracted into generic sink metrics?

Contributor Author


I think the total_written_bytes field is ClickHouse-specific. Different sinks report different metrics, right?
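For readers following this thread, a rough sketch of the kind of aggregation finalize performs over these ClickHouse-specific stats (plain pyarrow here; the written_rows/written_bytes attribute names on the summary objects are assumptions based on the column names in the excerpt above):

```python
import pyarrow as pa

def aggregate_write_stats(summaries) -> pa.Table:
    # Sum per-partition statistics into the two columns shown in the excerpt;
    # the attribute names are assumptions, hence the defensive getattr defaults.
    total_written_rows = sum(int(getattr(s, "written_rows", 0) or 0) for s in summaries)
    total_written_bytes = sum(int(getattr(s, "written_bytes", 0) or 0) for s in summaries)
    return pa.table(
        {
            "total_written_rows": pa.array([total_written_rows], pa.int64()),
            "total_written_bytes": pa.array([total_written_bytes], pa.int64()),
        }
    )
```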


codecov bot commented Jul 25, 2025

Codecov Report

❌ Patch coverage is 98.11321% with 1 line in your changes missing coverage. Please review.
✅ Project coverage is 79.28%. Comparing base (bda006a) to head (9d50769).

Files with missing lines | Patch % | Lines
daft/io/clickhouse/clickhouse_data_sink.py | 97.77% | 1 Missing ⚠️
Additional details and impacted files


@@            Coverage Diff             @@
##             main    #4850      +/-   ##
==========================================
+ Coverage   79.27%   79.28%   +0.01%     
==========================================
  Files         908      910       +2     
  Lines      125867   125920      +53     
==========================================
+ Hits        99777    99833      +56     
+ Misses      26090    26087       -3     
Files with missing lines | Coverage Δ
daft/dataframe/dataframe.py | 86.89% <100.00%> (+0.05%) ⬆️
daft/io/clickhouse/__init__.py | 100.00% <100.00%> (ø)
daft/io/clickhouse/clickhouse_data_sink.py | 97.77% <97.77%> (ø)

... and 1 file with indirect coverage changes



greptile-apps bot left a comment


Greptile Summary

This PR adds ClickHouse data sink functionality to Daft, enabling users to write DataFrames directly to ClickHouse tables. The implementation follows Daft's established DataSink pattern used by other database connectors like Lance and Turbopuffer.

The recent changes address several critical issues from the previous review:

  • Fixed module interface: The daft/io/clickhouse/__init__.py file now properly imports and exposes the ClickHouseDataSink class through __all__
  • Corrected dependency configuration: The 'clickhouse' extra is now properly included in the 'all' extras list in pyproject.toml
  • Improved parameter handling: The ClickHouse data sink implementation properly merges connection parameters with user-provided client kwargs
  • Proper development setup: Added clickhouse-connect>=0.8.18 to requirements-dev.txt for development environment consistency

The implementation integrates seamlessly with Daft's existing data sink architecture, using the write/finalize pattern for distributed execution. Users can now write DataFrames to ClickHouse via df.write_clickhouse(host="localhost", port=8123, table="my_table") and receive aggregated statistics about the write operation. The data sink handles Arrow table conversion, manages ClickHouse client connections appropriately for distributed environments, and provides proper resource cleanup.
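A condensed sketch of the write phase described here, based on the excerpts in this review (per-call client creation because sockets cannot be serialized, Arrow insertion, cleanup); the to_arrow() conversion and the wrapping of results in WriteResult are assumptions:

```python
from clickhouse_connect import get_client

def write_partitions(micropartitions, client_kwargs, table):
    # A new client is created inside write because a socket cannot be
    # serialized and shipped to distributed workers (as noted in the PR).
    ch_client = get_client(**client_kwargs)
    try:
        for mp in micropartitions:
            # mp.to_arrow() is an assumption for "Arrow table conversion";
            # insert_arrow performs an Arrow-native insert and returns a
            # per-partition summary.
            summary = ch_client.insert_arrow(table, mp.to_arrow())
            yield summary  # the actual sink wraps this in WriteResult[QuerySummary]
    finally:
        ch_client.close()
```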

Confidence score: 4/5

  • This PR is now much safer to merge after addressing the critical issues from the previous review
  • The implementation follows established patterns and includes proper error handling and resource management
  • Files still needing attention: daft/io/clickhouse/clickhouse_data_sink.py for potential parameter override security considerations

5 files reviewed, 2 comments


user: Optional[str] = None,
password: Optional[str] = None,
database: Optional[str] = None,
table: Optional[str] = None,
Contributor


logic: The table parameter should be required since the ClickHouseDataSink constructor raises ValueError if table is None/empty

Comment on lines 121 to 122
#clickhouse
clickhouse-connect>=0.8.18
Contributor


style: Consider adding a comment explaining why version >=0.8.18 is required, similar to other pinned dependencies in this file.

password: str | None = None,
database: str | None = None,
table: str | None = None,
client_kwargs: dict[str, Any] | None = None,
Contributor


Aren't host/port already covered by client_kwargs? What is usually passed here?

Contributor Author


Other params, e.g. timeout, access_token.
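For example (a hypothetical call; connect_timeout is a clickhouse-connect client setting and access_token comes from the author's comment, neither is confirmed in this PR's signature):

```python
df.write_clickhouse(
    host="localhost",
    port=8123,
    table="my_table",
    # Extra settings are forwarded to clickhouse_connect.get_client rather
    # than being separate write_clickhouse parameters.
    client_kwargs={"connect_timeout": 10, "access_token": "<token>"},
)
```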

@huleilei
Contributor Author

@Jay-ju Can you help me review the code? Thanks.
