Skip to content

Commit 0eddddb

Browse files
committed
chore: sync changes
1 parent aa62a8f commit 0eddddb

File tree

2 files changed

+17
-18
lines changed

2 files changed

+17
-18
lines changed

packages/@livestore/common/src/sync/ClientSessionSyncProcessor.ts

Lines changed: 16 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import { Option, type Runtime, type Scope } from '@livestore/utils/effect'
44
import { BucketQueue, Effect, FiberHandle, Queue, Schema, Stream, Subscribable } from '@livestore/utils/effect'
55
import type * as otel from '@opentelemetry/api'
66

7-
import type { ClientSession, UnexpectedError } from '../adapter-types.js'
7+
import { type ClientSession, SyncError, type UnexpectedError } from '../adapter-types.js'
88
import * as EventSequenceNumber from '../schema/EventSequenceNumber.js'
99
import * as LiveStoreEvent from '../schema/LiveStoreEvent.js'
1010
import { getEventDef, type LiveStoreSchema, SystemTables } from '../schema/mod.js'
@@ -21,6 +21,10 @@ import * as SyncState from './syncstate.js'
2121
* - We might need to make the rebase behaviour configurable e.g. to let users manually trigger a rebase
2222
*
2323
* Longer term we should evalutate whether we can unify the ClientSessionSyncProcessor with the LeaderSyncProcessor.
24+
*
25+
* The session and leader sync processor are different in the following ways:
26+
* - The leader sync processor pulls regular LiveStore events, while the session sync processor pulls SyncState.PayloadUpstream items
27+
* - The session sync processor has no downstream nodes.
2428
*/
2529
export const makeClientSessionSyncProcessor = ({
2630
schema,
@@ -37,7 +41,7 @@ export const makeClientSessionSyncProcessor = ({
3741
clientSession: ClientSession
3842
runtime: Runtime.Runtime<Scope.Scope>
3943
materializeEvent: (
40-
eventDecoded: LiveStoreEvent.PartialAnyDecoded,
44+
eventDecoded: LiveStoreEvent.AnyDecoded,
4145
options: { withChangeset: boolean; materializerHashLeader: Option.Option<number> },
4246
) => Effect.Effect<{
4347
writeTables: Set<string>
@@ -82,7 +86,10 @@ export const makeClientSessionSyncProcessor = ({
8286
let baseEventSequenceNumber = syncStateRef.current.localHead
8387
const encodedEventDefs = batch.map(({ name, args }) => {
8488
const eventDef = getEventDef(schema, name)
85-
const nextNumPair = EventSequenceNumber.nextPair(baseEventSequenceNumber, eventDef.eventDef.options.clientOnly)
89+
const nextNumPair = EventSequenceNumber.nextPair({
90+
seqNum: baseEventSequenceNumber,
91+
isClient: eventDef.eventDef.options.clientOnly,
92+
})
8693
baseEventSequenceNumber = nextNumPair.seqNum
8794
return new LiveStoreEvent.EncodedWithMeta(
8895
Schema.encodeUnknownSync(eventSchema)({
@@ -106,7 +113,7 @@ export const makeClientSessionSyncProcessor = ({
106113
)
107114

108115
if (mergeResult._tag === 'unexpected-error') {
109-
return yield* Effect.die(new Error(`Unexpected error in client-session-sync-processor: ${mergeResult.cause}`))
116+
return yield* Effect.die(new Error(`Unexpected error in client-session-sync-processor: ${mergeResult.message}`))
110117
}
111118

112119
if (TRACE_VERBOSE) yield* Effect.annotateCurrentSpan({ mergeResult: JSON.stringify(mergeResult) })
@@ -183,18 +190,11 @@ export const makeClientSessionSyncProcessor = ({
183190

184191
yield* FiberHandle.run(leaderPushingFiberHandle, backgroundLeaderPushing)
185192

186-
const getMergeCounter = () =>
187-
clientSession.sqliteDb.select<{ mergeCounter: number }>(
188-
sql`SELECT mergeCounter FROM ${SystemTables.LEADER_MERGE_COUNTER_TABLE} WHERE id = 0`,
189-
)[0]?.mergeCounter ?? 0
190-
191193
// NOTE We need to lazily call `.pull` as we want the cursor to be updated
192194
yield* Stream.suspend(() =>
193-
clientSession.leaderThread.events.pull({
194-
cursor: { mergeCounter: getMergeCounter(), eventNum: syncStateRef.current.localHead },
195-
}),
195+
clientSession.leaderThread.events.pull({ cursor: syncStateRef.current.upstreamHead }),
196196
).pipe(
197-
Stream.tap(({ payload, mergeCounter: leaderMergeCounter }) =>
197+
Stream.tap(({ payload }) =>
198198
Effect.gen(function* () {
199199
// yield* Effect.logDebug('ClientSessionSyncProcessor:pull', payload)
200200

@@ -210,13 +210,13 @@ export const makeClientSessionSyncProcessor = ({
210210
})
211211

212212
if (mergeResult._tag === 'unexpected-error') {
213-
return yield* Effect.fail(mergeResult.cause)
213+
return yield* new SyncError({ cause: mergeResult.message })
214214
} else if (mergeResult._tag === 'reject') {
215215
return shouldNeverHappen('Unexpected reject in client-session-sync-processor', mergeResult)
216216
}
217217

218218
syncStateRef.current = mergeResult.newSyncState
219-
syncStateUpdateQueue.offer(mergeResult.newSyncState).pipe(Effect.runSync)
219+
yield* syncStateUpdateQueue.offer(mergeResult.newSyncState)
220220

221221
if (mergeResult._tag === 'rebase') {
222222
span.addEvent('merge:pull:rebase', {
@@ -225,7 +225,7 @@ export const makeClientSessionSyncProcessor = ({
225225
newEventsCount: mergeResult.newEvents.length,
226226
rollbackCount: mergeResult.rollbackEvents.length,
227227
res: TRACE_VERBOSE ? JSON.stringify(mergeResult) : undefined,
228-
leaderMergeCounter,
228+
rebaseGeneration: mergeResult.newSyncState.localHead.rebaseGeneration,
229229
})
230230

231231
debugInfo.rebaseCount++
@@ -242,7 +242,6 @@ export const makeClientSessionSyncProcessor = ({
242242
'merge:pull:rebase: rollback',
243243
mergeResult.rollbackEvents.length,
244244
...mergeResult.rollbackEvents.slice(0, 10).map((_) => _.toJSON()),
245-
{ leaderMergeCounter },
246245
).pipe(Effect.provide(runtime), Effect.runSync)
247246
}
248247

@@ -262,7 +261,6 @@ export const makeClientSessionSyncProcessor = ({
262261
payload: TRACE_VERBOSE ? JSON.stringify(payload) : undefined,
263262
newEventsCount: mergeResult.newEvents.length,
264263
res: TRACE_VERBOSE ? JSON.stringify(mergeResult) : undefined,
265-
leaderMergeCounter,
266264
})
267265

268266
debugInfo.advanceCount++

packages/@livestore/livestore/src/store/store.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -587,6 +587,7 @@ export class Store<TSchema extends LiveStoreSchema = LiveStoreSchema, TContext =
587587
const otelContext = otel.trace.setSpan(otel.context.active(), span)
588588

589589
try {
590+
// Materialize events to state
590591
const { writeTables } = (() => {
591592
try {
592593
const materializeEvents = () => {

0 commit comments

Comments
 (0)