Skip to content

Commit 4dbb186

Browse files
committed
chore: sync changes
1 parent f22e87b commit 4dbb186

File tree

1 file changed

+16
-19
lines changed

1 file changed

+16
-19
lines changed

packages/@livestore/common/src/sync/ClientSessionSyncProcessor.ts

Lines changed: 16 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,10 @@ import { Option, type Runtime, type Scope } from '@livestore/utils/effect'
44
import { BucketQueue, Effect, FiberHandle, Queue, Schema, Stream, Subscribable } from '@livestore/utils/effect'
55
import * as otel from '@opentelemetry/api'
66

7-
import type { ClientSession, UnexpectedError } from '../adapter-types.js'
7+
import { type ClientSession, SyncError, type UnexpectedError } from '../adapter-types.js'
88
import * as EventSequenceNumber from '../schema/EventSequenceNumber.js'
99
import * as LiveStoreEvent from '../schema/LiveStoreEvent.js'
10-
import { getEventDef, type LiveStoreSchema, SystemTables } from '../schema/mod.js'
11-
import { sql } from '../util.js'
10+
import { getEventDef, type LiveStoreSchema } from '../schema/mod.js'
1211
import * as SyncState from './syncstate.js'
1312

1413
/**
@@ -21,6 +20,10 @@ import * as SyncState from './syncstate.js'
2120
* - We might need to make the rebase behaviour configurable e.g. to let users manually trigger a rebase
2221
*
2322
* Longer term we should evalutate whether we can unify the ClientSessionSyncProcessor with the LeaderSyncProcessor.
23+
*
24+
* The session and leader sync processor are different in the following ways:
25+
* - The leader sync processor pulls regular LiveStore events, while the session sync processor pulls SyncState.PayloadUpstream items
26+
* - The session sync processor has no downstream nodes.
2427
*/
2528
export const makeClientSessionSyncProcessor = ({
2629
schema,
@@ -37,7 +40,7 @@ export const makeClientSessionSyncProcessor = ({
3740
clientSession: ClientSession
3841
runtime: Runtime.Runtime<Scope.Scope>
3942
materializeEvent: (
40-
eventDecoded: LiveStoreEvent.PartialAnyDecoded,
43+
eventDecoded: LiveStoreEvent.AnyDecoded,
4144
options: { otelContext: otel.Context; withChangeset: boolean; materializerHashLeader: Option.Option<number> },
4245
) => {
4346
writeTables: Set<string>
@@ -85,7 +88,10 @@ export const makeClientSessionSyncProcessor = ({
8588
let baseEventSequenceNumber = syncStateRef.current.localHead
8689
const encodedEventDefs = batch.map(({ name, args }) => {
8790
const eventDef = getEventDef(schema, name)
88-
const nextNumPair = EventSequenceNumber.nextPair(baseEventSequenceNumber, eventDef.eventDef.options.clientOnly)
91+
const nextNumPair = EventSequenceNumber.nextPair({
92+
seqNum: baseEventSequenceNumber,
93+
isClient: eventDef.eventDef.options.clientOnly
94+
})
8995
baseEventSequenceNumber = nextNumPair.seqNum
9096
return new LiveStoreEvent.EncodedWithMeta(
9197
Schema.encodeUnknownSync(eventSchema)({
@@ -109,7 +115,7 @@ export const makeClientSessionSyncProcessor = ({
109115
)
110116

111117
if (mergeResult._tag === 'unexpected-error') {
112-
return yield* Effect.die(new Error(`Unexpected error in client-session-sync-processor: ${mergeResult.cause}`))
118+
return yield* Effect.die(new Error(`Unexpected error in client-session-sync-processor: ${mergeResult.message}`))
113119
}
114120

115121
if (TRACE_VERBOSE) yield* Effect.annotateCurrentSpan({ mergeResult: JSON.stringify(mergeResult) })
@@ -189,18 +195,11 @@ export const makeClientSessionSyncProcessor = ({
189195

190196
yield* FiberHandle.run(leaderPushingFiberHandle, backgroundLeaderPushing)
191197

192-
const getMergeCounter = () =>
193-
clientSession.sqliteDb.select<{ mergeCounter: number }>(
194-
sql`SELECT mergeCounter FROM ${SystemTables.LEADER_MERGE_COUNTER_TABLE} WHERE id = 0`,
195-
)[0]?.mergeCounter ?? 0
196-
197198
// NOTE We need to lazily call `.pull` as we want the cursor to be updated
198199
yield* Stream.suspend(() =>
199-
clientSession.leaderThread.events.pull({
200-
cursor: { mergeCounter: getMergeCounter(), eventNum: syncStateRef.current.localHead },
201-
}),
200+
clientSession.leaderThread.events.pull({ cursor: syncStateRef.current.upstreamHead }),
202201
).pipe(
203-
Stream.tap(({ payload, mergeCounter: leaderMergeCounter }) =>
202+
Stream.tap(({ payload }) =>
204203
Effect.gen(function* () {
205204
// yield* Effect.logDebug('ClientSessionSyncProcessor:pull', payload)
206205

@@ -216,7 +215,7 @@ export const makeClientSessionSyncProcessor = ({
216215
})
217216

218217
if (mergeResult._tag === 'unexpected-error') {
219-
return yield* Effect.fail(mergeResult.cause)
218+
return yield* new SyncError({ cause: mergeResult.message })
220219
} else if (mergeResult._tag === 'reject') {
221220
return shouldNeverHappen('Unexpected reject in client-session-sync-processor', mergeResult)
222221
}
@@ -231,7 +230,7 @@ export const makeClientSessionSyncProcessor = ({
231230
newEventsCount: mergeResult.newEvents.length,
232231
rollbackCount: mergeResult.rollbackEvents.length,
233232
res: TRACE_VERBOSE ? JSON.stringify(mergeResult) : undefined,
234-
leaderMergeCounter,
233+
rebaseGeneration: mergeResult.newSyncState.localHead.rebaseGeneration,
235234
})
236235

237236
debugInfo.rebaseCount++
@@ -248,7 +247,6 @@ export const makeClientSessionSyncProcessor = ({
248247
'merge:pull:rebase: rollback',
249248
mergeResult.rollbackEvents.length,
250249
...mergeResult.rollbackEvents.slice(0, 10).map((_) => _.toJSON()),
251-
{ leaderMergeCounter },
252250
).pipe(Effect.provide(runtime), Effect.runSync)
253251
}
254252

@@ -268,7 +266,6 @@ export const makeClientSessionSyncProcessor = ({
268266
payload: TRACE_VERBOSE ? JSON.stringify(payload) : undefined,
269267
newEventsCount: mergeResult.newEvents.length,
270268
res: TRACE_VERBOSE ? JSON.stringify(mergeResult) : undefined,
271-
leaderMergeCounter,
272269
})
273270

274271
debugInfo.advanceCount++

0 commit comments

Comments
 (0)