
Commit ff7d35e

Log data separation (#126)
* use dedicated log s3
* working test
* readme
* readme

1 parent: 4f17827

13 files changed: +121 −97 lines

README.md

Lines changed: 12 additions & 3 deletions
````diff
@@ -44,12 +44,13 @@ the ClickHouse S3 function `s3('https://icedb-s3-proxy/**/*.parquet')` or DuckDB
 * [Why not Iceberg?](#why-not-iceberg)
 * [When not to use IceDB](#when-not-to-use-icedb)
 * [Tips before you dive in](#tips-before-you-dive-in)
-  * [Forcing number types](#forcing-number-types)
+  * [Forcing property types](#forcing-property-types)
   * [Insert in large batches](#insert-in-large-batches)
   * [Merge and Tombstone clean often](#merge-and-tombstone-clean-often)
   * [Large partitions, sort your data well!](#large-partitions-sort-your-data-well)
   * [Schema validation before insert](#schema-validation-before-insert)
   * [Tracking the running schema](#tracking-the-running-schema)
+  * [Separation of log and data](#separation-of-log-and-data)
 * [Usage](#usage)
   * [Partition function (`part_func`)](#partition-function-part_func)
   * [Sorting Order (`sort_order`)](#sorting-order-sort_order)
@@ -87,7 +88,7 @@ from datetime import datetime
 from time import time
 
 # create an s3 client to talk to minio
-s3c = S3Client(s3prefix="example", s3bucket="testbucket", s3region="us-east-1", s3endpoint="http://localhost:9000",
+s3c = S3Client(s3prefix="example", s3bucket="testbucket", s3region="us-east-1", s3endpoint="http://localhost:9000",
                s3accesskey="user", s3secretkey="password")
 
 example_events = [
@@ -122,6 +123,7 @@ example_events = [
     }
 ]
 
+
 def part_func(row: dict) -> str:
     """
     Partition by user_id, date
@@ -130,6 +132,7 @@ def part_func(row: dict) -> str:
     part = f"u={row['user_id']}/d={row_time.strftime('%Y-%m-%d')}"
     return part
 
+
 # Initialize the client
 ice = IceDBv3(
     part_func,
@@ -169,7 +172,7 @@ query = ("select user_id, count(*), (properties::JSON)->>'page_name' as page "
          "from read_parquet([{}]) "
          "group by user_id, page "
          "order by count(*) desc").format(
-    ', '.join(list(map(lambda x: "'s3://" + ice.s3c.s3bucket + "/" + x.path + "'", alive_files)))
+    ', '.join(list(map(lambda x: "'s3://" + ice.data_s3c.s3bucket + "/" + x.path + "'", alive_files)))
 )
 print(ddb.sql(query))
 ```
@@ -456,6 +459,12 @@ See a simple [example here](examples/verify-schema.py) on verifying the schema b
 IceDB will track the running schema natively. One caveat to this functionality is that if you remove a column as a
 part of a partition rewrite and that column never returns, IceDB will not remove that from the schema.
 
+### Separation of log and data
+
+You can pass the optional `log_s3_client` to use a dedicated S3 client for log files. All IceDB instances MUST share the same configuration in this regard.
+
+This is useful when you want the log in lower-latency time-to-first-byte storage such as S3 Express One Zone, while keeping the data in lower-cost storage such as standard S3.
+
 ## Usage
 
 ```
````
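To make the new README section concrete, here is a minimal sketch of wiring up separate data and log clients. The `log_s3_client` parameter and the `data_s3c` attribute come from this commit; the import path, the `s3_client` keyword, and the sort order shown are assumptions for illustration only.

```python
from icedb import IceDBv3, S3Client  # assumed import path

# Data stays in the regular bucket; the log goes to a dedicated bucket
# (matching the testbucket-log created in docker-compose.yml below).
data_s3c = S3Client(s3prefix="example", s3bucket="testbucket",
                    s3region="us-east-1", s3endpoint="http://localhost:9000",
                    s3accesskey="user", s3secretkey="password")
log_s3c = S3Client(s3prefix="example", s3bucket="testbucket-log",
                   s3region="us-east-1", s3endpoint="http://localhost:9000",
                   s3accesskey="user", s3secretkey="password")

ice = IceDBv3(
    part_func,              # partition function from the README example
    ["user_id", "ts"],      # assumed sort order, for illustration
    s3_client=data_s3c,     # assumed keyword for the data client
    log_s3_client=log_s3c,  # optional log client introduced by this commit
)

# Every IceDB instance reading or writing this table must be configured
# with the same log/data client split.
```

Queries then resolve parquet paths through `ice.data_s3c`, as the example diffs below show.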

docker-compose.yml

Lines changed: 1 addition & 0 deletions
```diff
@@ -34,6 +34,7 @@ services:
       sleep 1;
       /usr/bin/mc alias set myminio http://minio:9000 user password;
       /usr/bin/mc mb myminio/testbucket;
+      /usr/bin/mc mb myminio/testbucket-log;
       exit 0;
       "
   # clickhouse:
```
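The compose file only provisions the extra bucket for local MinIO testing. If you bootstrap buckets from Python rather than `mc`, a boto3 sketch would look like this (not part of this commit; endpoint and credentials assumed to mirror the compose file):

```python
import boto3

# Point boto3 at the local MinIO from docker-compose.yml.
s3 = boto3.client(
    "s3",
    endpoint_url="http://localhost:9000",
    aws_access_key_id="user",
    aws_secret_access_key="password",
    region_name="us-east-1",
)

# Create the data bucket and the dedicated log bucket.
for bucket in ("testbucket", "testbucket-log"):
    try:
        s3.create_bucket(Bucket=bucket)
    except s3.exceptions.BucketAlreadyOwnedByYou:
        pass  # already provisioned, e.g. by the mc init container
```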

examples/api-falcon.py

Lines changed: 1 addition & 1 deletion
```diff
@@ -218,7 +218,7 @@ def on_get(self, req, resp):
          "from read_parquet([{}]) "
          "group by user_id, page "
          "order by count(user_id) desc").format(
-    ', '.join(list(map(lambda x: "'s3://" + ice.s3c.s3bucket + "/" + x.path + "'", alive_files)))
+    ', '.join(list(map(lambda x: "'s3://" + ice.data_s3c.s3bucket + "/" + x.path + "'", alive_files)))
 )
 
 # return the result as text
```

examples/api-flask.py

Lines changed: 1 addition & 1 deletion
```diff
@@ -210,7 +210,7 @@ def query_rows():
          "from read_parquet([{}]) "
          "group by user_id, page "
          "order by count(user_id) desc").format(
-    ', '.join(list(map(lambda x: "'s3://" + ice.s3c.s3bucket + "/" + x.path + "'", alive_files)))
+    ', '.join(list(map(lambda x: "'s3://" + ice.data_s3c.s3bucket + "/" + x.path + "'", alive_files)))
 )
 
 # return the result as text
```

examples/custom-merge-aggregation-with-custom-insert.py

Lines changed: 6 additions & 6 deletions
```diff
@@ -107,7 +107,7 @@ def part_func(row: dict) -> str:
          "from read_parquet([{}]) "
          "group by user_id, event, event "
          "order by count(user_id) desc").format(
-    ', '.join(list(map(lambda x: "'s3://" + ice.s3c.s3bucket + "/" + x.path + "'", alive_files)))
+    ', '.join(list(map(lambda x: "'s3://" + ice.data_s3c.s3bucket + "/" + x.path + "'", alive_files)))
 )
 print(ddb.sql(query))
 
@@ -120,7 +120,7 @@ def part_func(row: dict) -> str:
          "from read_parquet([{}]) "
          "group by user_id, event "
          "order by sum(cnt) desc").format(
-    ', '.join(list(map(lambda x: "'s3://" + ice.s3c.s3bucket + "/" + x.path + "'", alive_files)))
+    ', '.join(list(map(lambda x: "'s3://" + ice.data_s3c.s3bucket + "/" + x.path + "'", alive_files)))
 )
 print(ddb.sql(query))
 
@@ -138,7 +138,7 @@ def part_func(row: dict) -> str:
          "from read_parquet([{}]) "
          "group by user_id, event "
          "order by count(user_id) desc").format(
-    ', '.join(list(map(lambda x: "'s3://" + ice.s3c.s3bucket + "/" + x.path + "'", alive_files)))
+    ', '.join(list(map(lambda x: "'s3://" + ice.data_s3c.s3bucket + "/" + x.path + "'", alive_files)))
 )
 print(ddb.sql(query))
 
@@ -151,7 +151,7 @@ def part_func(row: dict) -> str:
          "from read_parquet([{}]) "
          "group by user_id, event "
          "order by sum(cnt) desc").format(
-    ', '.join(list(map(lambda x: "'s3://" + ice.s3c.s3bucket + "/" + x.path + "'", alive_files)))
+    ', '.join(list(map(lambda x: "'s3://" + ice.data_s3c.s3bucket + "/" + x.path + "'", alive_files)))
 )
 print(ddb.sql(query))
 
@@ -170,7 +170,7 @@ def part_func(row: dict) -> str:
          "from read_parquet([{}]) "
          "group by user_id, event "
          "order by count(user_id) desc").format(
-    ', '.join(list(map(lambda x: "'s3://" + ice.s3c.s3bucket + "/" + x.path + "'", alive_files)))
+    ', '.join(list(map(lambda x: "'s3://" + ice.data_s3c.s3bucket + "/" + x.path + "'", alive_files)))
 )
 print(ddb.sql(query))
 
@@ -184,7 +184,7 @@ def part_func(row: dict) -> str:
          "from read_parquet([{}]) "
          "group by user_id, event "
          "order by sum(cnt) desc").format(
-    ', '.join(list(map(lambda x: "'s3://" + ice.s3c.s3bucket + "/" + x.path + "'", alive_files)))
+    ', '.join(list(map(lambda x: "'s3://" + ice.data_s3c.s3bucket + "/" + x.path + "'", alive_files)))
 )
 print(ddb.sql(query))
```
examples/custom-merge-aggregation.py

Lines changed: 6 additions & 6 deletions
```diff
@@ -101,7 +101,7 @@ def part_func(row: dict) -> str:
          "from read_parquet([{}]) "
          "group by user_id, event, event "
          "order by count(user_id) desc").format(
-    ', '.join(list(map(lambda x: "'s3://" + ice.s3c.s3bucket + "/" + x.path + "'", alive_files)))
+    ', '.join(list(map(lambda x: "'s3://" + ice.data_s3c.s3bucket + "/" + x.path + "'", alive_files)))
 )
 print(ddb.sql(query))
 
@@ -114,7 +114,7 @@ def part_func(row: dict) -> str:
          "from read_parquet([{}]) "
          "group by user_id, event "
          "order by sum(cnt) desc").format(
-    ', '.join(list(map(lambda x: "'s3://" + ice.s3c.s3bucket + "/" + x.path + "'", alive_files)))
+    ', '.join(list(map(lambda x: "'s3://" + ice.data_s3c.s3bucket + "/" + x.path + "'", alive_files)))
 )
 print(ddb.sql(query))
 
@@ -132,7 +132,7 @@ def part_func(row: dict) -> str:
          "from read_parquet([{}]) "
          "group by user_id, event "
          "order by count(user_id) desc").format(
-    ', '.join(list(map(lambda x: "'s3://" + ice.s3c.s3bucket + "/" + x.path + "'", alive_files)))
+    ', '.join(list(map(lambda x: "'s3://" + ice.data_s3c.s3bucket + "/" + x.path + "'", alive_files)))
 )
 print(ddb.sql(query))
 
@@ -145,7 +145,7 @@ def part_func(row: dict) -> str:
          "from read_parquet([{}]) "
          "group by user_id, event "
          "order by sum(cnt) desc").format(
-    ', '.join(list(map(lambda x: "'s3://" + ice.s3c.s3bucket + "/" + x.path + "'", alive_files)))
+    ', '.join(list(map(lambda x: "'s3://" + ice.data_s3c.s3bucket + "/" + x.path + "'", alive_files)))
 )
 print(ddb.sql(query))
 
@@ -164,7 +164,7 @@ def part_func(row: dict) -> str:
          "from read_parquet([{}]) "
          "group by user_id, event "
          "order by count(user_id) desc").format(
-    ', '.join(list(map(lambda x: "'s3://" + ice.s3c.s3bucket + "/" + x.path + "'", alive_files)))
+    ', '.join(list(map(lambda x: "'s3://" + ice.data_s3c.s3bucket + "/" + x.path + "'", alive_files)))
 )
 print(ddb.sql(query))
 
@@ -178,7 +178,7 @@ def part_func(row: dict) -> str:
          "from read_parquet([{}]) "
          "group by user_id, event "
          "order by sum(cnt) desc").format(
-    ', '.join(list(map(lambda x: "'s3://" + ice.s3c.s3bucket + "/" + x.path + "'", alive_files)))
+    ', '.join(list(map(lambda x: "'s3://" + ice.data_s3c.s3bucket + "/" + x.path + "'", alive_files)))
 )
 print(ddb.sql(query))
```

examples/custom-merge-replacing.py

Lines changed: 6 additions & 6 deletions
```diff
@@ -126,7 +126,7 @@ def part_func(row: dict) -> str:
          "from read_parquet([{}]) "
          "group by user_id, event "
          "order by count(user_id) desc").format(
-    ', '.join(list(map(lambda x: "'s3://" + ice.s3c.s3bucket + "/" + x.path + "'", alive_files)))
+    ', '.join(list(map(lambda x: "'s3://" + ice.data_s3c.s3bucket + "/" + x.path + "'", alive_files)))
 )
 print(ddb.sql(query))
 
@@ -139,7 +139,7 @@ def part_func(row: dict) -> str:
 query = ("select user_id, arg_max(event, ts), max(ts)::INT8, arg_max(properties, ts) "
          "from read_parquet([{}]) "
          "group by user_id ").format(
-    ', '.join(list(map(lambda x: "'s3://" + ice.s3c.s3bucket + "/" + x.path + "'", alive_files)))
+    ', '.join(list(map(lambda x: "'s3://" + ice.data_s3c.s3bucket + "/" + x.path + "'", alive_files)))
 )
 print(ddb.sql(query))
 
@@ -159,7 +159,7 @@ def part_func(row: dict) -> str:
          "from read_parquet([{}]) "
          "group by user_id, event "
          "order by count(user_id) desc").format(
-    ', '.join(list(map(lambda x: "'s3://" + ice.s3c.s3bucket + "/" + x.path + "'", alive_files)))
+    ', '.join(list(map(lambda x: "'s3://" + ice.data_s3c.s3bucket + "/" + x.path + "'", alive_files)))
 )
 print(ddb.sql(query))
 
@@ -172,7 +172,7 @@ def part_func(row: dict) -> str:
 query = ("select user_id, arg_max(event, ts), max(ts), arg_max(properties, ts) "
          "from read_parquet([{}]) "
          "group by user_id ").format(
-    ', '.join(list(map(lambda x: "'s3://" + ice.s3c.s3bucket + "/" + x.path + "'", alive_files)))
+    ', '.join(list(map(lambda x: "'s3://" + ice.data_s3c.s3bucket + "/" + x.path + "'", alive_files)))
 )
 print(ddb.sql(query))
 
@@ -192,7 +192,7 @@ def part_func(row: dict) -> str:
          "from read_parquet([{}]) "
          "group by user_id, event "
          "order by count(user_id) desc").format(
-    ', '.join(list(map(lambda x: "'s3://" + ice.s3c.s3bucket + "/" + x.path + "'", alive_files)))
+    ', '.join(list(map(lambda x: "'s3://" + ice.data_s3c.s3bucket + "/" + x.path + "'", alive_files)))
 )
 print(ddb.sql(query))
 
@@ -207,7 +207,7 @@ def part_func(row: dict) -> str:
 query = ("select user_id, arg_max(event, ts), max(ts), arg_max(properties, ts) "
          "from read_parquet([{}]) "
          "group by user_id ").format(
-    ', '.join(list(map(lambda x: "'s3://" + ice.s3c.s3bucket + "/" + x.path + "'", alive_files)))
+    ', '.join(list(map(lambda x: "'s3://" + ice.data_s3c.s3bucket + "/" + x.path + "'", alive_files)))
 )
 print(ddb.sql(query))
```

examples/materialized-view.py

Lines changed: 7 additions & 7 deletions
```diff
@@ -113,7 +113,7 @@ def part_func_mv(row: dict) -> str:
          "from read_parquet([{}]) "
          "group by user_id, event, event "
          "order by count(user_id) desc").format(
-    ', '.join(list(map(lambda x: "'s3://" + ice_raw.s3c.s3bucket + "/" + x.path + "'", alive_files)))
+    ', '.join(list(map(lambda x: "'s3://" + ice_raw.data_s3c.s3bucket + "/" + x.path + "'", alive_files)))
 )
 print(ddb.sql(query))
 
@@ -126,7 +126,7 @@ def part_func_mv(row: dict) -> str:
          "from read_parquet([{}]) "
          "group by user_id, event "
          "order by sum(cnt) desc").format(
-    ', '.join(list(map(lambda x: "'s3://" + ice_mv.s3c.s3bucket + "/" + x.path + "'", alive_files)))
+    ', '.join(list(map(lambda x: "'s3://" + ice_mv.data_s3c.s3bucket + "/" + x.path + "'", alive_files)))
 )
 print(ddb.sql(query))
 
@@ -147,7 +147,7 @@ def part_func_mv(row: dict) -> str:
          "from read_parquet([{}]) "
          "group by user_id, event "
          "order by count(user_id) desc").format(
-    ', '.join(list(map(lambda x: "'s3://" + ice_raw.s3c.s3bucket + "/" + x.path + "'", alive_files)))
+    ', '.join(list(map(lambda x: "'s3://" + ice_raw.data_s3c.s3bucket + "/" + x.path + "'", alive_files)))
 )
 print(ddb.sql(query))
 
@@ -160,7 +160,7 @@ def part_func_mv(row: dict) -> str:
          "from read_parquet([{}]) "
          "group by user_id, event "
          "order by sum(cnt) desc").format(
-    ', '.join(list(map(lambda x: "'s3://" + ice_mv.s3c.s3bucket + "/" + x.path + "'", alive_files)))
+    ', '.join(list(map(lambda x: "'s3://" + ice_mv.data_s3c.s3bucket + "/" + x.path + "'", alive_files)))
 )
 print(ddb.sql(query))
 
@@ -188,7 +188,7 @@ def part_func_mv(row: dict) -> str:
          "from read_parquet([{}]) "
          "group by user_id, event "
          "order by count(user_id) desc").format(
-    ', '.join(list(map(lambda x: "'s3://" + ice_raw.s3c.s3bucket + "/" + x.path + "'", alive_files)))
+    ', '.join(list(map(lambda x: "'s3://" + ice_raw.data_s3c.s3bucket + "/" + x.path + "'", alive_files)))
 )
 print(ddb.sql(query))
 
@@ -202,7 +202,7 @@ def part_func_mv(row: dict) -> str:
          "from read_parquet([{}]) "
          "group by user_id, event "
          "order by count(user_id) desc").format(
-    ', '.join(list(map(lambda x: "'s3://" + ice_mv.s3c.s3bucket + "/" + x.path + "'", alive_files)))
+    ', '.join(list(map(lambda x: "'s3://" + ice_mv.data_s3c.s3bucket + "/" + x.path + "'", alive_files)))
 )
 print(ddb.sql(query))
 
@@ -216,7 +216,7 @@ def part_func_mv(row: dict) -> str:
          "from read_parquet([{}]) "
          "group by user_id, event "
          "order by sum(cnt) desc").format(
-    ', '.join(list(map(lambda x: "'s3://" + ice_mv.s3c.s3bucket + "/" + x.path + "'", alive_files)))
+    ', '.join(list(map(lambda x: "'s3://" + ice_mv.data_s3c.s3bucket + "/" + x.path + "'", alive_files)))
 )
 print(ddb.sql(query))
```

examples/simple-full.py

Lines changed: 4 additions & 4 deletions
```diff
@@ -82,7 +82,7 @@ def part_func(row: dict) -> str:
 # Run the query
 query = ("select * "
          "from read_parquet([{}]) ").format(
-    ', '.join(list(map(lambda x: "'s3://" + ice.s3c.s3bucket + "/" + x.path + "'", alive_files)))
+    ', '.join(list(map(lambda x: "'s3://" + ice.data_s3c.s3bucket + "/" + x.path + "'", alive_files)))
 )
 print(ddb.sql(query))
 
@@ -96,7 +96,7 @@ def part_func(row: dict) -> str:
          "from read_parquet([{}]) "
          "group by user_id, page "
          "order by count(*) desc").format(
-    ', '.join(list(map(lambda x: "'s3://" + ice.s3c.s3bucket + "/" + x.path + "'", alive_files)))
+    ', '.join(list(map(lambda x: "'s3://" + ice.data_s3c.s3bucket + "/" + x.path + "'", alive_files)))
 )
 print(ddb.sql(query))
 
@@ -125,7 +125,7 @@ def part_func(row: dict) -> str:
          "from read_parquet([{}]) "
          "group by user_id, page "
          "order by count(*) desc").format(
-    ', '.join(list(map(lambda x: "'s3://" + ice.s3c.s3bucket + "/" + x.path + "'", alive_files)))
+    ', '.join(list(map(lambda x: "'s3://" + ice.data_s3c.s3bucket + "/" + x.path + "'", alive_files)))
 )
 print(ddb.sql(query))
 
@@ -148,7 +148,7 @@ def part_func(row: dict) -> str:
          "from read_parquet([{}]) "
          "group by user_id, page "
          "order by count(*) desc").format(
-    ', '.join(list(map(lambda x: "'s3://" + ice.s3c.s3bucket + "/" + x.path + "'", alive_files)))
+    ', '.join(list(map(lambda x: "'s3://" + ice.data_s3c.s3bucket + "/" + x.path + "'", alive_files)))
 )
 print(ddb.sql(query))
```

examples/verify-schema.py

Lines changed: 1 addition & 1 deletion
```diff
@@ -122,7 +122,7 @@ def check_schema_conflicts(old: Schema, new: Schema):
          "from read_parquet([{}]) "
          "group by user_id, page "
          "order by count(user_id) desc").format(
-    ', '.join(list(map(lambda x: "'s3://" + ice.s3c.s3bucket + "/" + x.path + "'", alive_files)))
+    ', '.join(list(map(lambda x: "'s3://" + ice.data_s3c.s3bucket + "/" + x.path + "'", alive_files)))
 )
 print(ddb.sql(query))
```
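All of the example-file changes above are the same mechanical substitution: live parquet paths are now resolved against the data client's bucket (`ice.data_s3c.s3bucket`) instead of the former single `ice.s3c`. A sketch of the shared pattern, assuming `alive_files` has already been read from the log and `ddb` is a DuckDB connection configured for the S3 endpoint:

```python
# Build fully qualified s3:// URLs for every live data file. The bucket
# must come from the data client, since log files now live elsewhere.
paths = ", ".join(
    f"'s3://{ice.data_s3c.s3bucket}/{f.path}'" for f in alive_files
)

# Query the live files directly with DuckDB.
query = f"select user_id, count(*) from read_parquet([{paths}]) group by user_id"
print(ddb.sql(query))
```

The f-string join here is equivalent to the `map`/`lambda` construction used throughout the examples.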
