@@ -115,70 +115,69 @@ export class Store<TSchema extends LiveStoreSchema = LiveStoreSchema, TContext =
115
115
schema,
116
116
clientSession,
117
117
runtime : effectContext . runtime ,
118
- materializeEvent : ( eventDecoded , { otelContext , withChangeset , materializerHashLeader } ) => {
119
- const { eventDef , materializer } = getEventDef ( schema , eventDecoded . name )
120
-
121
- const execArgsArr = getExecStatementsFromMaterializer ( {
122
- eventDef ,
123
- materializer ,
124
- dbState : this . sqliteDbWrapper ,
125
- event : { decoded : eventDecoded , encoded : undefined } ,
126
- } )
127
-
128
- const materializerHash = isDevEnv ( ) ? Option . some ( hashMaterializerResults ( execArgsArr ) ) : Option . none ( )
129
-
130
- if (
131
- materializerHashLeader . _tag === 'Some' &&
132
- materializerHash . _tag === 'Some' &&
133
- materializerHashLeader . value !== materializerHash . value
134
- ) {
135
- void this . shutdown (
136
- Cause . fail (
137
- UnexpectedError . make ( {
118
+ materializeEvent : Effect . fn ( 'client-session-sync-processor:materialize-event' ) (
119
+ ( eventDecoded , { otelContext , withChangeset , materializerHashLeader } ) =>
120
+ Effect . gen ( this , function * ( ) {
121
+ const { eventDef , materializer } = getEventDef ( schema , eventDecoded . name )
122
+
123
+ const execArgsArr = getExecStatementsFromMaterializer ( {
124
+ eventDef ,
125
+ materializer ,
126
+ dbState : this . sqliteDbWrapper ,
127
+ event : { decoded : eventDecoded , encoded : undefined } ,
128
+ } )
129
+
130
+ const materializerHash = isDevEnv ( ) ? Option . some ( hashMaterializerResults ( execArgsArr ) ) : Option . none ( )
131
+
132
+ if (
133
+ materializerHashLeader . _tag === 'Some' &&
134
+ materializerHash . _tag === 'Some' &&
135
+ materializerHashLeader . value !== materializerHash . value
136
+ ) {
137
+ const error = UnexpectedError . make ( {
138
138
cause : `Materializer hash mismatch detected for event "${ eventDecoded . name } ".` ,
139
139
note : `Please make sure your event materializer is a pure function without side effects.` ,
140
- } ) ,
141
- ) ,
142
- )
143
- }
140
+ } )
144
141
145
- const writeTablesForEvent = new Set < string > ( )
142
+ // Fork the shutdown effect to run in the background as a daemon,
143
+ // ensuring it's not interrupted.
144
+ yield * Effect . forkDaemon ( this . clientSession . shutdown ( Cause . fail ( error ) ) )
146
145
147
- const exec = ( ) => {
148
- for ( const {
149
- statementSql,
150
- bindValues,
151
- writeTables = this . sqliteDbWrapper . getTablesUsed ( statementSql ) ,
152
- } of execArgsArr ) {
153
- try {
154
- this . sqliteDbWrapper . cachedExecute ( statementSql , bindValues , { otelContext, writeTables } )
155
- } catch ( cause ) {
156
- throw UnexpectedError . make ( {
157
- cause,
158
- note : `Error executing materializer for event "${ eventDecoded . name } ".\nStatement: ${ statementSql } \nBind values: ${ JSON . stringify ( bindValues ) } ` ,
159
- } )
146
+ // TODO: we should probably handle this more gracefully using Effect’s error channel
160
147
}
161
148
162
- // durationMsTotal += durationMs
163
- for ( const table of writeTables ) {
164
- writeTablesForEvent . add ( table )
165
- }
166
- }
167
- }
168
-
169
- let sessionChangeset :
170
- | { _tag : 'sessionChangeset' ; data : Uint8Array ; debug : any }
171
- | { _tag : 'no-op' }
172
- | { _tag : 'unset' } = { _tag : 'unset' }
149
+ return yield * Effect . sync ( ( ) => {
150
+ const writeTablesForEvent = new Set < string > ( )
151
+
152
+ const exec = ( ) => {
153
+ for ( const {
154
+ statementSql,
155
+ bindValues,
156
+ writeTables = this . sqliteDbWrapper . getTablesUsed ( statementSql ) ,
157
+ } of execArgsArr ) {
158
+ this . sqliteDbWrapper . cachedExecute ( statementSql , bindValues , { otelContext, writeTables } )
159
+
160
+ // durationMsTotal += durationMs
161
+ for ( const table of writeTables ) {
162
+ writeTablesForEvent . add ( table )
163
+ }
164
+ }
165
+ }
173
166
174
- if ( withChangeset === true ) {
175
- sessionChangeset = this . sqliteDbWrapper . withChangeset ( exec ) . changeset
176
- } else {
177
- exec ( )
178
- }
167
+ let sessionChangeset :
168
+ | { _tag : 'sessionChangeset' ; data : Uint8Array ; debug : any }
169
+ | { _tag : 'no-op' }
170
+ | { _tag : 'unset' } = { _tag : 'unset' }
171
+ if ( withChangeset === true ) {
172
+ sessionChangeset = this . sqliteDbWrapper . withChangeset ( exec ) . changeset
173
+ } else {
174
+ exec ( )
175
+ }
179
176
180
- return { writeTables : writeTablesForEvent , sessionChangeset, materializerHash }
181
- } ,
177
+ return { writeTables : writeTablesForEvent , sessionChangeset, materializerHash }
178
+ } )
179
+ } ) ,
180
+ ) ,
182
181
rollback : ( changeset ) => {
183
182
this . sqliteDbWrapper . rollback ( changeset )
184
183
} ,
@@ -584,14 +583,14 @@ export class Store<TSchema extends LiveStoreSchema = LiveStoreSchema, TContext =
584
583
const otelContext = otel . trace . setSpan ( otel . context . active ( ) , span )
585
584
586
585
try {
587
- // Materialize events to state
588
586
const { writeTables } = ( ( ) => {
589
587
try {
590
588
const materializeEvents = ( ) => {
591
589
return Runtime . runSync ( this . effectContext . runtime , this . syncProcessor . push ( events , { otelContext } ) )
592
590
}
593
591
594
592
if ( events . length > 1 ) {
593
+ // TODO: what to do about leader transaction here?
595
594
return this . sqliteDbWrapper . txn ( materializeEvents )
596
595
} else {
597
596
return materializeEvents ( )
0 commit comments