@@ -4,11 +4,10 @@ import { Option, type Runtime, type Scope } from '@livestore/utils/effect'
4
4
import { BucketQueue , Effect , FiberHandle , Queue , Schema , Stream , Subscribable } from '@livestore/utils/effect'
5
5
import type * as otel from '@opentelemetry/api'
6
6
7
- import type { ClientSession , UnexpectedError } from '../adapter-types.js'
7
+ import { type ClientSession , SyncError , type UnexpectedError } from '../adapter-types.js'
8
8
import * as EventSequenceNumber from '../schema/EventSequenceNumber.js'
9
9
import * as LiveStoreEvent from '../schema/LiveStoreEvent.js'
10
- import { getEventDef , type LiveStoreSchema , SystemTables } from '../schema/mod.js'
11
- import { sql } from '../util.js'
10
+ import { getEventDef , type LiveStoreSchema } from '../schema/mod.js'
12
11
import * as SyncState from './syncstate.js'
13
12
14
13
/**
@@ -21,6 +20,10 @@ import * as SyncState from './syncstate.js'
21
20
* - We might need to make the rebase behaviour configurable e.g. to let users manually trigger a rebase
22
21
*
23
22
* Longer term we should evalutate whether we can unify the ClientSessionSyncProcessor with the LeaderSyncProcessor.
23
+ *
24
+ * The session and leader sync processor are different in the following ways:
25
+ * - The leader sync processor pulls regular LiveStore events, while the session sync processor pulls SyncState.PayloadUpstream items
26
+ * - The session sync processor has no downstream nodes.
24
27
*/
25
28
export const makeClientSessionSyncProcessor = ( {
26
29
schema,
@@ -37,7 +40,7 @@ export const makeClientSessionSyncProcessor = ({
37
40
clientSession : ClientSession
38
41
runtime : Runtime . Runtime < Scope . Scope >
39
42
materializeEvent : (
40
- eventDecoded : LiveStoreEvent . PartialAnyDecoded ,
43
+ eventDecoded : LiveStoreEvent . AnyDecoded ,
41
44
options : { withChangeset : boolean ; materializerHashLeader : Option . Option < number > } ,
42
45
) => Effect . Effect < {
43
46
writeTables : Set < string >
@@ -82,7 +85,10 @@ export const makeClientSessionSyncProcessor = ({
82
85
let baseEventSequenceNumber = syncStateRef . current . localHead
83
86
const encodedEventDefs = batch . map ( ( { name, args } ) => {
84
87
const eventDef = getEventDef ( schema , name )
85
- const nextNumPair = EventSequenceNumber . nextPair ( baseEventSequenceNumber , eventDef . eventDef . options . clientOnly )
88
+ const nextNumPair = EventSequenceNumber . nextPair ( {
89
+ seqNum : baseEventSequenceNumber ,
90
+ isClient : eventDef . eventDef . options . clientOnly ,
91
+ } )
86
92
baseEventSequenceNumber = nextNumPair . seqNum
87
93
return new LiveStoreEvent . EncodedWithMeta (
88
94
Schema . encodeUnknownSync ( eventSchema ) ( {
@@ -106,13 +112,13 @@ export const makeClientSessionSyncProcessor = ({
106
112
)
107
113
108
114
if ( mergeResult . _tag === 'unexpected-error' ) {
109
- return yield * Effect . die ( new Error ( `Unexpected error in client-session-sync-processor: ${ mergeResult . cause } ` ) )
115
+ return yield * new SyncError ( { cause : mergeResult . message } )
110
116
}
111
117
112
118
if ( TRACE_VERBOSE ) yield * Effect . annotateCurrentSpan ( { mergeResult : JSON . stringify ( mergeResult ) } )
113
119
114
120
if ( mergeResult . _tag !== 'advance' ) {
115
- return yield * Effect . die ( new Error ( `Expected advance, got ${ mergeResult . _tag } ` ) )
121
+ return yield * new SyncError ( { cause : `Expected advance, got ${ mergeResult . _tag } ` } )
116
122
}
117
123
118
124
syncStateRef . current = mergeResult . newSyncState
@@ -183,18 +189,11 @@ export const makeClientSessionSyncProcessor = ({
183
189
184
190
yield * FiberHandle . run ( leaderPushingFiberHandle , backgroundLeaderPushing )
185
191
186
- const getMergeCounter = ( ) =>
187
- clientSession . sqliteDb . select < { mergeCounter : number } > (
188
- sql `SELECT mergeCounter FROM ${ SystemTables . LEADER_MERGE_COUNTER_TABLE } WHERE id = 0` ,
189
- ) [ 0 ] ?. mergeCounter ?? 0
190
-
191
192
// NOTE We need to lazily call `.pull` as we want the cursor to be updated
192
193
yield * Stream . suspend ( ( ) =>
193
- clientSession . leaderThread . events . pull ( {
194
- cursor : { mergeCounter : getMergeCounter ( ) , eventNum : syncStateRef . current . localHead } ,
195
- } ) ,
194
+ clientSession . leaderThread . events . pull ( { cursor : syncStateRef . current . upstreamHead } ) ,
196
195
) . pipe (
197
- Stream . tap ( ( { payload, mergeCounter : leaderMergeCounter } ) =>
196
+ Stream . tap ( ( { payload } ) =>
198
197
Effect . gen ( function * ( ) {
199
198
// yield* Effect.logDebug('ClientSessionSyncProcessor:pull', payload)
200
199
@@ -210,13 +209,13 @@ export const makeClientSessionSyncProcessor = ({
210
209
} )
211
210
212
211
if ( mergeResult . _tag === 'unexpected-error' ) {
213
- return yield * Effect . fail ( mergeResult . cause )
212
+ return yield * new SyncError ( { cause : mergeResult . message } )
214
213
} else if ( mergeResult . _tag === 'reject' ) {
215
214
return shouldNeverHappen ( 'Unexpected reject in client-session-sync-processor' , mergeResult )
216
215
}
217
216
218
217
syncStateRef . current = mergeResult . newSyncState
219
- syncStateUpdateQueue . offer ( mergeResult . newSyncState ) . pipe ( Effect . runSync )
218
+ yield * syncStateUpdateQueue . offer ( mergeResult . newSyncState )
220
219
221
220
if ( mergeResult . _tag === 'rebase' ) {
222
221
span . addEvent ( 'merge:pull:rebase' , {
@@ -225,7 +224,7 @@ export const makeClientSessionSyncProcessor = ({
225
224
newEventsCount : mergeResult . newEvents . length ,
226
225
rollbackCount : mergeResult . rollbackEvents . length ,
227
226
res : TRACE_VERBOSE ? JSON . stringify ( mergeResult ) : undefined ,
228
- leaderMergeCounter ,
227
+ rebaseGeneration : mergeResult . newSyncState . localHead . rebaseGeneration ,
229
228
} )
230
229
231
230
debugInfo . rebaseCount ++
@@ -242,7 +241,6 @@ export const makeClientSessionSyncProcessor = ({
242
241
'merge:pull:rebase: rollback' ,
243
242
mergeResult . rollbackEvents . length ,
244
243
...mergeResult . rollbackEvents . slice ( 0 , 10 ) . map ( ( _ ) => _ . toJSON ( ) ) ,
245
- { leaderMergeCounter } ,
246
244
) . pipe ( Effect . provide ( runtime ) , Effect . runSync )
247
245
}
248
246
@@ -262,7 +260,6 @@ export const makeClientSessionSyncProcessor = ({
262
260
payload : TRACE_VERBOSE ? JSON . stringify ( payload ) : undefined ,
263
261
newEventsCount : mergeResult . newEvents . length ,
264
262
res : TRACE_VERBOSE ? JSON . stringify ( mergeResult ) : undefined ,
265
- leaderMergeCounter,
266
263
} )
267
264
268
265
debugInfo . advanceCount ++
0 commit comments