diff --git a/packages/@livestore/common/src/sync/ClientSessionSyncProcessor.ts b/packages/@livestore/common/src/sync/ClientSessionSyncProcessor.ts index ac8d30c37..3ac1c5bb4 100644 --- a/packages/@livestore/common/src/sync/ClientSessionSyncProcessor.ts +++ b/packages/@livestore/common/src/sync/ClientSessionSyncProcessor.ts @@ -13,7 +13,7 @@ import { Stream, Subscribable, } from '@livestore/utils/effect' -import * as otel from '@opentelemetry/api' +import type * as otel from '@opentelemetry/api' import { type ClientSession, SyncError, type UnexpectedError } from '../adapter-types.ts' import * as EventSequenceNumber from '../schema/EventSequenceNumber.ts' @@ -52,12 +52,12 @@ export const makeClientSessionSyncProcessor = ({ runtime: Runtime.Runtime materializeEvent: ( eventDecoded: LiveStoreEvent.AnyDecoded, - options: { otelContext: otel.Context; withChangeset: boolean; materializerHashLeader: Option.Option }, - ) => { + options: { withChangeset: boolean; materializerHashLeader: Option.Option }, + ) => Effect.Effect<{ writeTables: Set sessionChangeset: { _tag: 'sessionChangeset'; data: Uint8Array; debug: any } | { _tag: 'no-op' } | { _tag: 'unset' } materializerHash: Option.Option - } + }> rollback: (changeset: Uint8Array) => void refreshTables: (tables: Set) => void span: otel.Span @@ -96,10 +96,7 @@ export const makeClientSessionSyncProcessor = ({ /** We're queuing push requests to reduce the number of messages sent to the leader by batching them */ const leaderPushQueue = BucketQueue.make().pipe(Effect.runSync) - const push: ClientSessionSyncProcessor['push'] = Effect.fn('client-session-sync-processor:push')(function* ( - batch, - { otelContext }, - ) { + const push: ClientSessionSyncProcessor['push'] = Effect.fn('client-session-sync-processor:push')(function* (batch) { // TODO validate batch let baseEventSequenceNumber = syncStateRef.current.localHead @@ -151,8 +148,7 @@ export const makeClientSessionSyncProcessor = ({ writeTables: newWriteTables, sessionChangeset, materializerHash, - } = materializeEvent(decodedEventDef, { - otelContext, + } = yield* materializeEvent(decodedEventDef, { withChangeset: true, materializerHashLeader: Option.none(), }) @@ -176,8 +172,6 @@ export const makeClientSessionSyncProcessor = ({ rejectCount: 0, } - const otelContext = otel.trace.setSpan(otel.context.active(), span) - const boot: ClientSessionSyncProcessor['boot'] = Effect.gen(function* () { if (confirmUnsavedChanges && typeof window !== 'undefined' && typeof window.addEventListener === 'function') { const onBeforeUnload = (event: BeforeUnloadEvent) => { @@ -301,8 +295,7 @@ export const makeClientSessionSyncProcessor = ({ writeTables: newWriteTables, sessionChangeset, materializerHash, - } = materializeEvent(decodedEventDef, { - otelContext, + } = yield* materializeEvent(decodedEventDef, { withChangeset: true, materializerHashLeader: event.meta.materializerHashLeader, }) @@ -359,10 +352,7 @@ export const makeClientSessionSyncProcessor = ({ } export interface ClientSessionSyncProcessor { - push: ( - batch: ReadonlyArray, - options: { otelContext: otel.Context }, - ) => Effect.Effect< + push: (batch: ReadonlyArray) => Effect.Effect< { writeTables: Set }, diff --git a/packages/@livestore/livestore/src/live-queries/__snapshots__/db-query.test.ts.snap b/packages/@livestore/livestore/src/live-queries/__snapshots__/db-query.test.ts.snap index 69df5d85c..c20e14ddb 100644 --- a/packages/@livestore/livestore/src/live-queries/__snapshots__/db-query.test.ts.snap +++ b/packages/@livestore/livestore/src/live-queries/__snapshots__/db-query.test.ts.snap @@ -25,6 +25,19 @@ exports[`otel > QueryBuilder subscription - basic functionality 1`] = ` "attributes": { "batchSize": 1, }, + "children": [ + { + "_name": "client-session-sync-processor:materialize-event", + "children": [ + { + "_name": "livestore.in-memory-db:execute", + "attributes": { + "sql.query": "INSERT INTO 'todos' (id, text, completed) VALUES (?, ?, ?)", + }, + }, + ], + }, + ], }, { "_name": "@livestore/common:LeaderSyncProcessor:push", @@ -55,14 +68,6 @@ exports[`otel > QueryBuilder subscription - basic functionality 1`] = ` ], "livestore.eventsCount": 1, }, - "children": [ - { - "_name": "livestore.in-memory-db:execute", - "attributes": { - "sql.query": "INSERT INTO 'todos' (id, text, completed) VALUES (?, ?, ?)", - }, - }, - ], }, ], }, @@ -144,6 +149,19 @@ exports[`otel > QueryBuilder subscription - direct table subscription 1`] = ` "attributes": { "batchSize": 1, }, + "children": [ + { + "_name": "client-session-sync-processor:materialize-event", + "children": [ + { + "_name": "livestore.in-memory-db:execute", + "attributes": { + "sql.query": "INSERT INTO 'todos' (id, text, completed) VALUES (?, ?, ?)", + }, + }, + ], + }, + ], }, { "_name": "@livestore/common:LeaderSyncProcessor:push", @@ -174,14 +192,6 @@ exports[`otel > QueryBuilder subscription - direct table subscription 1`] = ` ], "livestore.eventsCount": 1, }, - "children": [ - { - "_name": "livestore.in-memory-db:execute", - "attributes": { - "sql.query": "INSERT INTO 'todos' (id, text, completed) VALUES (?, ?, ?)", - }, - }, - ], }, ], }, @@ -263,12 +273,38 @@ exports[`otel > QueryBuilder subscription - unsubscribe functionality 1`] = ` "attributes": { "batchSize": 1, }, + "children": [ + { + "_name": "client-session-sync-processor:materialize-event", + "children": [ + { + "_name": "livestore.in-memory-db:execute", + "attributes": { + "sql.query": "INSERT INTO 'todos' (id, text, completed) VALUES (?, ?, ?)", + }, + }, + ], + }, + ], }, { "_name": "client-session-sync-processor:push", "attributes": { "batchSize": 1, }, + "children": [ + { + "_name": "client-session-sync-processor:materialize-event", + "children": [ + { + "_name": "livestore.in-memory-db:execute", + "attributes": { + "sql.query": "INSERT INTO 'todos' (id, text, completed) VALUES (?, ?, ?)", + }, + }, + ], + }, + ], }, { "_name": "@livestore/common:LeaderSyncProcessor:push", @@ -306,14 +342,6 @@ exports[`otel > QueryBuilder subscription - unsubscribe functionality 1`] = ` ], "livestore.eventsCount": 1, }, - "children": [ - { - "_name": "livestore.in-memory-db:execute", - "attributes": { - "sql.query": "INSERT INTO 'todos' (id, text, completed) VALUES (?, ?, ?)", - }, - }, - ], }, { "_name": "LiveStore:commit", @@ -323,14 +351,6 @@ exports[`otel > QueryBuilder subscription - unsubscribe functionality 1`] = ` ], "livestore.eventsCount": 1, }, - "children": [ - { - "_name": "livestore.in-memory-db:execute", - "attributes": { - "sql.query": "INSERT INTO 'todos' (id, text, completed) VALUES (?, ?, ?)", - }, - }, - ], }, ], }, @@ -438,6 +458,19 @@ exports[`otel > otel 3`] = ` "attributes": { "batchSize": 1, }, + "children": [ + { + "_name": "client-session-sync-processor:materialize-event", + "children": [ + { + "_name": "livestore.in-memory-db:execute", + "attributes": { + "sql.query": "INSERT INTO 'todos' (id, text, completed) VALUES (?, ?, ?)", + }, + }, + ], + }, + ], }, { "_name": "@livestore/common:LeaderSyncProcessor:push", @@ -468,14 +501,6 @@ exports[`otel > otel 3`] = ` ], "livestore.eventsCount": 1, }, - "children": [ - { - "_name": "livestore.in-memory-db:execute", - "attributes": { - "sql.query": "INSERT INTO 'todos' (id, text, completed) VALUES (?, ?, ?)", - }, - }, - ], }, ], }, @@ -719,6 +744,19 @@ exports[`otel > with thunks 7`] = ` "attributes": { "batchSize": 1, }, + "children": [ + { + "_name": "client-session-sync-processor:materialize-event", + "children": [ + { + "_name": "livestore.in-memory-db:execute", + "attributes": { + "sql.query": "INSERT INTO 'todos' (id, text, completed) VALUES (?, ?, ?)", + }, + }, + ], + }, + ], }, { "_name": "@livestore/common:LeaderSyncProcessor:push", @@ -749,14 +787,6 @@ exports[`otel > with thunks 7`] = ` ], "livestore.eventsCount": 1, }, - "children": [ - { - "_name": "livestore.in-memory-db:execute", - "attributes": { - "sql.query": "INSERT INTO 'todos' (id, text, completed) VALUES (?, ?, ?)", - }, - }, - ], }, ], }, @@ -834,6 +864,19 @@ exports[`otel > with thunks with query builder and without labels 3`] = ` "attributes": { "batchSize": 1, }, + "children": [ + { + "_name": "client-session-sync-processor:materialize-event", + "children": [ + { + "_name": "livestore.in-memory-db:execute", + "attributes": { + "sql.query": "INSERT INTO 'todos' (id, text, completed) VALUES (?, ?, ?)", + }, + }, + ], + }, + ], }, { "_name": "@livestore/common:LeaderSyncProcessor:push", @@ -864,14 +907,6 @@ exports[`otel > with thunks with query builder and without labels 3`] = ` ], "livestore.eventsCount": 1, }, - "children": [ - { - "_name": "livestore.in-memory-db:execute", - "attributes": { - "sql.query": "INSERT INTO 'todos' (id, text, completed) VALUES (?, ?, ?)", - }, - }, - ], }, ], }, diff --git a/packages/@livestore/livestore/src/store/store.ts b/packages/@livestore/livestore/src/store/store.ts index 29415b34f..e8cea14d6 100644 --- a/packages/@livestore/livestore/src/store/store.ts +++ b/packages/@livestore/livestore/src/store/store.ts @@ -128,72 +128,81 @@ export class Store { - const { eventDef, materializer } = getEventDef(schema, eventDecoded.name) - - const execArgsArr = getExecStatementsFromMaterializer({ - eventDef, - materializer, - dbState: this.sqliteDbWrapper, - event: { decoded: eventDecoded, encoded: undefined }, - }) - - const materializerHash = isDevEnv() ? Option.some(hashMaterializerResults(execArgsArr)) : Option.none() - - if ( - materializerHashLeader._tag === 'Some' && - materializerHash._tag === 'Some' && - materializerHashLeader.value !== materializerHash.value - ) { - void this.shutdown( - Cause.fail( - UnexpectedError.make({ - cause: `Materializer hash mismatch detected for event "${eventDecoded.name}".`, - note: `Please make sure your event materializer is a pure function without side effects.`, - }), - ), - ) - } - - const writeTablesForEvent = new Set() - - const exec = () => { - for (const { - statementSql, - bindValues, - writeTables = this.sqliteDbWrapper.getTablesUsed(statementSql), - } of execArgsArr) { - try { - this.sqliteDbWrapper.cachedExecute(statementSql, bindValues, { otelContext, writeTables }) - } catch (cause) { - throw UnexpectedError.make({ - cause, - note: `Error executing materializer for event "${eventDecoded.name}".\nStatement: ${statementSql}\nBind values: ${JSON.stringify(bindValues)}`, - }) + materializeEvent: Effect.fn('client-session-sync-processor:materialize-event')( + (eventDecoded, { withChangeset, materializerHashLeader }) => + Effect.gen(this, function* () { + const { eventDef, materializer } = getEventDef(schema, eventDecoded.name) + + const execArgsArr = getExecStatementsFromMaterializer({ + eventDef, + materializer, + dbState: this.sqliteDbWrapper, + event: { decoded: eventDecoded, encoded: undefined }, + }) + + const materializerHash = isDevEnv() ? Option.some(hashMaterializerResults(execArgsArr)) : Option.none() + + if ( + materializerHashLeader._tag === 'Some' && + materializerHash._tag === 'Some' && + materializerHashLeader.value !== materializerHash.value + ) { + // Fork the shutdown effect to run in the background as a daemon, ensuring it's not interrupted. + // TODO: we should probably handle this more gracefully using Effect’s error channel + yield* Effect.forkDaemon( + this.shutdown( + Cause.fail( + UnexpectedError.make({ + cause: `Materializer hash mismatch detected for event "${eventDecoded.name}".`, + note: `Please make sure your event materializer is a pure function without side effects.`, + }), + ), + ), + ) } - // durationMsTotal += durationMs - for (const table of writeTables) { - writeTablesForEvent.add(table) + const span = yield* OtelTracer.currentOtelSpan.pipe(Effect.orDie) + const otelContext = otel.trace.setSpan(otel.context.active(), span) + + const writeTablesForEvent = new Set() + + const exec = () => { + for (const { + statementSql, + bindValues, + writeTables = this.sqliteDbWrapper.getTablesUsed(statementSql), + } of execArgsArr) { + try { + this.sqliteDbWrapper.cachedExecute(statementSql, bindValues, { otelContext, writeTables }) + } catch (cause) { + throw UnexpectedError.make({ + cause, + note: `Error executing materializer for event "${eventDecoded.name}".\nStatement: ${statementSql}\nBind values: ${JSON.stringify(bindValues)}`, + }) + } + + // durationMsTotal += durationMs + for (const table of writeTables) { + writeTablesForEvent.add(table) + } + + this.sqliteDbWrapper.debug.head = eventDecoded.seqNum + } } - this.sqliteDbWrapper.debug.head = eventDecoded.seqNum - } - } - - let sessionChangeset: - | { _tag: 'sessionChangeset'; data: Uint8Array; debug: any } - | { _tag: 'no-op' } - | { _tag: 'unset' } = { _tag: 'unset' } - - if (withChangeset === true) { - sessionChangeset = this.sqliteDbWrapper.withChangeset(exec).changeset - } else { - exec() - } + let sessionChangeset: + | { _tag: 'sessionChangeset'; data: Uint8Array; debug: any } + | { _tag: 'no-op' } + | { _tag: 'unset' } = { _tag: 'unset' } + if (withChangeset === true) { + sessionChangeset = this.sqliteDbWrapper.withChangeset(exec).changeset + } else { + exec() + } - return { writeTables: writeTablesForEvent, sessionChangeset, materializerHash } - }, + return { writeTables: writeTablesForEvent, sessionChangeset, materializerHash } + }), + ), rollback: (changeset) => { this.sqliteDbWrapper.rollback(changeset) }, @@ -633,7 +642,7 @@ export class Store { try { const materializeEvents = () => { - return Runtime.runSync(this.effectContext.runtime, this.syncProcessor.push(events, { otelContext })) + return Runtime.runSync(this.effectContext.runtime, this.syncProcessor.push(events)) } if (events.length > 1) { @@ -749,7 +758,9 @@ export class Store): Effect.Effect => { - this.checkShutdown('shutdown') + if (this.isShutdown) { + return Effect.void + } this.isShutdown = true return this.clientSession.shutdown( diff --git a/packages/@livestore/react/src/__snapshots__/useClientDocument.test.tsx.snap b/packages/@livestore/react/src/__snapshots__/useClientDocument.test.tsx.snap index 7d62734e0..d3cfbcba4 100644 --- a/packages/@livestore/react/src/__snapshots__/useClientDocument.test.tsx.snap +++ b/packages/@livestore/react/src/__snapshots__/useClientDocument.test.tsx.snap @@ -25,12 +25,46 @@ exports[`useClientDocument > otel > should update the data based on component ke "attributes": { "batchSize": 1, }, + "children": [ + { + "_name": "client-session-sync-processor:materialize-event", + "children": [ + { + "_name": "livestore.in-memory-db:execute", + "attributes": { + "sql.query": " + INSERT INTO 'UserInfo' (id, value) + VALUES (?, ?) + ON CONFLICT (id) DO UPDATE SET value = json_set(json_set(value, ?, json(?)), ?, json(?)) + ", + }, + }, + ], + }, + ], }, { "_name": "client-session-sync-processor:push", "attributes": { "batchSize": 1, }, + "children": [ + { + "_name": "client-session-sync-processor:materialize-event", + "children": [ + { + "_name": "livestore.in-memory-db:execute", + "attributes": { + "sql.query": " + INSERT INTO 'UserInfo' (id, value) + VALUES (?, ?) + ON CONFLICT (id) DO UPDATE SET value = json_set(value, ?, json(?)) + ", + }, + }, + ], + }, + ], }, { "_name": "@livestore/common:LeaderSyncProcessor:push", @@ -68,18 +102,6 @@ exports[`useClientDocument > otel > should update the data based on component ke ], "livestore.eventsCount": 1, }, - "children": [ - { - "_name": "livestore.in-memory-db:execute", - "attributes": { - "sql.query": " - INSERT INTO 'UserInfo' (id, value) - VALUES (?, ?) - ON CONFLICT (id) DO UPDATE SET value = json_set(value, ?, json(?)) - ", - }, - }, - ], }, ], }, @@ -110,18 +132,6 @@ exports[`useClientDocument > otel > should update the data based on component ke ], "livestore.eventsCount": 1, }, - "children": [ - { - "_name": "livestore.in-memory-db:execute", - "attributes": { - "sql.query": " - INSERT INTO 'UserInfo' (id, value) - VALUES (?, ?) - ON CONFLICT (id) DO UPDATE SET value = json_set(json_set(value, ?, json(?)), ?, json(?)) - ", - }, - }, - ], }, { "_name": "db:SELECT * FROM 'UserInfo' WHERE id = ?", @@ -263,12 +273,46 @@ exports[`useClientDocument > otel > should update the data based on component ke "attributes": { "batchSize": 1, }, + "children": [ + { + "_name": "client-session-sync-processor:materialize-event", + "children": [ + { + "_name": "livestore.in-memory-db:execute", + "attributes": { + "sql.query": " + INSERT INTO 'UserInfo' (id, value) + VALUES (?, ?) + ON CONFLICT (id) DO UPDATE SET value = json_set(json_set(value, ?, json(?)), ?, json(?)) + ", + }, + }, + ], + }, + ], }, { "_name": "client-session-sync-processor:push", "attributes": { "batchSize": 1, }, + "children": [ + { + "_name": "client-session-sync-processor:materialize-event", + "children": [ + { + "_name": "livestore.in-memory-db:execute", + "attributes": { + "sql.query": " + INSERT INTO 'UserInfo' (id, value) + VALUES (?, ?) + ON CONFLICT (id) DO UPDATE SET value = json_set(value, ?, json(?)) + ", + }, + }, + ], + }, + ], }, { "_name": "@livestore/common:LeaderSyncProcessor:push", @@ -306,18 +350,6 @@ exports[`useClientDocument > otel > should update the data based on component ke ], "livestore.eventsCount": 1, }, - "children": [ - { - "_name": "livestore.in-memory-db:execute", - "attributes": { - "sql.query": " - INSERT INTO 'UserInfo' (id, value) - VALUES (?, ?) - ON CONFLICT (id) DO UPDATE SET value = json_set(value, ?, json(?)) - ", - }, - }, - ], }, ], }, @@ -348,18 +380,6 @@ exports[`useClientDocument > otel > should update the data based on component ke ], "livestore.eventsCount": 1, }, - "children": [ - { - "_name": "livestore.in-memory-db:execute", - "attributes": { - "sql.query": " - INSERT INTO 'UserInfo' (id, value) - VALUES (?, ?) - ON CONFLICT (id) DO UPDATE SET value = json_set(json_set(value, ?, json(?)), ?, json(?)) - ", - }, - }, - ], }, { "_name": "db:SELECT * FROM 'UserInfo' WHERE id = ?",