
Commit 03b9c9e

log merging breaks in sample after few retries #136 (#137)
* Fix logs/data getting out of sync during cleanup
* Update icedb/icedb.py: kept because we must preserve tombstones for `min_age` (Co-authored-by: Dan Goodman <[email protected]>)
* Update icedb/icedb.py: the latest time at which a tombstone is allowed to be deleted (Co-authored-by: Dan Goodman <[email protected]>)
* refactor: Clarify comment describing tombstone expiration time calculation
* now change
* review feedback

---------

Co-authored-by: Dan Goodman <[email protected]>
1 parent 3cbb48f commit 03b9c9e
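The review notes above both refer to the tombstone expiration cutoff used by tombstone_cleanup: a tombstone created at createdMS may only be deleted once it is at least min_age_ms old, i.e. createdMS <= now - min_age_ms. The change to icedb/icedb.py below factors this cutoff into an `expired` variable and keeps unexpired tombstones out of the delete sets. A minimal standalone sketch of that rule, for illustration only (the helper name below is hypothetical, not part of the icedb API):

from time import time

def tombstone_may_be_deleted(created_ms: int, min_age_ms: int, now_ms: int) -> bool:
    # Hypothetical helper; icedb computes this inline in tombstone_cleanup.
    # `expired` is the latest creation time at which a tombstone is still allowed to be deleted.
    expired = now_ms - min_age_ms
    return created_ms <= expired

# Example: with min_age_ms=1_000, a tombstone created 5 seconds ago is old enough to delete.
now = round(time() * 1000)
print(tombstone_may_be_deleted(created_ms=now - 5_000, min_age_ms=1_000, now_ms=now))  # True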


4 files changed: +185 additions, -13 deletions


.gitignore

Lines changed: 1 addition & 0 deletions
@@ -2,3 +2,4 @@
 .parquet
 __pycache__
 .idea
+.aider*

Lines changed: 159 additions & 0 deletions
@@ -0,0 +1,159 @@
from time import time, sleep
from helpers import delete_all_s3
import duckdb
from icedb.log import S3Client, IceLogIO, FileMarker
from icedb import IceDBv3, CompressionCodec
from datetime import datetime

# S3 configuration dictionary
S3_CONFIG = {
    "s3_region": "us-east-1",
    "s3_endpoint": "http://localhost:9000",
    "s3_access_key_id": "user",
    "s3_secret_access_key": "password",
    "s3_use_ssl": False,
    "s3_url_style": "path"  # can be 'path' or 'vhost'
}

# Bucket-specific S3 config not used by DuckDB
S3_BUCKET_CONFIG = {
    "bucket": "testbucket",
    "prefix": "example",
}

# create an s3 client to talk to minio
s3c = S3Client(
    s3prefix=S3_BUCKET_CONFIG["prefix"],
    s3bucket=S3_BUCKET_CONFIG["bucket"],
    s3region=S3_CONFIG["s3_region"],
    s3endpoint=S3_CONFIG["s3_endpoint"],
    s3accesskey=S3_CONFIG["s3_access_key_id"],
    s3secretkey=S3_CONFIG["s3_secret_access_key"]
)

example_events = [
    {
        "ts": 1686176939445,
        "event": "page_load",
        "user_id": "user_a",
        "properties": {
            "page_name": "Home"
        }
    }, {
        "ts": 1676126229999,
        "event": "page_load",
        "user_id": "user_b",
        "properties": {
            "page_name": "Home"
        }
    }, {
        "ts": 1686176939666,
        "event": "page_load",
        "user_id": "user_a",
        "properties": {
            "page_name": "Settings"
        }
    }, {
        "ts": 1686176941445,
        "event": "page_load",
        "user_id": "user_a",
        "properties": {
            "page_name": "Home"
        }
    }
]


def part_func(row: dict) -> str:
    """
    Partition by user_id, date
    """
    row_time = datetime.utcfromtimestamp(row['ts'] / 1000)
    part = f"u={row['user_id']}/d={row_time.strftime('%Y-%m-%d')}"
    return part


# Initialize the client
ice = IceDBv3(
    partition_function=part_func,  # Partitions by user_id and date
    sort_order=['event', 'ts'],  # Sort by event, then timestamp of the event within the data part
    # S3 settings from config
    s3_region=S3_CONFIG["s3_region"],
    s3_access_key=S3_CONFIG["s3_access_key_id"],
    s3_secret_key=S3_CONFIG["s3_secret_access_key"],
    s3_endpoint=S3_CONFIG["s3_endpoint"],
    s3_use_path=S3_CONFIG["s3_url_style"] == "path",
    # S3 client instance
    s3_client=s3c,
    # Other settings
    path_safe_hostname="dan-mbp",
    compression_codec=CompressionCodec.ZSTD,  # Use ZSTD for higher compression ratio compared to default SNAPPY
)


def once():
    # Insert records
    inserted = ice.insert(example_events)
    print(f"{len(inserted)} created files (ice.insert): {', '.join(x.path for x in inserted)}")

    # Read the log state
    log = IceLogIO("demo-host")
    _, file_markers, log_tombstones, log_files = log.read_at_max_time(s3c, round(time() * 1000))
    print(f"{len(log_files)} log files: {', '.join(log_files)}")
    print(f"{len(log_tombstones)} log tombstones: {', '.join(x.path for x in log_tombstones)}")
    alive_files = list(filter(lambda x: x.tombstone is None, file_markers))
    tombstoned_files = list(filter(lambda x: x.tombstone is not None, file_markers))
    print(f"{len(alive_files)} alive files: {', '.join(x.path for x in alive_files)}")
    print(f"{len(tombstoned_files)} tombstoned files: {', '.join(x.path for x in tombstoned_files)}")
    print(f"file_markers: {file_markers}")

    # Setup duckdb for querying local minio
    ddb = duckdb.connect(":memory:")
    ddb.execute("install httpfs")
    ddb.execute("load httpfs")

    # Set DuckDB S3 configuration from the config dictionary
    for key, value in S3_CONFIG.items():
        if key == "s3_endpoint":
            # Strip protocol prefix by splitting on :// once
            value = value.split("://", 1)[1]
        ddb.execute(f"SET {key}='{value}'")

    # Query alive files
    query = ("select user_id, count(*), (properties::JSON)->>'page_name' as page "
             "from read_parquet([{}]) "
             "group by user_id, page "
             "order by count(*) desc").format(
        ', '.join(list(map(lambda x: "'s3://" + ice.data_s3c.s3bucket + "/" + x.path + "'", alive_files)))
    )
    print(ddb.sql(query))

    new_log, new_file_marker, partition, merged_file_markers, meta = ice.merge()
    if partition is not None:  # if any merge happened
        print(f"Merged partition: {partition}")
        if merged_file_markers:
            print(f"- {len(merged_file_markers)} source files merged: {', '.join(x.path for x in merged_file_markers)}")
            print(f"- merged_file_markers {merged_file_markers}")
        print(f"- into: {new_file_marker.path}")
        print(f"- new log: {new_log}")

    cleaned_logs, deleted_logs, deleted_data = ice.tombstone_cleanup(1_000)
    print(f"{len(cleaned_logs)} cleaned log files: {', '.join(cleaned_logs)}")
    print(f"{len(deleted_logs)} deleted log files: {', '.join(deleted_logs)}")
    print(f"{len(deleted_data)} deleted data files: {', '.join(deleted_data)}")


# wipe everything at the start
delete_all_s3(s3c)

for i in range(30):
    try:
        once()
    except Exception as e:
        print(f"Failed after {i} runs")
        raise e
    sleep(1)

# wipe everything at the end if successful
delete_all_s3(s3c)

icedb/icedb.py

Lines changed: 24 additions & 12 deletions
@@ -340,34 +340,46 @@ def tombstone_cleanup(self, min_age_ms: int) -> tuple[list[str], list[str], list
         now = round(time() * 1000)
 
         log_files_to_delete: dict[str, bool] = {}
+        log_files_to_keep: dict[str, LogTombstone] = {}
         data_files_to_delete: dict[str, bool] = {}
         data_files_to_keep: dict[str, FileMarker] = {}
         schema = Schema()
 
-        current_log_files = logio.get_current_log_files(self.log_s3c)
+        cur_schema, cur_files, cur_tombstones, all_log_files = logio.read_at_max_time(self.log_s3c, now)
+
         # We only need to get merge files
-        merge_log_files = list(filter(lambda x: get_log_file_info(x['Key'])[1], current_log_files))
-        for file in merge_log_files:
+        merge_log_files = list(filter(lambda x: get_log_file_info(x)[1], all_log_files))
+
+        for log_file in merge_log_files:
             obj = self.log_s3c.s3.get_object(
                 Bucket=self.log_s3c.s3bucket,
-                Key=file['Key']
+                Key=log_file
             )
             jsonl = str(obj['Body'].read(), encoding="utf-8").split("\n")
             meta_json = json.loads(jsonl[0])
             meta = LogMetadataFromJSON(meta_json)
-
+            expired = now - min_age_ms  # time at which a tombstone is allowed to be deleted
             # Log tombstones
             if meta.tombstoneLineIndex is not None:
                 for i in range(meta.tombstoneLineIndex, meta.fileLineIndex):
                     tmb = LogTombstoneFromJSON(dict(json.loads(jsonl[i])))
-                    if tmb.createdMS <= now - min_age_ms:
+                    if tmb.createdMS <= expired:
                         log_files_to_delete[tmb.path] = True
-
+                    else:
+                        log_files_to_keep[tmb.path] = tmb
             # File markers
             for i in range(meta.fileLineIndex, len(jsonl)):
                 fm_json = dict(json.loads(jsonl[i]))
                 fm = FileMarkerFromJSON(fm_json)
-                if fm.createdMS <= now - min_age_ms and fm.tombstone is not None:
+
+                tombstone = fm.tombstone
+                if not tombstone:
+                    # find fm.path in cur_files
+                    for cf in cur_files:
+                        if cf.path == fm.path:
+                            tombstone = cf.tombstone
+                            break
+                if tombstone is not None and tombstone <= expired:
                     data_files_to_delete[fm.path] = True
                     if fm.path in data_files_to_keep:
                         del data_files_to_keep[fm.path]
@@ -378,7 +390,7 @@ def tombstone_cleanup(self, min_age_ms: int) -> tuple[list[str], list[str], list
             schema_json = dict(json.loads(jsonl[meta.schemaLineIndex]))
             schema.accumulate(list(schema_json.keys()), list(schema_json.values()))
 
-            cleaned_log_files.append(file['Key'])
+            cleaned_log_files.append(log_file)
 
         # Delete log tombstones
         for log_path in log_files_to_delete.keys():
@@ -394,7 +406,7 @@ def tombstone_cleanup(self, min_age_ms: int) -> tuple[list[str], list[str], list
                 Bucket=self.data_s3c.s3bucket,
                 Key=data_path
             )
-            deleted_log_files.append(data_path)
+            deleted_data_files.append(data_path)
 
 
 
@@ -404,7 +416,7 @@ def tombstone_cleanup(self, min_age_ms: int) -> tuple[list[str], list[str], list
             1,
             schema,
             list(data_files_to_keep.values()),
-            None,
+            list(log_files_to_keep.values()),  # kept because we must preserve tombstones for `min_age`
             merged=True,
            timestamp=round(time()*1000)
         )
@@ -574,4 +586,4 @@ def rewrite_partition(self, target_partition: str, filter_query: str) -> tuple[s
             merged=True
         )
 
-        return new_log, meta, list(map(lambda x: x.path, rewrite_targets))
+        return new_log, meta, list(map(lambda x: x.path, rewrite_targets))

setup.py

Lines changed: 1 addition & 1 deletion
@@ -1,6 +1,6 @@
 from setuptools import setup, find_packages
 
-VERSION = '0.9.1'
+VERSION = '0.9.2'
 DESCRIPTION = 'IceDB'
 LONG_DESCRIPTION = 'Parquet merge engine'
 