Skip to content

Commit e42880f

Browse files
committed
chore: sync changes
1 parent f22c8bb commit e42880f

File tree

2 files changed

+20
-20
lines changed

2 files changed

+20
-20
lines changed

packages/@livestore/common/src/sync/ClientSessionSyncProcessor.ts

Lines changed: 17 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,10 @@ import { Option, type Runtime, type Scope } from '@livestore/utils/effect'
44
import { BucketQueue, Effect, FiberHandle, Queue, Schema, Stream, Subscribable } from '@livestore/utils/effect'
55
import type * as otel from '@opentelemetry/api'
66

7-
import type { ClientSession, UnexpectedError } from '../adapter-types.js'
7+
import { type ClientSession, SyncError, type UnexpectedError } from '../adapter-types.js'
88
import * as EventSequenceNumber from '../schema/EventSequenceNumber.js'
99
import * as LiveStoreEvent from '../schema/LiveStoreEvent.js'
10-
import { getEventDef, type LiveStoreSchema, SystemTables } from '../schema/mod.js'
11-
import { sql } from '../util.js'
10+
import { getEventDef, type LiveStoreSchema } from '../schema/mod.js'
1211
import * as SyncState from './syncstate.js'
1312

1413
/**
@@ -21,6 +20,10 @@ import * as SyncState from './syncstate.js'
2120
* - We might need to make the rebase behaviour configurable e.g. to let users manually trigger a rebase
2221
*
2322
* Longer term we should evalutate whether we can unify the ClientSessionSyncProcessor with the LeaderSyncProcessor.
23+
*
24+
* The session and leader sync processor are different in the following ways:
25+
* - The leader sync processor pulls regular LiveStore events, while the session sync processor pulls SyncState.PayloadUpstream items
26+
* - The session sync processor has no downstream nodes.
2427
*/
2528
export const makeClientSessionSyncProcessor = ({
2629
schema,
@@ -37,7 +40,7 @@ export const makeClientSessionSyncProcessor = ({
3740
clientSession: ClientSession
3841
runtime: Runtime.Runtime<Scope.Scope>
3942
materializeEvent: (
40-
eventDecoded: LiveStoreEvent.PartialAnyDecoded,
43+
eventDecoded: LiveStoreEvent.AnyDecoded,
4144
options: { withChangeset: boolean; materializerHashLeader: Option.Option<number> },
4245
) => Effect.Effect<{
4346
writeTables: Set<string>
@@ -82,7 +85,10 @@ export const makeClientSessionSyncProcessor = ({
8285
let baseEventSequenceNumber = syncStateRef.current.localHead
8386
const encodedEventDefs = batch.map(({ name, args }) => {
8487
const eventDef = getEventDef(schema, name)
85-
const nextNumPair = EventSequenceNumber.nextPair(baseEventSequenceNumber, eventDef.eventDef.options.clientOnly)
88+
const nextNumPair = EventSequenceNumber.nextPair({
89+
seqNum: baseEventSequenceNumber,
90+
isClient: eventDef.eventDef.options.clientOnly,
91+
})
8692
baseEventSequenceNumber = nextNumPair.seqNum
8793
return new LiveStoreEvent.EncodedWithMeta(
8894
Schema.encodeUnknownSync(eventSchema)({
@@ -106,7 +112,7 @@ export const makeClientSessionSyncProcessor = ({
106112
)
107113

108114
if (mergeResult._tag === 'unexpected-error') {
109-
return yield* Effect.die(new Error(`Unexpected error in client-session-sync-processor: ${mergeResult.cause}`))
115+
return yield* Effect.die(new Error(`Unexpected error in client-session-sync-processor: ${mergeResult.message}`))
110116
}
111117

112118
if (TRACE_VERBOSE) yield* Effect.annotateCurrentSpan({ mergeResult: JSON.stringify(mergeResult) })
@@ -183,18 +189,11 @@ export const makeClientSessionSyncProcessor = ({
183189

184190
yield* FiberHandle.run(leaderPushingFiberHandle, backgroundLeaderPushing)
185191

186-
const getMergeCounter = () =>
187-
clientSession.sqliteDb.select<{ mergeCounter: number }>(
188-
sql`SELECT mergeCounter FROM ${SystemTables.LEADER_MERGE_COUNTER_TABLE} WHERE id = 0`,
189-
)[0]?.mergeCounter ?? 0
190-
191192
// NOTE We need to lazily call `.pull` as we want the cursor to be updated
192193
yield* Stream.suspend(() =>
193-
clientSession.leaderThread.events.pull({
194-
cursor: { mergeCounter: getMergeCounter(), eventNum: syncStateRef.current.localHead },
195-
}),
194+
clientSession.leaderThread.events.pull({ cursor: syncStateRef.current.upstreamHead }),
196195
).pipe(
197-
Stream.tap(({ payload, mergeCounter: leaderMergeCounter }) =>
196+
Stream.tap(({ payload }) =>
198197
Effect.gen(function* () {
199198
// yield* Effect.logDebug('ClientSessionSyncProcessor:pull', payload)
200199

@@ -210,13 +209,13 @@ export const makeClientSessionSyncProcessor = ({
210209
})
211210

212211
if (mergeResult._tag === 'unexpected-error') {
213-
return yield* Effect.fail(mergeResult.cause)
212+
return yield* new SyncError({ cause: mergeResult.message })
214213
} else if (mergeResult._tag === 'reject') {
215214
return shouldNeverHappen('Unexpected reject in client-session-sync-processor', mergeResult)
216215
}
217216

218217
syncStateRef.current = mergeResult.newSyncState
219-
syncStateUpdateQueue.offer(mergeResult.newSyncState).pipe(Effect.runSync)
218+
yield* syncStateUpdateQueue.offer(mergeResult.newSyncState)
220219

221220
if (mergeResult._tag === 'rebase') {
222221
span.addEvent('merge:pull:rebase', {
@@ -225,7 +224,7 @@ export const makeClientSessionSyncProcessor = ({
225224
newEventsCount: mergeResult.newEvents.length,
226225
rollbackCount: mergeResult.rollbackEvents.length,
227226
res: TRACE_VERBOSE ? JSON.stringify(mergeResult) : undefined,
228-
leaderMergeCounter,
227+
rebaseGeneration: mergeResult.newSyncState.localHead.rebaseGeneration,
229228
})
230229

231230
debugInfo.rebaseCount++
@@ -242,7 +241,6 @@ export const makeClientSessionSyncProcessor = ({
242241
'merge:pull:rebase: rollback',
243242
mergeResult.rollbackEvents.length,
244243
...mergeResult.rollbackEvents.slice(0, 10).map((_) => _.toJSON()),
245-
{ leaderMergeCounter },
246244
).pipe(Effect.provide(runtime), Effect.runSync)
247245
}
248246

@@ -262,7 +260,6 @@ export const makeClientSessionSyncProcessor = ({
262260
payload: TRACE_VERBOSE ? JSON.stringify(payload) : undefined,
263261
newEventsCount: mergeResult.newEvents.length,
264262
res: TRACE_VERBOSE ? JSON.stringify(mergeResult) : undefined,
265-
leaderMergeCounter,
266263
})
267264

268265
debugInfo.advanceCount++

packages/@livestore/livestore/src/store/store.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,8 @@ export class Store<TSchema extends LiveStoreSchema = LiveStoreSchema, TContext =
164164
for (const table of writeTables) {
165165
writeTablesForEvent.add(table)
166166
}
167+
168+
this.sqliteDbWrapper.debug.head = eventDecoded.seqNum
167169
}
168170
}
169171

@@ -587,6 +589,7 @@ export class Store<TSchema extends LiveStoreSchema = LiveStoreSchema, TContext =
587589
const otelContext = otel.trace.setSpan(otel.context.active(), span)
588590

589591
try {
592+
// Materialize events to state
590593
const { writeTables } = (() => {
591594
try {
592595
const materializeEvents = () => {

0 commit comments

Comments
 (0)