Skip to content

Commit 256ec88

Browse files
committed
chore: sync changes
1 parent eb346f9 commit 256ec88

File tree

2 files changed

+32
-31
lines changed

2 files changed

+32
-31
lines changed

packages/@livestore/common/src/sync/ClientSessionSyncProcessor.ts

Lines changed: 19 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,10 @@ import { Option, type Runtime, type Scope } from '@livestore/utils/effect'
44
import { BucketQueue, Effect, FiberHandle, Queue, Schema, Stream, Subscribable } from '@livestore/utils/effect'
55
import type * as otel from '@opentelemetry/api'
66

7-
import type { ClientSession, UnexpectedError } from '../adapter-types.js'
7+
import { type ClientSession, SyncError, type UnexpectedError } from '../adapter-types.js'
88
import * as EventSequenceNumber from '../schema/EventSequenceNumber.js'
99
import * as LiveStoreEvent from '../schema/LiveStoreEvent.js'
10-
import { getEventDef, type LiveStoreSchema, SystemTables } from '../schema/mod.js'
11-
import { sql } from '../util.js'
10+
import { getEventDef, type LiveStoreSchema } from '../schema/mod.js'
1211
import * as SyncState from './syncstate.js'
1312

1413
/**
@@ -21,6 +20,10 @@ import * as SyncState from './syncstate.js'
2120
* - We might need to make the rebase behaviour configurable e.g. to let users manually trigger a rebase
2221
*
2322
* Longer term we should evalutate whether we can unify the ClientSessionSyncProcessor with the LeaderSyncProcessor.
23+
*
24+
* The session and leader sync processor are different in the following ways:
25+
* - The leader sync processor pulls regular LiveStore events, while the session sync processor pulls SyncState.PayloadUpstream items
26+
* - The session sync processor has no downstream nodes.
2427
*/
2528
export const makeClientSessionSyncProcessor = ({
2629
schema,
@@ -37,7 +40,7 @@ export const makeClientSessionSyncProcessor = ({
3740
clientSession: ClientSession
3841
runtime: Runtime.Runtime<Scope.Scope>
3942
materializeEvent: (
40-
eventDecoded: LiveStoreEvent.PartialAnyDecoded,
43+
eventDecoded: LiveStoreEvent.AnyDecoded,
4144
options: { withChangeset: boolean; materializerHashLeader: Option.Option<number> },
4245
) => Effect.Effect<{
4346
writeTables: Set<string>
@@ -82,7 +85,10 @@ export const makeClientSessionSyncProcessor = ({
8285
let baseEventSequenceNumber = syncStateRef.current.localHead
8386
const encodedEventDefs = batch.map(({ name, args }) => {
8487
const eventDef = getEventDef(schema, name)
85-
const nextNumPair = EventSequenceNumber.nextPair(baseEventSequenceNumber, eventDef.eventDef.options.clientOnly)
88+
const nextNumPair = EventSequenceNumber.nextPair({
89+
seqNum: baseEventSequenceNumber,
90+
isClient: eventDef.eventDef.options.clientOnly,
91+
})
8692
baseEventSequenceNumber = nextNumPair.seqNum
8793
return new LiveStoreEvent.EncodedWithMeta(
8894
Schema.encodeUnknownSync(eventSchema)({
@@ -106,13 +112,13 @@ export const makeClientSessionSyncProcessor = ({
106112
)
107113

108114
if (mergeResult._tag === 'unexpected-error') {
109-
return yield* Effect.die(new Error(`Unexpected error in client-session-sync-processor: ${mergeResult.cause}`))
115+
return yield* new SyncError({ cause: mergeResult.message })
110116
}
111117

112118
if (TRACE_VERBOSE) yield* Effect.annotateCurrentSpan({ mergeResult: JSON.stringify(mergeResult) })
113119

114120
if (mergeResult._tag !== 'advance') {
115-
return yield* Effect.die(new Error(`Expected advance, got ${mergeResult._tag}`))
121+
return yield* new SyncError({ cause: `Expected advance, got ${mergeResult._tag}` })
116122
}
117123

118124
syncStateRef.current = mergeResult.newSyncState
@@ -183,18 +189,11 @@ export const makeClientSessionSyncProcessor = ({
183189

184190
yield* FiberHandle.run(leaderPushingFiberHandle, backgroundLeaderPushing)
185191

186-
const getMergeCounter = () =>
187-
clientSession.sqliteDb.select<{ mergeCounter: number }>(
188-
sql`SELECT mergeCounter FROM ${SystemTables.LEADER_MERGE_COUNTER_TABLE} WHERE id = 0`,
189-
)[0]?.mergeCounter ?? 0
190-
191192
// NOTE We need to lazily call `.pull` as we want the cursor to be updated
192193
yield* Stream.suspend(() =>
193-
clientSession.leaderThread.events.pull({
194-
cursor: { mergeCounter: getMergeCounter(), eventNum: syncStateRef.current.localHead },
195-
}),
194+
clientSession.leaderThread.events.pull({ cursor: syncStateRef.current.upstreamHead }),
196195
).pipe(
197-
Stream.tap(({ payload, mergeCounter: leaderMergeCounter }) =>
196+
Stream.tap(({ payload }) =>
198197
Effect.gen(function* () {
199198
// yield* Effect.logDebug('ClientSessionSyncProcessor:pull', payload)
200199

@@ -210,13 +209,13 @@ export const makeClientSessionSyncProcessor = ({
210209
})
211210

212211
if (mergeResult._tag === 'unexpected-error') {
213-
return yield* Effect.fail(mergeResult.cause)
212+
return yield* new SyncError({ cause: mergeResult.message })
214213
} else if (mergeResult._tag === 'reject') {
215214
return shouldNeverHappen('Unexpected reject in client-session-sync-processor', mergeResult)
216215
}
217216

218217
syncStateRef.current = mergeResult.newSyncState
219-
syncStateUpdateQueue.offer(mergeResult.newSyncState).pipe(Effect.runSync)
218+
yield* syncStateUpdateQueue.offer(mergeResult.newSyncState)
220219

221220
if (mergeResult._tag === 'rebase') {
222221
span.addEvent('merge:pull:rebase', {
@@ -225,7 +224,7 @@ export const makeClientSessionSyncProcessor = ({
225224
newEventsCount: mergeResult.newEvents.length,
226225
rollbackCount: mergeResult.rollbackEvents.length,
227226
res: TRACE_VERBOSE ? JSON.stringify(mergeResult) : undefined,
228-
leaderMergeCounter,
227+
rebaseGeneration: mergeResult.newSyncState.localHead.rebaseGeneration,
229228
})
230229

231230
debugInfo.rebaseCount++
@@ -242,7 +241,6 @@ export const makeClientSessionSyncProcessor = ({
242241
'merge:pull:rebase: rollback',
243242
mergeResult.rollbackEvents.length,
244243
...mergeResult.rollbackEvents.slice(0, 10).map((_) => _.toJSON()),
245-
{ leaderMergeCounter },
246244
).pipe(Effect.provide(runtime), Effect.runSync)
247245
}
248246

@@ -262,7 +260,6 @@ export const makeClientSessionSyncProcessor = ({
262260
payload: TRACE_VERBOSE ? JSON.stringify(payload) : undefined,
263261
newEventsCount: mergeResult.newEvents.length,
264262
res: TRACE_VERBOSE ? JSON.stringify(mergeResult) : undefined,
265-
leaderMergeCounter,
266263
})
267264

268265
debugInfo.advanceCount++
@@ -339,7 +336,7 @@ export interface ClientSessionSyncProcessor {
339336
{
340337
writeTables: Set<string>
341338
},
342-
never
339+
SyncError
343340
>
344341
boot: Effect.Effect<void, UnexpectedError, Scope.Scope>
345342
/**

packages/@livestore/livestore/src/store/store.ts

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,7 @@
11
import {
2+
type Bindable,
23
type ClientSession,
34
type ClientSessionSyncProcessor,
4-
type ParamsObject,
5-
type PreparedBindValues,
6-
type QueryBuilder,
7-
UnexpectedError,
8-
} from '@livestore/common'
9-
import {
105
Devtools,
116
getDurationMsFromSpan,
127
getExecStatementsFromMaterializer,
@@ -16,9 +11,12 @@ import {
1611
isQueryBuilder,
1712
liveStoreVersion,
1813
makeClientSessionSyncProcessor,
14+
type PreparedBindValues,
1915
prepareBindValues,
16+
type QueryBuilder,
2017
QueryBuilderAstSymbol,
2118
replaceSessionIdSymbol,
19+
UnexpectedError,
2220
} from '@livestore/common'
2321
import type { LiveStoreSchema } from '@livestore/common/schema'
2422
import { getEventDef, LiveStoreEvent, SystemTables } from '@livestore/common/schema'
@@ -178,6 +176,8 @@ export class Store<TSchema extends LiveStoreSchema = LiveStoreSchema.Any, TConte
178176
for (const table of writeTables) {
179177
writeTablesForEvent.add(table)
180178
}
179+
180+
this.sqliteDbWrapper.debug.head = eventDecoded.seqNum
181181
}
182182
}
183183

@@ -441,15 +441,19 @@ export class Store<TSchema extends LiveStoreSchema = LiveStoreSchema.Any, TConte
441441
| LiveQuery<TResult>
442442
| LiveQueryDef<TResult>
443443
| SignalDef<TResult>
444-
| { query: string; bindValues: ParamsObject },
444+
| { query: string; bindValues: Bindable; schema?: Schema.Schema<TResult> },
445445
options?: { otelContext?: otel.Context; debugRefreshReason?: RefreshReason },
446446
): TResult => {
447447
this.checkShutdown('query')
448448

449449
if (typeof query === 'object' && 'query' in query && 'bindValues' in query) {
450-
return this.sqliteDbWrapper.cachedSelect(query.query, prepareBindValues(query.bindValues, query.query), {
450+
const res = this.sqliteDbWrapper.cachedSelect(query.query, prepareBindValues(query.bindValues, query.query), {
451451
otelContext: options?.otelContext,
452452
}) as any
453+
if (query.schema) {
454+
return Schema.decodeSync(query.schema)(res)
455+
}
456+
return res
453457
} else if (isQueryBuilder(query)) {
454458
const ast = query[QueryBuilderAstSymbol]
455459
if (ast._tag === 'RowQuery') {
@@ -626,14 +630,14 @@ export class Store<TSchema extends LiveStoreSchema = LiveStoreSchema.Any, TConte
626630
const otelContext = otel.trace.setSpan(otel.context.active(), span)
627631

628632
try {
633+
// Materialize events to state
629634
const { writeTables } = (() => {
630635
try {
631636
const materializeEvents = () => {
632637
return Runtime.runSync(this.effectContext.runtime, this.syncProcessor.push(events))
633638
}
634639

635640
if (events.length > 1) {
636-
// TODO: what to do about leader transaction here?
637641
return this.sqliteDbWrapper.txn(materializeEvents)
638642
} else {
639643
return materializeEvents()

0 commit comments

Comments
 (0)