@@ -100,7 +100,7 @@ export const makeClientSessionSyncProcessor = ({
100
100
const eventDef = getEventDef ( schema , name )
101
101
const nextNumPair = EventSequenceNumber . nextPair ( {
102
102
seqNum : baseEventSequenceNumber ,
103
- isClient : eventDef . eventDef . options . clientOnly
103
+ isClient : eventDef . eventDef . options . clientOnly ,
104
104
} )
105
105
baseEventSequenceNumber = nextNumPair . seqNum
106
106
return new LiveStoreEvent . EncodedWithMeta (
@@ -125,13 +125,13 @@ export const makeClientSessionSyncProcessor = ({
125
125
)
126
126
127
127
if ( mergeResult . _tag === 'unexpected-error' ) {
128
- return yield * Effect . die ( new Error ( `Unexpected error in client-session-sync-processor: ${ mergeResult . message } ` ) )
128
+ return yield * new SyncError ( { cause : mergeResult . message } )
129
129
}
130
130
131
131
if ( TRACE_VERBOSE ) yield * Effect . annotateCurrentSpan ( { mergeResult : JSON . stringify ( mergeResult ) } )
132
132
133
133
if ( mergeResult . _tag !== 'advance' ) {
134
- return yield * Effect . die ( new Error ( `Expected advance, got ${ mergeResult . _tag } ` ) )
134
+ return yield * new SyncError ( { cause : `Expected advance, got ${ mergeResult . _tag } ` } )
135
135
}
136
136
137
137
syncStateRef . current = mergeResult . newSyncState
@@ -206,7 +206,7 @@ export const makeClientSessionSyncProcessor = ({
206
206
207
207
// NOTE We need to lazily call `.pull` as we want the cursor to be updated
208
208
yield * Stream . suspend ( ( ) =>
209
- clientSession . leaderThread . events . pull ( { cursor : syncStateRef . current . upstreamHead } ) ,
209
+ clientSession . leaderThread . events . pull ( { cursor : syncStateRef . current . upstreamHead } ) ,
210
210
) . pipe (
211
211
Stream . tap ( ( { payload } ) =>
212
212
Effect . gen ( function * ( ) {
@@ -230,7 +230,7 @@ export const makeClientSessionSyncProcessor = ({
230
230
}
231
231
232
232
syncStateRef . current = mergeResult . newSyncState
233
- syncStateUpdateQueue . offer ( mergeResult . newSyncState ) . pipe ( Effect . runSync )
233
+ yield * syncStateUpdateQueue . offer ( mergeResult . newSyncState )
234
234
235
235
if ( mergeResult . _tag === 'rebase' ) {
236
236
span . addEvent ( 'merge:pull:rebase' , {
@@ -355,7 +355,7 @@ export interface ClientSessionSyncProcessor {
355
355
{
356
356
writeTables : Set < string >
357
357
} ,
358
- never
358
+ SyncError
359
359
>
360
360
boot : Effect . Effect < void , UnexpectedError , Scope . Scope >
361
361
/**
0 commit comments