@@ -4,11 +4,10 @@ import { Option, type Runtime, type Scope } from '@livestore/utils/effect'
4
4
import { BucketQueue , Effect , FiberHandle , Queue , Schema , Stream , Subscribable } from '@livestore/utils/effect'
5
5
import * as otel from '@opentelemetry/api'
6
6
7
- import type { ClientSession , UnexpectedError } from '../adapter-types.js'
7
+ import { type ClientSession , SyncError , type UnexpectedError } from '../adapter-types.js'
8
8
import * as EventSequenceNumber from '../schema/EventSequenceNumber.js'
9
9
import * as LiveStoreEvent from '../schema/LiveStoreEvent.js'
10
- import { getEventDef , type LiveStoreSchema , SystemTables } from '../schema/mod.js'
11
- import { sql } from '../util.js'
10
+ import { getEventDef , type LiveStoreSchema } from '../schema/mod.js'
12
11
import * as SyncState from './syncstate.js'
13
12
14
13
/**
@@ -21,6 +20,10 @@ import * as SyncState from './syncstate.js'
21
20
* - We might need to make the rebase behaviour configurable e.g. to let users manually trigger a rebase
22
21
*
23
22
* Longer term we should evalutate whether we can unify the ClientSessionSyncProcessor with the LeaderSyncProcessor.
23
+ *
24
+ * The session and leader sync processor are different in the following ways:
25
+ * - The leader sync processor pulls regular LiveStore events, while the session sync processor pulls SyncState.PayloadUpstream items
26
+ * - The session sync processor has no downstream nodes.
24
27
*/
25
28
export const makeClientSessionSyncProcessor = ( {
26
29
schema,
@@ -37,7 +40,7 @@ export const makeClientSessionSyncProcessor = ({
37
40
clientSession : ClientSession
38
41
runtime : Runtime . Runtime < Scope . Scope >
39
42
materializeEvent : (
40
- eventDecoded : LiveStoreEvent . PartialAnyDecoded ,
43
+ eventDecoded : LiveStoreEvent . AnyDecoded ,
41
44
options : { otelContext : otel . Context ; withChangeset : boolean ; materializerHashLeader : Option . Option < number > } ,
42
45
) => {
43
46
writeTables : Set < string >
@@ -85,7 +88,10 @@ export const makeClientSessionSyncProcessor = ({
85
88
let baseEventSequenceNumber = syncStateRef . current . localHead
86
89
const encodedEventDefs = batch . map ( ( { name, args } ) => {
87
90
const eventDef = getEventDef ( schema , name )
88
- const nextNumPair = EventSequenceNumber . nextPair ( baseEventSequenceNumber , eventDef . eventDef . options . clientOnly )
91
+ const nextNumPair = EventSequenceNumber . nextPair ( {
92
+ seqNum : baseEventSequenceNumber ,
93
+ isClient : eventDef . eventDef . options . clientOnly
94
+ } )
89
95
baseEventSequenceNumber = nextNumPair . seqNum
90
96
return new LiveStoreEvent . EncodedWithMeta (
91
97
Schema . encodeUnknownSync ( eventSchema ) ( {
@@ -109,7 +115,7 @@ export const makeClientSessionSyncProcessor = ({
109
115
)
110
116
111
117
if ( mergeResult . _tag === 'unexpected-error' ) {
112
- return yield * Effect . die ( new Error ( `Unexpected error in client-session-sync-processor: ${ mergeResult . cause } ` ) )
118
+ return yield * Effect . die ( new Error ( `Unexpected error in client-session-sync-processor: ${ mergeResult . message } ` ) )
113
119
}
114
120
115
121
if ( TRACE_VERBOSE ) yield * Effect . annotateCurrentSpan ( { mergeResult : JSON . stringify ( mergeResult ) } )
@@ -189,18 +195,11 @@ export const makeClientSessionSyncProcessor = ({
189
195
190
196
yield * FiberHandle . run ( leaderPushingFiberHandle , backgroundLeaderPushing )
191
197
192
- const getMergeCounter = ( ) =>
193
- clientSession . sqliteDb . select < { mergeCounter : number } > (
194
- sql `SELECT mergeCounter FROM ${ SystemTables . LEADER_MERGE_COUNTER_TABLE } WHERE id = 0` ,
195
- ) [ 0 ] ?. mergeCounter ?? 0
196
-
197
198
// NOTE We need to lazily call `.pull` as we want the cursor to be updated
198
199
yield * Stream . suspend ( ( ) =>
199
- clientSession . leaderThread . events . pull ( {
200
- cursor : { mergeCounter : getMergeCounter ( ) , eventNum : syncStateRef . current . localHead } ,
201
- } ) ,
200
+ clientSession . leaderThread . events . pull ( { cursor : syncStateRef . current . upstreamHead } ) ,
202
201
) . pipe (
203
- Stream . tap ( ( { payload, mergeCounter : leaderMergeCounter } ) =>
202
+ Stream . tap ( ( { payload } ) =>
204
203
Effect . gen ( function * ( ) {
205
204
// yield* Effect.logDebug('ClientSessionSyncProcessor:pull', payload)
206
205
@@ -216,7 +215,7 @@ export const makeClientSessionSyncProcessor = ({
216
215
} )
217
216
218
217
if ( mergeResult . _tag === 'unexpected-error' ) {
219
- return yield * Effect . fail ( mergeResult . cause )
218
+ return yield * new SyncError ( { cause : mergeResult . message } )
220
219
} else if ( mergeResult . _tag === 'reject' ) {
221
220
return shouldNeverHappen ( 'Unexpected reject in client-session-sync-processor' , mergeResult )
222
221
}
@@ -231,7 +230,7 @@ export const makeClientSessionSyncProcessor = ({
231
230
newEventsCount : mergeResult . newEvents . length ,
232
231
rollbackCount : mergeResult . rollbackEvents . length ,
233
232
res : TRACE_VERBOSE ? JSON . stringify ( mergeResult ) : undefined ,
234
- leaderMergeCounter ,
233
+ rebaseGeneration : mergeResult . newSyncState . localHead . rebaseGeneration ,
235
234
} )
236
235
237
236
debugInfo . rebaseCount ++
@@ -248,7 +247,6 @@ export const makeClientSessionSyncProcessor = ({
248
247
'merge:pull:rebase: rollback' ,
249
248
mergeResult . rollbackEvents . length ,
250
249
...mergeResult . rollbackEvents . slice ( 0 , 10 ) . map ( ( _ ) => _ . toJSON ( ) ) ,
251
- { leaderMergeCounter } ,
252
250
) . pipe ( Effect . provide ( runtime ) , Effect . runSync )
253
251
}
254
252
@@ -268,7 +266,6 @@ export const makeClientSessionSyncProcessor = ({
268
266
payload : TRACE_VERBOSE ? JSON . stringify ( payload ) : undefined ,
269
267
newEventsCount : mergeResult . newEvents . length ,
270
268
res : TRACE_VERBOSE ? JSON . stringify ( mergeResult ) : undefined ,
271
- leaderMergeCounter,
272
269
} )
273
270
274
271
debugInfo . advanceCount ++
0 commit comments