Skip to content

Commit 24ff9bf

Browse files
remove format_row func from examples
1 parent 80e3f59 commit 24ff9bf

7 files changed

+60
-108
lines changed

examples/custom-merge-aggregation-with-custom-insert.py

Lines changed: 13 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -36,16 +36,7 @@ def part_func(row: dict) -> str:
3636
return part
3737

3838

39-
def format_row(row: dict) -> dict:
40-
"""
41-
Considering this would be a materialized view on raw incoming data,
42-
we prepare it differently than `simple-full.py`
43-
"""
44-
del row['properties'] # drop properties because we don't need it for this table
45-
return row
46-
47-
48-
ice = get_ice(s3c, part_func, format_row)
39+
ice = get_ice(s3c, part_func)
4940
ice.custom_merge_query = """
5041
select sum(cnt)::INT8 as cnt, max(ts) as ts, user_id, event
5142
from source_files
@@ -64,30 +55,34 @@ def format_row(row: dict) -> dict:
6455
"ts": 1686176939445,
6556
"event": "page_load",
6657
"user_id": "user_a",
67-
"properties": {
58+
"cnt": 1, # seed the incoming columns with the count to sum
59+
"properties": json.dumps({
6860
"page_name": "Home"
69-
},
61+
}),
7062
}, {
7163
"ts": 1676126229999,
7264
"event": "page_load",
7365
"user_id": "user_b",
74-
"properties": {
66+
"cnt": 1, # seed the incoming columns with the count to sum
67+
"properties": json.dumps({
7568
"page_name": "Home"
76-
},
69+
}),
7770
}, {
7871
"ts": 1686176939666,
7972
"event": "page_load",
8073
"user_id": "user_a",
81-
"properties": {
74+
"cnt": 1, # seed the incoming columns with the count to sum
75+
"properties": json.dumps({
8276
"page_name": "Settings"
83-
},
77+
}),
8478
}, {
8579
"ts": 1686176941445,
8680
"event": "page_load",
8781
"user_id": "user_a",
88-
"properties": {
82+
"cnt": 1, # seed the incoming columns with the count to sum
83+
"properties": json.dumps({
8984
"page_name": "Home"
90-
},
85+
}),
9186
}
9287
]
9388

examples/custom-merge-aggregation.py

Lines changed: 13 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -36,17 +36,7 @@ def part_func(row: dict) -> str:
3636
return part
3737

3838

39-
def format_row(row: dict) -> dict:
40-
"""
41-
Considering this would be a materialized view on raw incoming data,
42-
we prepare it differently than `simple-full.py`
43-
"""
44-
del row['properties'] # drop properties because we don't need it for this table
45-
row['cnt'] = 1 # seed the incoming columns with the count to sum
46-
return row
47-
48-
49-
ice = get_ice(s3c, part_func, format_row)
39+
ice = get_ice(s3c, part_func)
5040
ice.custom_merge_query = """
5141
select sum(cnt)::INT8 as cnt, max(ts) as ts, user_id, event
5242
from source_files
@@ -58,31 +48,35 @@ def format_row(row: dict) -> dict:
5848
{
5949
"ts": 1686176939445,
6050
"event": "page_load",
51+
"cnt": 1, # seed the incoming columns with the count to sum
6152
"user_id": "user_a",
62-
"properties": {
53+
"properties": json.dumps({
6354
"page_name": "Home"
64-
},
55+
}),
6556
}, {
6657
"ts": 1676126229999,
6758
"event": "page_load",
59+
"cnt": 1, # seed the incoming columns with the count to sum
6860
"user_id": "user_b",
69-
"properties": {
61+
"properties": json.dumps({
7062
"page_name": "Home"
71-
},
63+
}),
7264
}, {
7365
"ts": 1686176939666,
7466
"event": "page_load",
67+
"cnt": 1, # seed the incoming columns with the count to sum
7568
"user_id": "user_a",
76-
"properties": {
69+
"properties": json.dumps({
7770
"page_name": "Settings"
78-
},
71+
}),
7972
}, {
8073
"ts": 1686176941445,
8174
"event": "page_load",
75+
"cnt": 1, # seed the incoming columns with the count to sum
8276
"user_id": "user_a",
83-
"properties": {
77+
"properties": json.dumps({
8478
"page_name": "Home"
85-
},
79+
}),
8680
}
8781
]
8882

