Skip to content

Commit b78702e

Browse files
committed
refactor: migrate ClientSessionSyncProcessor.materializeEvent to Effect
1 parent 1c517d0 commit b78702e

File tree

4 files changed

+109
-66
lines changed

4 files changed

+109
-66
lines changed

packages/@livestore/common/src/sync/ClientSessionSyncProcessor.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -53,11 +53,11 @@ export const makeClientSessionSyncProcessor = ({
5353
materializeEvent: (
5454
eventDecoded: LiveStoreEvent.AnyDecoded,
5555
options: { otelContext: otel.Context; withChangeset: boolean; materializerHashLeader: Option.Option<number> },
56-
) => {
56+
) => Effect.Effect<{
5757
writeTables: Set<string>
5858
sessionChangeset: { _tag: 'sessionChangeset'; data: Uint8Array; debug: any } | { _tag: 'no-op' } | { _tag: 'unset' }
5959
materializerHash: Option.Option<number>
60-
}
60+
}>
6161
rollback: (changeset: Uint8Array) => void
6262
refreshTables: (tables: Set<string>) => void
6363
span: otel.Span
@@ -153,7 +153,7 @@ export const makeClientSessionSyncProcessor = ({
153153
writeTables: newWriteTables,
154154
sessionChangeset,
155155
materializerHash,
156-
} = materializeEvent(decodedEventDef, {
156+
} = yield* materializeEvent(decodedEventDef, {
157157
otelContext,
158158
withChangeset: true,
159159
materializerHashLeader: Option.none(),
@@ -305,7 +305,7 @@ export const makeClientSessionSyncProcessor = ({
305305
writeTables: newWriteTables,
306306
sessionChangeset,
307307
materializerHash,
308-
} = materializeEvent(decodedEventDef, {
308+
} = yield* materializeEvent(decodedEventDef, {
309309
otelContext,
310310
withChangeset: true,
311311
materializerHashLeader: event.meta.materializerHashLeader,

packages/@livestore/livestore/src/live-queries/__snapshots__/db-query.test.ts.snap

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -438,6 +438,11 @@ exports[`otel > otel 3`] = `
438438
"attributes": {
439439
"batchSize": 1,
440440
},
441+
"children": [
442+
{
443+
"_name": "client-session-sync-processor:materialize-event",
444+
},
445+
],
441446
},
442447
{
443448
"_name": "@livestore/common:LeaderSyncProcessor:push",
@@ -719,6 +724,11 @@ exports[`otel > with thunks 7`] = `
719724
"attributes": {
720725
"batchSize": 1,
721726
},
727+
"children": [
728+
{
729+
"_name": "client-session-sync-processor:materialize-event",
730+
},
731+
],
722732
},
723733
{
724734
"_name": "@livestore/common:LeaderSyncProcessor:push",
@@ -834,6 +844,11 @@ exports[`otel > with thunks with query builder and without labels 3`] = `
834844
"attributes": {
835845
"batchSize": 1,
836846
},
847+
"children": [
848+
{
849+
"_name": "client-session-sync-processor:materialize-event",
850+
},
851+
],
837852
},
838853
{
839854
"_name": "@livestore/common:LeaderSyncProcessor:push",

packages/@livestore/livestore/src/store/store.ts

Lines changed: 70 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -128,72 +128,80 @@ export class Store<TSchema extends LiveStoreSchema = LiveStoreSchema.Any, TConte
128128
schema,
129129
clientSession,
130130
runtime: effectContext.runtime,
131-
materializeEvent: (eventDecoded, { otelContext, withChangeset, materializerHashLeader }) => {
132-
const { eventDef, materializer } = getEventDef(schema, eventDecoded.name)
133-
134-
const execArgsArr = getExecStatementsFromMaterializer({
135-
eventDef,
136-
materializer,
137-
dbState: this.sqliteDbWrapper,
138-
event: { decoded: eventDecoded, encoded: undefined },
139-
})
140-
141-
const materializerHash = isDevEnv() ? Option.some(hashMaterializerResults(execArgsArr)) : Option.none()
142-
143-
if (
144-
materializerHashLeader._tag === 'Some' &&
145-
materializerHash._tag === 'Some' &&
146-
materializerHashLeader.value !== materializerHash.value
147-
) {
148-
void this.shutdown(
149-
Cause.fail(
150-
UnexpectedError.make({
151-
cause: `Materializer hash mismatch detected for event "${eventDecoded.name}".`,
152-
note: `Please make sure your event materializer is a pure function without side effects.`,
153-
}),
154-
),
155-
)
156-
}
157-
158-
const writeTablesForEvent = new Set<string>()
159-
160-
const exec = () => {
161-
for (const {
162-
statementSql,
163-
bindValues,
164-
writeTables = this.sqliteDbWrapper.getTablesUsed(statementSql),
165-
} of execArgsArr) {
166-
try {
167-
this.sqliteDbWrapper.cachedExecute(statementSql, bindValues, { otelContext, writeTables })
168-
} catch (cause) {
169-
throw UnexpectedError.make({
170-
cause,
171-
note: `Error executing materializer for event "${eventDecoded.name}".\nStatement: ${statementSql}\nBind values: ${JSON.stringify(bindValues)}`,
172-
})
173-
}
174-
175-
// durationMsTotal += durationMs
176-
for (const table of writeTables) {
177-
writeTablesForEvent.add(table)
131+
materializeEvent: Effect.fn('client-session-sync-processor:materialize-event')(
132+
(eventDecoded, { otelContext, withChangeset, materializerHashLeader }) =>
133+
Effect.gen(this, function* () {
134+
const { eventDef, materializer } = getEventDef(schema, eventDecoded.name)
135+
136+
const execArgsArr = getExecStatementsFromMaterializer({
137+
eventDef,
138+
materializer,
139+
dbState: this.sqliteDbWrapper,
140+
event: { decoded: eventDecoded, encoded: undefined },
141+
})
142+
143+
const materializerHash = isDevEnv() ? Option.some(hashMaterializerResults(execArgsArr)) : Option.none()
144+
145+
if (
146+
materializerHashLeader._tag === 'Some' &&
147+
materializerHash._tag === 'Some' &&
148+
materializerHashLeader.value !== materializerHash.value
149+
) {
150+
// Fork the shutdown effect to run in the background as a daemon, ensuring it's not interrupted.
151+
// TODO: we should probably handle this more gracefully using Effect’s error channel
152+
yield* Effect.forkDaemon(
153+
this.shutdown(
154+
Cause.fail(
155+
UnexpectedError.make({
156+
cause: `Materializer hash mismatch detected for event "${eventDecoded.name}".`,
157+
note: `Please make sure your event materializer is a pure function without side effects.`,
158+
}),
159+
),
160+
),
161+
)
178162
}
179163

180-
this.sqliteDbWrapper.debug.head = eventDecoded.seqNum
181-
}
182-
}
183-
184-
let sessionChangeset:
185-
| { _tag: 'sessionChangeset'; data: Uint8Array; debug: any }
186-
| { _tag: 'no-op' }
187-
| { _tag: 'unset' } = { _tag: 'unset' }
164+
return yield* Effect.sync(() => {
165+
const writeTablesForEvent = new Set<string>()
166+
167+
const exec = () => {
168+
for (const {
169+
statementSql,
170+
bindValues,
171+
writeTables = this.sqliteDbWrapper.getTablesUsed(statementSql),
172+
} of execArgsArr) {
173+
try {
174+
this.sqliteDbWrapper.cachedExecute(statementSql, bindValues, { otelContext, writeTables })
175+
} catch (cause) {
176+
throw UnexpectedError.make({
177+
cause,
178+
note: `Error executing materializer for event "${eventDecoded.name}".\nStatement: ${statementSql}\nBind values: ${JSON.stringify(bindValues)}`,
179+
})
180+
}
181+
182+
// durationMsTotal += durationMs
183+
for (const table of writeTables) {
184+
writeTablesForEvent.add(table)
185+
}
186+
187+
this.sqliteDbWrapper.debug.head = eventDecoded.seqNum
188+
}
189+
}
188190

189-
if (withChangeset === true) {
190-
sessionChangeset = this.sqliteDbWrapper.withChangeset(exec).changeset
191-
} else {
192-
exec()
193-
}
191+
let sessionChangeset:
192+
| { _tag: 'sessionChangeset'; data: Uint8Array; debug: any }
193+
| { _tag: 'no-op' }
194+
| { _tag: 'unset' } = { _tag: 'unset' }
195+
if (withChangeset === true) {
196+
sessionChangeset = this.sqliteDbWrapper.withChangeset(exec).changeset
197+
} else {
198+
exec()
199+
}
194200

195-
return { writeTables: writeTablesForEvent, sessionChangeset, materializerHash }
196-
},
201+
return { writeTables: writeTablesForEvent, sessionChangeset, materializerHash }
202+
})
203+
}),
204+
),
197205
rollback: (changeset) => {
198206
this.sqliteDbWrapper.rollback(changeset)
199207
},

packages/@livestore/react/src/__snapshots__/useClientDocument.test.tsx.snap

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,22 @@ exports[`useClientDocument > otel > should update the data based on component ke
2525
"attributes": {
2626
"batchSize": 1,
2727
},
28+
"children": [
29+
{
30+
"_name": "client-session-sync-processor:materialize-event",
31+
},
32+
],
2833
},
2934
{
3035
"_name": "client-session-sync-processor:push",
3136
"attributes": {
3237
"batchSize": 1,
3338
},
39+
"children": [
40+
{
41+
"_name": "client-session-sync-processor:materialize-event",
42+
},
43+
],
3444
},
3545
{
3646
"_name": "@livestore/common:LeaderSyncProcessor:push",
@@ -263,12 +273,22 @@ exports[`useClientDocument > otel > should update the data based on component ke
263273
"attributes": {
264274
"batchSize": 1,
265275
},
276+
"children": [
277+
{
278+
"_name": "client-session-sync-processor:materialize-event",
279+
},
280+
],
266281
},
267282
{
268283
"_name": "client-session-sync-processor:push",
269284
"attributes": {
270285
"batchSize": 1,
271286
},
287+
"children": [
288+
{
289+
"_name": "client-session-sync-processor:materialize-event",
290+
},
291+
],
272292
},
273293
{
274294
"_name": "@livestore/common:LeaderSyncProcessor:push",

0 commit comments

Comments
 (0)