
Commit e2c70ef

v3 (#73)
* move folder to v2 and v3 tests for checking proxy behavior
* arch work, copy icedb file and remove postgres
* tombstone cleanup
* inactive file cleanups with tombstones
* conflicts
* tombstone cleanup
* typo fix
* remove old paragraph
* updates and fixes
* reading the log files
* clarify timestamp usage
* typo
* update copy
* update
* update
* log and package structure
* file append work
* write log file, start reading
* time tracking, testing, log logic
* comments
* remove comment
* use ms time and host in file name, remove ksuid
* time travel
* cleanup insert operation untested
* cleanup, fixes, and working icedb insert test
* rename signature
* rename signature
* add custom merge query
* improved testing with assert
* start merging, return log files from read, make files printable
* fix tombstones, add tombstone validation to icedb test
* rename
* styling, make static
* basic test for conflicting types
* start read backward, support pagination in listing from s3, merge and append return file meta
* merge timestamps in log file name, read backward
* arch docs typo, remove merge timestamp from meta as it's in name already
* reverse read with timestamp merge, use reverse read
* remove timestamp for reverse read
* remove timestamp for read
* helper function for reading list of relevant log files forward
* typos, clean up reading the log forward helper, add virtual field for log marker to track the source log file when read
* fix virtual source log file
* rename signature
* comments
* merge only rewrites logs of log files involved in merge
* remove reverse read from test and mark reverse read deprecated
* remove merged marker in log file as not needed
* fix merge tombstone cleanup
* remove todos
* scale and pagination test
* scale and pagination test, fix merge and tombstone clean bugs, fix merge bug duplicating contents of merged files (ignored tombstones and copied them in merge), readme notes on performance
* readme
* exports
* remove v2 exports
* export format
* fix import
* remove merge timestamp
* add schema introspection
* test s3 proxy
1 parent 9cc3f45 commit e2c70ef

15 files changed: +1469 −782 lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
@@ -1,3 +1,4 @@
 .env
 .parquet
 __pycache__
+.idea

ARCHITECTURE.md

Lines changed: 102 additions & 0 deletions
@@ -0,0 +1,102 @@
# IceDB v3 Architecture

## Log file(s)

IceDB keeps track of the active files and the schema in a log, much like other database systems. This log is stored in S3 and is append-only. The log can also be truncated via the tombstone cleanup process described below.

Both the schema and the active files are tracked within the same log, and within each individual log file.

### Log file structure

The log file is newline-delimited JSON, with the first line being special. The first line follows this schema (TypeScript format):

```ts
interface {
  v: string // the version number
  t: number // unix ms timestamp of file creation. For merges, this is the timestamp after listing ends and merging logic begins; for append operations, it's the moment metadata is created. Tombstone cleanup leaves the current value when replacing a file
  sch: number // line number that the accumulated schema begins at
  f: number // line number that the list of file markers begins at
  tmb?: number // line number that the list of log file tombstones starts at
}
```

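For example, the header of a hypothetical log file whose accumulated schema is on line 2, whose file markers start on line 3, and whose log file tombstones (present only after a merge) start on line 5 might look like this (values are illustrative, assuming 1-based line numbers):

```
{"v": "3", "t": 1690000000000, "sch": 2, "f": 3, "tmb": 5}
```
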
#### Schema (sch)

There is only one schema line per log file, taking the form:

```ts
interface {
  [column: string]: string // example: "user_id": "VARCHAR"
}
```

Columns are never removed from the schema; it is always the union of the schemas found across log files.

If data type conflicts are found (e.g. log file A has a column as a VARCHAR, but log file B has the same column as a BIGINT), the merge fails and errors are logged. This can be mitigated by having ingestion instances read the schema periodically and cache it in memory (they only need to read log files up through the last schema line, then they can abort the request). One could also use a transactionally-secure schema catalog, have data sources declare their schema ahead of time, and more to validate the schema. Ultimately it is not up to IceDB to verify the schema during inserts.

#### Log file tombstones (tmb)

These are the log files that were merged into a new log file. If log files A and B were merged into C, not all data part files listed in A and B were necessarily merged into the new data parts recorded in C. Because of this, files that existed in A and B but were not part of the merge are copied into log file C in the alive state. Any files that were merged are marked not alive via a tombstone reference.

Because log files A and B were merged into C, "tombstones" are created for log files A and B. Tombstones are tracked so that a background cleaning process can remove the merged log files after some grace period (for example, files older than the max query timeout * 2). This is why it's important to insert infrequently and in large batches.

They take the format:

```ts
interface {
  "p": string // the file path, e.g. /some/prefixed/_log/ksuid.jsonl
  "t": number // the timestamp when the tombstone was created (when this log file was first part of a merge)
}
```

#### File marker (f)

There is at least one file marker per log file, taking the form:

```ts
interface {
  "p": string // the file path, e.g. /some/prefixed/file.parquet
  "b": number // the size in bytes
  "t": number // created timestamp in milliseconds
  "tmb"?: number // present if the file is not alive; the unix ms timestamp when the file was tombstoned
}
```

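Putting the pieces together, a log file produced by a merge might look like the following (a hypothetical sketch; paths, timestamps, and the ordering of the marker and tombstone sections are illustrative):

```
{"v": "3", "t": 1690000002000, "sch": 2, "f": 3, "tmb": 6}
{"user_id": "VARCHAR", "event": "VARCHAR", "ts": "BIGINT"}
{"p": "/tbl/cust=test/d=2023-02-11/merged.parquet", "b": 104857, "t": 1690000002000}
{"p": "/tbl/cust=test/d=2023-02-11/part-a.parquet", "b": 51234, "t": 1690000000000, "tmb": 1690000002000}
{"p": "/tbl/cust=test/d=2023-02-11/part-b.parquet", "b": 49876, "t": 1690000001000, "tmb": 1690000002000}
{"p": "/tbl/_log/1690000000000_host-a.jsonl", "t": 1690000002000}
{"p": "/tbl/_log/1690000001000_host-b.jsonl", "t": 1690000002000}
```

Line 1 is the header; line 2 is the accumulated schema; lines 3–5 are file markers (the two original parts were consumed by the merge and carry tombstone timestamps); lines 6–7 are the tombstones for the two log files that were merged.
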
### Reading the log files

To get a snapshot-consistent view of the database, a reader must perform the following actions:

1. List all files in the `_log` prefix for the table
2. Read each found log file sequentially (they are sorted by time), removing known data parts as file markers with tombstone references are found, and accumulating the current schema (handling schema conflicts if found)
3. Return the final list of active files and the accumulated schema

A stable timestamp can optionally be used to "time travel" *(this should not be older than the `tmb_grace_sec` to prevent missing data)*. This can be used for repeatable reads of the same view of the data.

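Below is a minimal sketch of that read path in Python. It assumes hypothetical `list_log_files` and `read_lines` helpers wrapping the S3 client, 1-based line numbers in the header, and that file markers precede tombstones within each log file; the real package exposes `IceLogIO` for this.

```python
import json


def read_state(list_log_files, read_lines, as_of_ms=None):
    """Accumulate the schema and the list of alive data files from the log."""
    schema = {}  # union of column -> type across all log files
    alive = {}   # data file path -> file marker
    for log_path in sorted(list_log_files("_log/")):  # names start with a ms timestamp, so name order == time order
        lines = read_lines(log_path)
        meta = json.loads(lines[0])
        if as_of_ms is not None and meta["t"] > as_of_ms:
            continue  # "time travel": skip log files newer than the stable timestamp
        # accumulate the schema, failing on type conflicts
        for col, typ in json.loads(lines[meta["sch"] - 1]).items():
            if schema.get(col, typ) != typ:
                raise ValueError(f"schema conflict on column {col}: {schema[col]} vs {typ}")
            schema[col] = typ
        # walk the file markers; a marker with a tombstone removes that data part
        end = meta["tmb"] - 1 if "tmb" in meta else len(lines)
        for line in lines[meta["f"] - 1:end]:
            marker = json.loads(line)
            if "tmb" in marker:
                alive.pop(marker["p"], None)  # merged away in this or an earlier log file
            else:
                alive[marker["p"]] = marker
    return schema, list(alive.values())
```
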
## Merging

Merging requires coordination via an exclusive lock on a table.

When a merge occurs, both data parts and log files are merged. A newly created log file is the combination of:

1. New data parts created in the merge (should be 1) (`f`)
2. Files that were part of the merge, marked with tombstone references (`f`)
3. Files that were not part of the merge, marked alive (`f`)
4. Tombstones of the log files involved in the merge (`tmb`)

The reason for copying the state of untouched files is that the new log file represents a new view of the modified data. If log files A and B were merged into C, then A and B represent a stale version of the data and only exist so that in-flight list and query operations can still find their files.

Merged log files are not immediately deleted, to prevent issues with in-flight operations, and are marked as merged in the new log file so they can be cleaned up later. You must ensure that files are only cleaned long after they could still be in use (say, multiple times the max end-to-end query timeout, including list operation times). This is why it's important to insert infrequently and in large batches, to prevent too many files from building up before they can be deleted.

Data part files that were part of the data merge are marked with tombstones so that if a list operation sees log files A, B, and C, it knows that the old data files were merged and should be removed from the resulting list of active files. If it only ends up seeing A and B, it just gets a stale view of the data. This is why it's important to ensure that a single query gets a consistent point-in-time view of the database, so nested queries do not operate on an inconsistent view of the data.

Tombstones include the timestamp when they were first merged, for the tombstone cleanup worker. When files merge, they must always carry forward any found tombstones. Tombstone cleanup is idempotent, so if a merge occurs concurrently with tombstone cleanup there is no risk of data loss or duplication. Merging must also always carry forward file markers that have tombstones, as these are also removed by the tombstone cleanup process.

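A rough sketch of how the new log file's contents could be assembled from the four items above (hypothetical helper, not the package's actual merge routine; `candidates` are `(log_path, marker)` pairs read from the log files involved in the merge, `merged_paths` the data parts rewritten into `new_part`, and `prior_tombstones` any tombstones carried forward from those log files):

```python
import json
import time


def build_merged_log(schema, candidates, merged_paths, new_part, prior_tombstones=()):
    """Assemble the newline-delimited JSON body of a merge's new log file."""
    now_ms = int(time.time() * 1000)
    markers = [new_part]  # 1. the new data part created by the merge
    for _, marker in candidates:
        if marker["p"] in merged_paths and "tmb" not in marker:
            marker = {**marker, "tmb": now_ms}  # 2. parts consumed by the merge get tombstone references
        markers.append(marker)  # 3. untouched parts (and already-tombstoned ones) are carried forward
    tombstones = list(prior_tombstones) + [
        {"p": p, "t": now_ms}  # 4. tombstones for the log files involved in this merge
        for p in sorted({log_path for log_path, _ in candidates})
    ]
    header = {
        "v": "3",
        "t": now_ms,  # for merges: the timestamp after listing ends and merge logic begins
        "sch": 2,
        "f": 3,
        "tmb": 3 + len(markers),  # 1-based line numbers, assuming schema, then markers, then tombstones
    }
    return "\n".join(json.dumps(x) for x in [header, schema, *markers, *tombstones])
```
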
## Tombstone cleanup

The second level of coordination, with its own exclusive lock, is tombstone cleaning. A separate `tmb_grace_sec` parameter controls how long tombstoned files are kept.

When tombstone cleanup occurs, the entire state of the log is read. Any tombstones found that are older than `tmb_grace_sec` are deleted from S3.

When the cleaning process finds a log file with tombstones, it first deletes those files from S3. If that is successful (not-found errors pass idempotently), then the log file is replaced with the same contents, minus the tombstones and any file markers that referenced them.
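
A minimal sketch of that cleanup pass, assuming hypothetical `read_log`, `write_log`, and `s3_delete` helpers (the real implementation lives in the package; this only illustrates the order of operations):

```python
import time


def clean_tombstones(log_paths, read_log, write_log, s3_delete, tmb_grace_sec):
    """Delete expired tombstoned files and rewrite the log files that referenced them."""
    cutoff_ms = int(time.time() * 1000) - tmb_grace_sec * 1000
    for path in log_paths:
        meta, schema, markers, tombstones = read_log(path)
        expired = [t for t in tombstones if t["t"] < cutoff_ms]
        if not expired:
            continue
        # delete the merged-away log files first; a "not found" error is an idempotent pass
        for t in expired:
            s3_delete(t["p"])
        # delete data parts whose markers carry an expired tombstone timestamp
        dead = [m for m in markers if "tmb" in m and m["tmb"] < cutoff_ms]
        for m in dead:
            s3_delete(m["p"])
        # replace the log file with the same contents, minus what was just removed;
        # the header timestamp (meta["t"]) keeps its current value
        keep_markers = [m for m in markers if m not in dead]
        keep_tombstones = [t for t in tombstones if t not in expired]
        write_log(path, meta, schema, keep_markers, keep_tombstones)
```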

README.md

Lines changed: 97 additions & 0 deletions
@@ -8,6 +8,103 @@ _Massive WIP_

See https://blog.danthegoodman.com/icedb-v2

<!-- TOC -->
* [IceDB](#icedb)
* [Performance test](#performance-test)
* [KNOWN GOTCHAS](#known-gotchas)
* [Examples](#examples)
* [Usage](#usage)
* [`partitionStrategy`](#partitionstrategy)
* [`sortOrder`](#sortorder)
* [`formatRow`](#formatrow)
* [`unique_row_key`](#uniquerowkey)
* [Pre-installing extensions](#pre-installing-extensions)
* [Merging](#merging)
* [Concurrent merges](#concurrent-merges)
* [Cleaning Merged Files](#cleaning-merged-files)
* [Custom Merge Query (ADVANCED USAGE)](#custom-merge-query-advanced-usage)
* [Handling `_row_id`](#handling-rowid)
* [Deduplicating Data on Merge](#deduplicating-data-on-merge)
* [Replacing Data on Merge](#replacing-data-on-merge)
* [Aggregating Data on Merge](#aggregating-data-on-merge)
* [Multiple Tables](#multiple-tables)
* [Meta Store Schema](#meta-store-schema)
<!-- TOC -->

## Performance test

The test below inserts 2,000 times with 2 parts each, showing performance against S3 for inserting and for reading in the state and schema:

```
============== insert hundreds ==============
this will take a while...
inserted 200
inserted hundreds in 11.283345699310303
reading in the state
read hundreds in 0.6294591426849365
files 405 logs 202
verify expected results
got 405 alive files
[(406, 'a'), (203, 'b')] in 0.638556957244873
merging it
merged partition cust=test/d=2023-02-11 with 203 files in 1.7919442653656006
read post merge state in 0.5759727954864502
files 406 logs 203
verify expected results
got 203 alive files
[(406, 'a'), (203, 'b')] in 0.5450308322906494
merging many more times to verify
merged partition cust=test/d=2023-06-07 with 200 files in 2.138633966445923
merged partition cust=test/d=2023-06-07 with 3 files in 0.638775110244751
merged partition None with 0 files in 0.5988118648529053
merged partition None with 0 files in 0.6049611568450928
read post merge state in 0.6064021587371826
files 408 logs 205
verify expected results
got 2 alive files
[(406, 'a'), (203, 'b')] in 0.0173952579498291
tombstone clean it
tombstone cleaned 4 cleaned log files, 811 deleted log files, 1012 data files in 4.3332929611206055
read post tombstone clean state in 0.0069119930267333984
verify expected results
got 2 alive files
[(406, 'a'), (203, 'b')] in 0.015745878219604492

============== insert thousands ==============
this will take a while...
inserted 2000
inserted thousands in 107.14211988449097
reading in the state
read thousands in 7.370793104171753
files 4005 logs 2002
verify expected results
[(4006, 'a'), (2003, 'b')] in 6.49034309387207
merging it
breaking on marker count
merged 2000 in 16.016802072525024
read post merge state in 6.011193037033081
files 4006 logs 2003
verify expected results
[(4006, 'a'), (2003, 'b')] in 6.683710098266602
# laptop became unstable around here
```

Some notes:

1. Very impressive state read performance with so many files (remember it has to open each one and accumulate the state!)
2. Merging happens very quickly
3. Tombstone cleaning happens super quickly as well
4. DuckDB performs surprisingly well with so many files (albeit they are one or two rows each)
5. At hundreds of log files and partitions (where most tables should live), performance was exceptional
6. Going from hundreds to thousands, performance is nearly perfectly linear, sometimes even super-linear (merges)!

Having such a large number of log files (merged but not tombstone cleaned) is very unrealistic. Chances are, worst case, you have <100 log files and hundreds or low thousands of data files. Otherwise you are either not merging/cleaning enough, or your partition scheme is far too granular.

The stability of my laptop struggled during the thousands test, so I only showed where I could consistently get to.

## KNOWN GOTCHAS

There is a bug in duckdb right now where `read_parquet` will fire a table macro twice, and show the file twice when listing them, but this doesn't affect actual query results: https://github.com/duckdb/duckdb/issues/7897

docker-compose.yml

Lines changed: 18 additions & 18 deletions
@@ -4,15 +4,15 @@ volumes:
   minio_storage: null
   crdb_storage: null
 services:
-  crdb:
-    container_name: crdb
-    image: cockroachdb/cockroach
-    ports:
-      - "26257:26257"
-      - "8080:8080"
-    command: start-single-node --insecure
-    volumes:
-      - crdb_storage:/cockroach/cockroach-data
+#  crdb:
+#    container_name: crdb
+#    image: cockroachdb/cockroach
+#    ports:
+#      - "26257:26257"
+#      - "8080:8080"
+#    command: start-single-node --insecure
+#    volumes:
+#      - crdb_storage:/cockroach/cockroach-data
   minio:
     image: minio/minio
     ports:
@@ -35,12 +35,12 @@ services:
       /usr/bin/mc mb myminio/testbucket;
       exit 0;
       "
-  clickhouse:
-    image: clickhouse/clickhouse-server
-    depends_on:
-      - minio
-      - crdb
-    container_name: ch
-    volumes:
-      - ./ch/user_scripts:/var/lib/clickhouse/user_scripts:0777
-      - /workspaces/icedb/ch/functions/get_files_function.xml:/etc/clickhouse-server/get_files_function.xml
+#  clickhouse:
+#    image: clickhouse/clickhouse-server:latest
+#    depends_on:
+#      - minio
+#      - crdb
+#    container_name: ch
+#    volumes:
+#      - ./ch/user_scripts:/var/lib/clickhouse/user_scripts:0777
+#      - /workspaces/icedb/ch/functions/get_files_function.xml:/etc/clickhouse-server/get_files_function.xml

examples/clickhouse.md

Lines changed: 7 additions & 0 deletions
@@ -26,3 +26,10 @@ docker exec ch clickhouse-client -q "SELECT sum(JSONExtractInt(properties, 'numt
```

This will show the same results as found in the final query of `examples/simple.py`

You can create a parameterized view for a nicer query experience like:
```
docker exec ch clickhouse-client -q "create view icedb as select * from s3(get_files(toYear({start_date:Date}), toMonth({start_date:Date}), toDate({start_date:Date}), toYear({end_date:Date}), toMonth({end_date:Date}), toDate({end_date:Date})), 'user', 'password', 'Parquet')"

docker exec ch clickhouse-client -q "SELECT sum(JSONExtractInt(properties, 'numtime')), user_id from icedb where start_date = '2023-02-01' and end_date = '2023-08-01' and event = 'page_load' group by user_id FORMAT Pretty;"
```

icedb/__init__.py

Lines changed: 5 additions & 1 deletion
@@ -1 +1,5 @@
-from .icedb import IceDB, PartitionFunctionType
+from .log import (
+    IceLogIO, Schema, LogMetadata, LogTombstone, NoLogFilesException, FileMarker, S3Client,
+    LogMetadataFromJSON, FileMarkerFromJSON, LogTombstoneFromJSON, SchemaConflictException, get_log_file_info
+)
+from .icedb import IceDBv3, PartitionFunctionType, FormatRowType

icedb/ch-test.sql

Lines changed: 4 additions & 0 deletions
@@ -0,0 +1,4 @@
select * FROM s3('https://webhook.site/1d7527f0-be57-4e48-aea1-f988b6ff62f5/ookla-open-data/parquet/performance/type=*/year=*/quarter=*/*.parquet', 'Parquet', 'quadkey Nullable(String), tile Nullable(String), avg_d_kbps Nullable(Int64), avg_u_kbps Nullable(Int64), avg_lat_ms Nullable(Int64), tests Nullable(Int64), devices Nullable(Int64)')


select * FROM s3('https://webhook.site/1d7527f0-be57-4e48-aea1-f988b6ff62f5/ookla-open-data/parquet/performance/a.parquet', 'Parquet', 'quadkey Nullable(String), tile Nullable(String), avg_d_kbps Nullable(Int64), avg_u_kbps Nullable(Int64), avg_lat_ms Nullable(Int64), tests Nullable(Int64), devices Nullable(Int64)')

icedb/ddb_test.py

Lines changed: 38 additions & 0 deletions
@@ -0,0 +1,38 @@
import duckdb
import pandas as pd

ddb = duckdb.connect()

# print(ddb.sql('''
# load httpfs
# '''))
# print(ddb.sql('''
# load parquet
# '''))
# print(ddb.sql('''
# SET s3_region='us-east-1'
# '''))
# print(ddb.sql('''
# SET s3_endpoint='webhook.site'
# '''))
# print(ddb.sql('''
# SET s3_url_style='path'
# '''))
# try:
#     print(ddb.sql('''
#     select * from read_parquet('s3://1d7527f0-be57-4e48-aea1-f988b6ff62f5/ookla-open-data/parquet/performance/type=*/year=*/quarter=*/*.parquet')
#     '''))
# except:
#     pass
# try:
#     print(ddb.sql('''
#     select * from read_parquet('s3://1d7527f0-be57-4e48-aea1-f988b6ff62f5/blah.parquet')
#     '''))
# except:
#     pass

# introspect the DataFrame's schema as DuckDB sees it
df = pd.DataFrame([{'a': 123, 'b': 1.2, 'c': 'hey', 'd': ['hey']}])
ddb.execute("describe select * from df")
res = ddb.df()
print(res['column_name'].tolist(), res['column_type'].tolist())
print('/'.join([None, 'hey', 'ho']))  # note: raises TypeError, join() requires str items