examples/custom-merge-replacing.py

Lines changed: 17 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -32,15 +32,8 @@ def part_func(row: dict) -> str:
3232
return part
3333

3434

35-
def format_row(row: dict) -> dict:
36-
"""
37-
No special preparation required in this example, other than converting the JSON to a string
38-
"""
39-
row['properties'] = json.dumps(row['properties']) # convert nested dict to json string
40-
return row
41-
4235

43-
ice = get_ice(s3c, part_func, format_row)
36+
ice = get_ice(s3c, part_func)
4437
ice.custom_merge_query = """
4538
select user_id, arg_max(event, ts) as event, max(ts) as ts, arg_max(properties, ts) as properties
4639
from source_files
@@ -53,30 +46,30 @@ def format_row(row: dict) -> dict:
5346
"ts": 1686176939445,
5447
"event": "page_load",
5548
"user_id": "user_a",
56-
"properties": {
49+
"properties": json.dumps({
5750
"page_name": "Home"
58-
},
51+
}),
5952
}, {
6053
"ts": 1676126229999,
6154
"event": "page_load",
6255
"user_id": "user_b",
63-
"properties": {
56+
"properties": json.dumps({
6457
"page_name": "Home"
65-
},
58+
}),
6659
}, {
6760
"ts": 1686176939666,
6861
"event": "page_load",
6962
"user_id": "user_a",
70-
"properties": {
63+
"properties": json.dumps({
7164
"page_name": "Settings"
72-
},
65+
}),
7366
}, {
7467
"ts": 1686176941445,
7568
"event": "page_load",
7669
"user_id": "user_a",
77-
"properties": {
70+
"properties": json.dumps({
7871
"page_name": "Home"
79-
},
72+
}),
8073
}
8174
]
8275

@@ -85,30 +78,30 @@ def format_row(row: dict) -> dict:
8578
"ts": 1686176939446,
8679
"event": "page_load",
8780
"user_id": "user_a",
88-
"properties": {
81+
"properties": json.dumps({
8982
"page_name": "Home"
90-
},
83+
}),
9184
}, {
9285
"ts": 1676126230000,
9386
"event": "page_load",
9487
"user_id": "user_b",
95-
"properties": {
88+
"properties": json.dumps({
9689
"page_name": "Home"
97-
},
90+
}),
9891
}, {
9992
"ts": 1686176939667,
10093
"event": "page_load",
10194
"user_id": "user_a",
102-
"properties": {
95+
"properties": json.dumps({
10396
"page_name": "Settings"
104-
},
97+
}),
10598
}, {
10699
"ts": 1686176941446,
107100
"event": "page_load",
108101
"user_id": "user_a",
109-
"properties": {
102+
"properties": json.dumps({
110103
"page_name": "Home"
111-
},
104+
}),
112105
}
113106
]
114107

examples/helpers.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ def delete_all_s3(s3c: S3Client):
5151
)
5252
print(f"deleted {len(s3_files)} files")
5353

54-
def get_ice(s3_client, part_func, format_row):
54+
def get_ice(s3_client, part_func):
5555
return IceDBv3(
5656
part_func,
5757
['event', 'ts'], # We are going to sort by event, then timestamp of the event within the data part
@@ -62,6 +62,5 @@ def get_ice(s3_client, part_func, format_row):
6262
s3_client,
6363
"dan-mbp",
6464
s3_use_path=True, # needed for local minio
65-
format_row=format_row,
6665
compression_codec=CompressionCodec.ZSTD # Let's force a higher compression level, default is SNAPPY
6766
)

examples/materialized-view.py

Lines changed: 14 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -41,28 +41,10 @@ def part_func_mv(row: dict) -> str:
4141
return part
4242

4343

44-
def format_row_raw(row: dict) -> dict:
45-
"""
46-
We can take the row as-is, except let's make the properties a JSON string for safety
47-
"""
48-
row['properties'] = json.dumps(row['properties']) # convert nested dict to json string
49-
return row
50-
51-
52-
def format_row_mv(row: dict) -> dict:
53-
"""
54-
Considering this would be a materialized view on raw incoming data,
55-
we prepare it differently than `simple-full.py`
56-
"""
57-
del row['properties'] # drop properties because we don't need it for this table
58-
row['cnt'] = 1 # seed the incoming columns with the count to sum
59-
return row
60-
61-
6244
# This will be for the raw events
63-
ice_raw = get_ice(s3c_raw, part_func_raw, format_row_raw)
45+
ice_raw = get_ice(s3c_raw, part_func_raw)
6446
# This will be for our materialized view
65-
ice_mv = get_ice(s3c_mv, part_func_mv, format_row_mv)
47+
ice_mv = get_ice(s3c_mv, part_func_mv)
6648
ice_mv.custom_merge_query = """
6749
select sum(cnt)::INT8 as cnt, max(ts) as ts, user_id, event
6850
from source_files
@@ -75,30 +57,34 @@ def format_row_mv(row: dict) -> dict:
7557
"ts": 1686176939445,
7658
"event": "page_load",
7759
"user_id": "user_a",
78-
"properties": {
60+
"cnt": 1, # seed the incoming columns with the count to sum
61+
"properties": json.dumps({
7962
"page_name": "Home"
80-
},
63+
}),
8164
}, {
8265
"ts": 1676126229999,
8366
"event": "page_load",
8467
"user_id": "user_b",
85-
"properties": {
68+
"cnt": 1, # seed the incoming columns with the count to sum
69+
"properties": json.dumps({
8670
"page_name": "Home"
87-
},
71+
}),
8872
}, {
8973
"ts": 1686176939666,
9074
"event": "page_load",
9175
"user_id": "user_a",
92-
"properties": {
76+
"cnt": 1, # seed the incoming columns with the count to sum
77+
"properties": json.dumps({
9378
"page_name": "Settings"
94-
},
79+
}),
9580
}, {
9681
"ts": 1686176941445,
9782
"event": "page_load",
9883
"user_id": "user_a",
99-
"properties": {
84+
"cnt": 1, # seed the incoming columns with the count to sum
85+
"properties": json.dumps({
10086
"page_name": "Home"
101-
},
87+
}),
10288
}
10389
]
10490

examples/simple-full.py

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -25,15 +25,8 @@ def part_func(row: dict) -> str:
2525
return part
2626

2727

28-
def format_row(row: dict) -> dict:
29-
"""
30-
We can take the row as-is, except let's make the properties a JSON string for safety
31-
"""
32-
row['properties'] = json.dumps(row['properties']) # convert nested dict to json string
33-
return row
34-
3528

36-
ice = get_ice(s3c, part_func, format_row)
29+
ice = get_ice(s3c, part_func)
3730

3831
# Some fake events that we are ingesting
3932
example_events = [

examples/verify-schema.py

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -32,15 +32,7 @@ def part_func(row: dict) -> str:
3232
return part
3333

3434

35-
def format_row(row: dict) -> dict:
36-
"""
37-
We can take the row as-is, except let's make the properties a JSON string for safety
38-
"""
39-
row['properties'] = json.dumps(row['properties']) # convert nested dict to json string
40-
return row
41-
42-
43-
ice = get_ice(s3c, part_func, format_row)
35+
ice = get_ice(s3c, part_func)
4436

4537
# Some fake events that we are ingesting
4638
example_events = [

0 commit comments

Comments
 (0)