@@ -76,7 +76,10 @@ export const makeClientSessionSyncProcessor = ({
76
76
/** We're queuing push requests to reduce the number of messages sent to the leader by batching them */
77
77
const leaderPushQueue = BucketQueue . make < LiveStoreEvent . EncodedWithMeta > ( ) . pipe ( Effect . runSync )
78
78
79
- const push : ClientSessionSyncProcessor [ 'push' ] = ( batch , { otelContext } ) => {
79
+ const push : ClientSessionSyncProcessor [ 'push' ] = Effect . fn ( 'client-session-sync-processor:push' ) ( function * (
80
+ batch ,
81
+ { otelContext } ,
82
+ ) {
80
83
// TODO validate batch
81
84
82
85
let baseEventSequenceNumber = syncStateRef . current . localHead
@@ -94,29 +97,29 @@ export const makeClientSessionSyncProcessor = ({
94
97
} ) ,
95
98
)
96
99
} )
97
-
98
- const mergeResult = SyncState . merge ( {
99
- syncState : syncStateRef . current ,
100
- payload : { _tag : 'local-push' , newEvents : encodedEventDefs } ,
101
- isClientEvent,
102
- isEqualEvent : LiveStoreEvent . isEqualEncoded ,
103
- } )
100
+ yield * Effect . annotateCurrentSpan ( { batchSize : encodedEventDefs . length } )
101
+
102
+ const mergeResult = yield * Effect . sync ( ( ) =>
103
+ SyncState . merge ( {
104
+ syncState : syncStateRef . current ,
105
+ payload : { _tag : 'local-push' , newEvents : encodedEventDefs } ,
106
+ isClientEvent,
107
+ isEqualEvent : LiveStoreEvent . isEqualEncoded ,
108
+ } ) ,
109
+ )
104
110
105
111
if ( mergeResult . _tag === 'unexpected-error' ) {
106
- return shouldNeverHappen ( ' Unexpected error in client-session-sync-processor' , mergeResult . cause )
112
+ return yield * Effect . die ( new Error ( ` Unexpected error in client-session-sync-processor: ${ mergeResult . cause } ` ) )
107
113
}
108
114
109
- span . addEvent ( 'local-push' , {
110
- batchSize : encodedEventDefs . length ,
111
- mergeResult : TRACE_VERBOSE ? JSON . stringify ( mergeResult ) : undefined ,
112
- } )
115
+ if ( TRACE_VERBOSE ) yield * Effect . annotateCurrentSpan ( { mergeResult : JSON . stringify ( mergeResult ) } )
113
116
114
117
if ( mergeResult . _tag !== 'advance' ) {
115
- return shouldNeverHappen ( `Expected advance, got ${ mergeResult . _tag } ` )
118
+ return yield * Effect . die ( new Error ( `Expected advance, got ${ mergeResult . _tag } ` ) )
116
119
}
117
120
118
121
syncStateRef . current = mergeResult . newSyncState
119
- syncStateUpdateQueue . offer ( mergeResult . newSyncState ) . pipe ( Effect . runSync )
122
+ yield * syncStateUpdateQueue . offer ( mergeResult . newSyncState )
120
123
121
124
// Materialize events to state
122
125
const writeTables = new Set < string > ( )
@@ -135,16 +138,18 @@ export const makeClientSessionSyncProcessor = ({
135
138
for ( const table of newWriteTables ) {
136
139
writeTables . add ( table )
137
140
}
138
- event . meta . sessionChangeset = sessionChangeset
139
- event . meta . materializerHashSession = materializerHash
141
+ yield * Effect . sync ( ( ) => {
142
+ event . meta . sessionChangeset = sessionChangeset
143
+ event . meta . materializerHashSession = materializerHash
144
+ } )
140
145
}
141
146
142
147
// Trigger push to leader
143
148
// console.debug('pushToLeader', encodedEventDefs.length, ...encodedEventDefs.map((_) => _.toJSON()))
144
- BucketQueue . offerAll ( leaderPushQueue , encodedEventDefs ) . pipe ( Effect . runSync )
149
+ yield * BucketQueue . offerAll ( leaderPushQueue , encodedEventDefs )
145
150
146
151
return { writeTables }
147
- }
152
+ } )
148
153
149
154
const debugInfo = {
150
155
rebaseCount : 0 ,
@@ -340,9 +345,12 @@ export interface ClientSessionSyncProcessor {
340
345
push : (
341
346
batch : ReadonlyArray < LiveStoreEvent . PartialAnyDecoded > ,
342
347
options : { otelContext : otel . Context } ,
343
- ) => {
344
- writeTables : Set < string >
345
- }
348
+ ) => Effect . Effect <
349
+ {
350
+ writeTables : Set < string >
351
+ } ,
352
+ never
353
+ >
346
354
boot : Effect . Effect < void , UnexpectedError , Scope . Scope >
347
355
/**
348
356
* Only used for debugging / observability.
0 commit comments