@@ -128,72 +128,77 @@ export class Store<TSchema extends LiveStoreSchema = LiveStoreSchema.Any, TConte
128
128
schema,
129
129
clientSession,
130
130
runtime : effectContext . runtime ,
131
- materializeEvent : ( eventDecoded , { otelContext , withChangeset , materializerHashLeader } ) => {
132
- const { eventDef , materializer } = getEventDef ( schema , eventDecoded . name )
133
-
134
- const execArgsArr = getExecStatementsFromMaterializer ( {
135
- eventDef ,
136
- materializer ,
137
- dbState : this . sqliteDbWrapper ,
138
- event : { decoded : eventDecoded , encoded : undefined } ,
139
- } )
140
-
141
- const materializerHash = isDevEnv ( ) ? Option . some ( hashMaterializerResults ( execArgsArr ) ) : Option . none ( )
142
-
143
- if (
144
- materializerHashLeader . _tag === 'Some' &&
145
- materializerHash . _tag === 'Some' &&
146
- materializerHashLeader . value !== materializerHash . value
147
- ) {
148
- void this . shutdown (
149
- Cause . fail (
150
- UnexpectedError . make ( {
131
+ materializeEvent : Effect . fn ( 'client-session-sync-processor:materialize-event' ) (
132
+ ( eventDecoded , { otelContext , withChangeset , materializerHashLeader } ) =>
133
+ Effect . gen ( this , function * ( ) {
134
+ const { eventDef , materializer } = getEventDef ( schema , eventDecoded . name )
135
+
136
+ const execArgsArr = getExecStatementsFromMaterializer ( {
137
+ eventDef ,
138
+ materializer ,
139
+ dbState : this . sqliteDbWrapper ,
140
+ event : { decoded : eventDecoded , encoded : undefined } ,
141
+ } )
142
+
143
+ const materializerHash = isDevEnv ( ) ? Option . some ( hashMaterializerResults ( execArgsArr ) ) : Option . none ( )
144
+
145
+ if (
146
+ materializerHashLeader . _tag === 'Some' &&
147
+ materializerHash . _tag === 'Some' &&
148
+ materializerHashLeader . value !== materializerHash . value
149
+ ) {
150
+ const error = UnexpectedError . make ( {
151
151
cause : `Materializer hash mismatch detected for event "${ eventDecoded . name } ".` ,
152
152
note : `Please make sure your event materializer is a pure function without side effects.` ,
153
- } ) ,
154
- ) ,
155
- )
156
- }
157
-
158
- const writeTablesForEvent = new Set < string > ( )
159
-
160
- const exec = ( ) => {
161
- for ( const {
162
- statementSql,
163
- bindValues,
164
- writeTables = this . sqliteDbWrapper . getTablesUsed ( statementSql ) ,
165
- } of execArgsArr ) {
166
- try {
167
- this . sqliteDbWrapper . cachedExecute ( statementSql , bindValues , { otelContext, writeTables } )
168
- } catch ( cause ) {
169
- throw UnexpectedError . make ( {
170
- cause,
171
- note : `Error executing materializer for event "${ eventDecoded . name } ".\nStatement: ${ statementSql } \nBind values: ${ JSON . stringify ( bindValues ) } ` ,
172
153
} )
173
- }
174
154
175
- // durationMsTotal += durationMs
176
- for ( const table of writeTables ) {
177
- writeTablesForEvent . add ( table )
178
- }
155
+ // Fork the shutdown effect to run in the background as a daemon, ensuring it's not interrupted.
156
+ yield * Effect . forkDaemon ( this . shutdown ( Cause . fail ( error ) ) )
179
157
180
- this . sqliteDbWrapper . debug . head = eventDecoded . seqNum
181
- }
182
- }
158
+ // TODO: we should probably handle this more gracefully using Effect’s error channel
159
+ }
183
160
184
- let sessionChangeset :
185
- | { _tag : 'sessionChangeset' ; data : Uint8Array ; debug : any }
186
- | { _tag : 'no-op' }
187
- | { _tag : 'unset' } = { _tag : 'unset' }
161
+ return yield * Effect . sync ( ( ) => {
162
+ const writeTablesForEvent = new Set < string > ( )
163
+
164
+ const exec = ( ) => {
165
+ for ( const {
166
+ statementSql,
167
+ bindValues,
168
+ writeTables = this . sqliteDbWrapper . getTablesUsed ( statementSql ) ,
169
+ } of execArgsArr ) {
170
+ try {
171
+ this . sqliteDbWrapper . cachedExecute ( statementSql , bindValues , { otelContext, writeTables } )
172
+ } catch ( cause ) {
173
+ throw UnexpectedError . make ( {
174
+ cause,
175
+ note : `Error executing materializer for event "${ eventDecoded . name } ".\nStatement: ${ statementSql } \nBind values: ${ JSON . stringify ( bindValues ) } ` ,
176
+ } )
177
+ }
178
+
179
+ // durationMsTotal += durationMs
180
+ for ( const table of writeTables ) {
181
+ writeTablesForEvent . add ( table )
182
+ }
183
+
184
+ this . sqliteDbWrapper . debug . head = eventDecoded . seqNum
185
+ }
186
+ }
188
187
189
- if ( withChangeset === true ) {
190
- sessionChangeset = this . sqliteDbWrapper . withChangeset ( exec ) . changeset
191
- } else {
192
- exec ( )
193
- }
188
+ let sessionChangeset :
189
+ | { _tag : 'sessionChangeset' ; data : Uint8Array ; debug : any }
190
+ | { _tag : 'no-op' }
191
+ | { _tag : 'unset' } = { _tag : 'unset' }
192
+ if ( withChangeset === true ) {
193
+ sessionChangeset = this . sqliteDbWrapper . withChangeset ( exec ) . changeset
194
+ } else {
195
+ exec ( )
196
+ }
194
197
195
- return { writeTables : writeTablesForEvent , sessionChangeset, materializerHash }
196
- } ,
198
+ return { writeTables : writeTablesForEvent , sessionChangeset, materializerHash }
199
+ } )
200
+ } ) ,
201
+ ) ,
197
202
rollback : ( changeset ) => {
198
203
this . sqliteDbWrapper . rollback ( changeset )
199
204
} ,
0 commit comments