Skip to content

Commit 4225655

Browse files
committed
refactor: migrate ClientSessionSyncProcessor.materializeEvent to Effect
1 parent 4e7b870 commit 4225655

File tree

4 files changed

+132
-67
lines changed

4 files changed

+132
-67
lines changed

packages/@livestore/common/src/sync/ClientSessionSyncProcessor.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -53,11 +53,11 @@ export const makeClientSessionSyncProcessor = ({
5353
materializeEvent: (
5454
eventDecoded: LiveStoreEvent.AnyDecoded,
5555
options: { otelContext: otel.Context; withChangeset: boolean; materializerHashLeader: Option.Option<number> },
56-
) => {
56+
) => Effect.Effect<{
5757
writeTables: Set<string>
5858
sessionChangeset: { _tag: 'sessionChangeset'; data: Uint8Array; debug: any } | { _tag: 'no-op' } | { _tag: 'unset' }
5959
materializerHash: Option.Option<number>
60-
}
60+
}>
6161
rollback: (changeset: Uint8Array) => void
6262
refreshTables: (tables: Set<string>) => void
6363
span: otel.Span
@@ -151,7 +151,7 @@ export const makeClientSessionSyncProcessor = ({
151151
writeTables: newWriteTables,
152152
sessionChangeset,
153153
materializerHash,
154-
} = materializeEvent(decodedEventDef, {
154+
} = yield* materializeEvent(decodedEventDef, {
155155
otelContext,
156156
withChangeset: true,
157157
materializerHashLeader: Option.none(),
@@ -301,7 +301,7 @@ export const makeClientSessionSyncProcessor = ({
301301
writeTables: newWriteTables,
302302
sessionChangeset,
303303
materializerHash,
304-
} = materializeEvent(decodedEventDef, {
304+
} = yield* materializeEvent(decodedEventDef, {
305305
otelContext,
306306
withChangeset: true,
307307
materializerHashLeader: event.meta.materializerHashLeader,

packages/@livestore/livestore/src/live-queries/__snapshots__/db-query.test.ts.snap

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,11 @@ exports[`otel > QueryBuilder subscription - basic functionality 1`] = `
2525
"attributes": {
2626
"batchSize": 1,
2727
},
28+
"children": [
29+
{
30+
"_name": "client-session-sync-processor:materialize-event",
31+
},
32+
],
2833
},
2934
{
3035
"_name": "@livestore/common:LeaderSyncProcessor:push",
@@ -144,6 +149,11 @@ exports[`otel > QueryBuilder subscription - direct table subscription 1`] = `
144149
"attributes": {
145150
"batchSize": 1,
146151
},
152+
"children": [
153+
{
154+
"_name": "client-session-sync-processor:materialize-event",
155+
},
156+
],
147157
},
148158
{
149159
"_name": "@livestore/common:LeaderSyncProcessor:push",
@@ -263,12 +273,22 @@ exports[`otel > QueryBuilder subscription - unsubscribe functionality 1`] = `
263273
"attributes": {
264274
"batchSize": 1,
265275
},
276+
"children": [
277+
{
278+
"_name": "client-session-sync-processor:materialize-event",
279+
},
280+
],
266281
},
267282
{
268283
"_name": "client-session-sync-processor:push",
269284
"attributes": {
270285
"batchSize": 1,
271286
},
287+
"children": [
288+
{
289+
"_name": "client-session-sync-processor:materialize-event",
290+
},
291+
],
272292
},
273293
{
274294
"_name": "@livestore/common:LeaderSyncProcessor:push",
@@ -438,6 +458,11 @@ exports[`otel > otel 3`] = `
438458
"attributes": {
439459
"batchSize": 1,
440460
},
461+
"children": [
462+
{
463+
"_name": "client-session-sync-processor:materialize-event",
464+
},
465+
],
441466
},
442467
{
443468
"_name": "@livestore/common:LeaderSyncProcessor:push",
@@ -719,6 +744,11 @@ exports[`otel > with thunks 7`] = `
719744
"attributes": {
720745
"batchSize": 1,
721746
},
747+
"children": [
748+
{
749+
"_name": "client-session-sync-processor:materialize-event",
750+
},
751+
],
722752
},
723753
{
724754
"_name": "@livestore/common:LeaderSyncProcessor:push",
@@ -834,6 +864,11 @@ exports[`otel > with thunks with query builder and without labels 3`] = `
834864
"attributes": {
835865
"batchSize": 1,
836866
},
867+
"children": [
868+
{
869+
"_name": "client-session-sync-processor:materialize-event",
870+
},
871+
],
837872
},
838873
{
839874
"_name": "@livestore/common:LeaderSyncProcessor:push",

packages/@livestore/livestore/src/store/store.ts

Lines changed: 73 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -128,72 +128,80 @@ export class Store<TSchema extends LiveStoreSchema = LiveStoreSchema.Any, TConte
128128
schema,
129129
clientSession,
130130
runtime: effectContext.runtime,
131-
materializeEvent: (eventDecoded, { otelContext, withChangeset, materializerHashLeader }) => {
132-
const { eventDef, materializer } = getEventDef(schema, eventDecoded.name)
133-
134-
const execArgsArr = getExecStatementsFromMaterializer({
135-
eventDef,
136-
materializer,
137-
dbState: this.sqliteDbWrapper,
138-
event: { decoded: eventDecoded, encoded: undefined },
139-
})
140-
141-
const materializerHash = isDevEnv() ? Option.some(hashMaterializerResults(execArgsArr)) : Option.none()
142-
143-
if (
144-
materializerHashLeader._tag === 'Some' &&
145-
materializerHash._tag === 'Some' &&
146-
materializerHashLeader.value !== materializerHash.value
147-
) {
148-
void this.shutdown(
149-
Cause.fail(
150-
UnexpectedError.make({
151-
cause: `Materializer hash mismatch detected for event "${eventDecoded.name}".`,
152-
note: `Please make sure your event materializer is a pure function without side effects.`,
153-
}),
154-
),
155-
)
156-
}
157-
158-
const writeTablesForEvent = new Set<string>()
159-
160-
const exec = () => {
161-
for (const {
162-
statementSql,
163-
bindValues,
164-
writeTables = this.sqliteDbWrapper.getTablesUsed(statementSql),
165-
} of execArgsArr) {
166-
try {
167-
this.sqliteDbWrapper.cachedExecute(statementSql, bindValues, { otelContext, writeTables })
168-
} catch (cause) {
169-
throw UnexpectedError.make({
170-
cause,
171-
note: `Error executing materializer for event "${eventDecoded.name}".\nStatement: ${statementSql}\nBind values: ${JSON.stringify(bindValues)}`,
172-
})
173-
}
174-
175-
// durationMsTotal += durationMs
176-
for (const table of writeTables) {
177-
writeTablesForEvent.add(table)
131+
materializeEvent: Effect.fn('client-session-sync-processor:materialize-event')(
132+
(eventDecoded, { otelContext, withChangeset, materializerHashLeader }) =>
133+
Effect.gen(this, function* () {
134+
const { eventDef, materializer } = getEventDef(schema, eventDecoded.name)
135+
136+
const execArgsArr = getExecStatementsFromMaterializer({
137+
eventDef,
138+
materializer,
139+
dbState: this.sqliteDbWrapper,
140+
event: { decoded: eventDecoded, encoded: undefined },
141+
})
142+
143+
const materializerHash = isDevEnv() ? Option.some(hashMaterializerResults(execArgsArr)) : Option.none()
144+
145+
if (
146+
materializerHashLeader._tag === 'Some' &&
147+
materializerHash._tag === 'Some' &&
148+
materializerHashLeader.value !== materializerHash.value
149+
) {
150+
// Fork the shutdown effect to run in the background as a daemon, ensuring it's not interrupted.
151+
// TODO: we should probably handle this more gracefully using Effect’s error channel
152+
yield* Effect.forkDaemon(
153+
this.shutdown(
154+
Cause.fail(
155+
UnexpectedError.make({
156+
cause: `Materializer hash mismatch detected for event "${eventDecoded.name}".`,
157+
note: `Please make sure your event materializer is a pure function without side effects.`,
158+
}),
159+
),
160+
),
161+
)
178162
}
179163

180-
this.sqliteDbWrapper.debug.head = eventDecoded.seqNum
181-
}
182-
}
183-
184-
let sessionChangeset:
185-
| { _tag: 'sessionChangeset'; data: Uint8Array; debug: any }
186-
| { _tag: 'no-op' }
187-
| { _tag: 'unset' } = { _tag: 'unset' }
164+
return yield* Effect.sync(() => {
165+
const writeTablesForEvent = new Set<string>()
166+
167+
const exec = () => {
168+
for (const {
169+
statementSql,
170+
bindValues,
171+
writeTables = this.sqliteDbWrapper.getTablesUsed(statementSql),
172+
} of execArgsArr) {
173+
try {
174+
this.sqliteDbWrapper.cachedExecute(statementSql, bindValues, { otelContext, writeTables })
175+
} catch (cause) {
176+
throw UnexpectedError.make({
177+
cause,
178+
note: `Error executing materializer for event "${eventDecoded.name}".\nStatement: ${statementSql}\nBind values: ${JSON.stringify(bindValues)}`,
179+
})
180+
}
181+
182+
// durationMsTotal += durationMs
183+
for (const table of writeTables) {
184+
writeTablesForEvent.add(table)
185+
}
186+
187+
this.sqliteDbWrapper.debug.head = eventDecoded.seqNum
188+
}
189+
}
188190

189-
if (withChangeset === true) {
190-
sessionChangeset = this.sqliteDbWrapper.withChangeset(exec).changeset
191-
} else {
192-
exec()
193-
}
191+
let sessionChangeset:
192+
| { _tag: 'sessionChangeset'; data: Uint8Array; debug: any }
193+
| { _tag: 'no-op' }
194+
| { _tag: 'unset' } = { _tag: 'unset' }
195+
if (withChangeset === true) {
196+
sessionChangeset = this.sqliteDbWrapper.withChangeset(exec).changeset
197+
} else {
198+
exec()
199+
}
194200

195-
return { writeTables: writeTablesForEvent, sessionChangeset, materializerHash }
196-
},
201+
return { writeTables: writeTablesForEvent, sessionChangeset, materializerHash }
202+
})
203+
}),
204+
),
197205
rollback: (changeset) => {
198206
this.sqliteDbWrapper.rollback(changeset)
199207
},
@@ -749,7 +757,9 @@ export class Store<TSchema extends LiveStoreSchema = LiveStoreSchema.Any, TConte
749757
* This is called automatically when the store was created using the React or Effect API.
750758
*/
751759
shutdown = (cause?: Cause.Cause<UnexpectedError>): Effect.Effect<void> => {
752-
this.checkShutdown('shutdown')
760+
if (this.isShutdown) {
761+
return Effect.void
762+
}
753763

754764
this.isShutdown = true
755765
return this.clientSession.shutdown(

packages/@livestore/react/src/__snapshots__/useClientDocument.test.tsx.snap

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,22 @@ exports[`useClientDocument > otel > should update the data based on component ke
2525
"attributes": {
2626
"batchSize": 1,
2727
},
28+
"children": [
29+
{
30+
"_name": "client-session-sync-processor:materialize-event",
31+
},
32+
],
2833
},
2934
{
3035
"_name": "client-session-sync-processor:push",
3136
"attributes": {
3237
"batchSize": 1,
3338
},
39+
"children": [
40+
{
41+
"_name": "client-session-sync-processor:materialize-event",
42+
},
43+
],
3444
},
3545
{
3646
"_name": "@livestore/common:LeaderSyncProcessor:push",
@@ -263,12 +273,22 @@ exports[`useClientDocument > otel > should update the data based on component ke
263273
"attributes": {
264274
"batchSize": 1,
265275
},
276+
"children": [
277+
{
278+
"_name": "client-session-sync-processor:materialize-event",
279+
},
280+
],
266281
},
267282
{
268283
"_name": "client-session-sync-processor:push",
269284
"attributes": {
270285
"batchSize": 1,
271286
},
287+
"children": [
288+
{
289+
"_name": "client-session-sync-processor:materialize-event",
290+
},
291+
],
272292
},
273293
{
274294
"_name": "@livestore/common:LeaderSyncProcessor:push",

0 commit comments

Comments
 (0)