@@ -90,7 +90,7 @@ export const makeClientSessionSyncProcessor = ({
90
90
const eventDef = getEventDef ( schema , name )
91
91
const nextNumPair = EventSequenceNumber . nextPair ( {
92
92
seqNum : baseEventSequenceNumber ,
93
- isClient : eventDef . eventDef . options . clientOnly
93
+ isClient : eventDef . eventDef . options . clientOnly ,
94
94
} )
95
95
baseEventSequenceNumber = nextNumPair . seqNum
96
96
return new LiveStoreEvent . EncodedWithMeta (
@@ -115,13 +115,13 @@ export const makeClientSessionSyncProcessor = ({
115
115
)
116
116
117
117
if ( mergeResult . _tag === 'unexpected-error' ) {
118
- return yield * Effect . die ( new Error ( `Unexpected error in client-session-sync-processor: ${ mergeResult . message } ` ) )
118
+ return yield * new SyncError ( { cause : mergeResult . message } )
119
119
}
120
120
121
121
if ( TRACE_VERBOSE ) yield * Effect . annotateCurrentSpan ( { mergeResult : JSON . stringify ( mergeResult ) } )
122
122
123
123
if ( mergeResult . _tag !== 'advance' ) {
124
- return yield * Effect . die ( new Error ( `Expected advance, got ${ mergeResult . _tag } ` ) )
124
+ return yield * new SyncError ( { cause : `Expected advance, got ${ mergeResult . _tag } ` } )
125
125
}
126
126
127
127
syncStateRef . current = mergeResult . newSyncState
@@ -197,7 +197,7 @@ export const makeClientSessionSyncProcessor = ({
197
197
198
198
// NOTE We need to lazily call `.pull` as we want the cursor to be updated
199
199
yield * Stream . suspend ( ( ) =>
200
- clientSession . leaderThread . events . pull ( { cursor : syncStateRef . current . upstreamHead } ) ,
200
+ clientSession . leaderThread . events . pull ( { cursor : syncStateRef . current . upstreamHead } ) ,
201
201
) . pipe (
202
202
Stream . tap ( ( { payload } ) =>
203
203
Effect . gen ( function * ( ) {
@@ -221,7 +221,7 @@ export const makeClientSessionSyncProcessor = ({
221
221
}
222
222
223
223
syncStateRef . current = mergeResult . newSyncState
224
- syncStateUpdateQueue . offer ( mergeResult . newSyncState ) . pipe ( Effect . runSync )
224
+ yield * syncStateUpdateQueue . offer ( mergeResult . newSyncState )
225
225
226
226
if ( mergeResult . _tag === 'rebase' ) {
227
227
span . addEvent ( 'merge:pull:rebase' , {
@@ -346,7 +346,7 @@ export interface ClientSessionSyncProcessor {
346
346
{
347
347
writeTables : Set < string >
348
348
} ,
349
- never
349
+ SyncError
350
350
>
351
351
boot : Effect . Effect < void , UnexpectedError , Scope . Scope >
352
352
/**
0 commit comments