Skip to content

Commit 67064d4

Browse files
committed
chore: sync changes
1 parent 3abf06f commit 67064d4

File tree

2 files changed

+32
-31
lines changed

2 files changed

+32
-31
lines changed

packages/@livestore/common/src/sync/ClientSessionSyncProcessor.ts

Lines changed: 19 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,10 @@ import { Option, type Runtime, type Scope } from '@livestore/utils/effect'
44
import { BucketQueue, Effect, FiberHandle, Queue, Schema, Stream, Subscribable } from '@livestore/utils/effect'
55
import type * as otel from '@opentelemetry/api'
66

7-
import type { ClientSession, UnexpectedError } from '../adapter-types.js'
7+
import { type ClientSession, SyncError, type UnexpectedError } from '../adapter-types.js'
88
import * as EventSequenceNumber from '../schema/EventSequenceNumber.js'
99
import * as LiveStoreEvent from '../schema/LiveStoreEvent.js'
10-
import { getEventDef, type LiveStoreSchema, SystemTables } from '../schema/mod.js'
11-
import { sql } from '../util.js'
10+
import { getEventDef, type LiveStoreSchema } from '../schema/mod.js'
1211
import * as SyncState from './syncstate.js'
1312

1413
/**
@@ -21,6 +20,10 @@ import * as SyncState from './syncstate.js'
2120
* - We might need to make the rebase behaviour configurable e.g. to let users manually trigger a rebase
2221
*
2322
* Longer term we should evalutate whether we can unify the ClientSessionSyncProcessor with the LeaderSyncProcessor.
23+
*
24+
* The session and leader sync processor are different in the following ways:
25+
* - The leader sync processor pulls regular LiveStore events, while the session sync processor pulls SyncState.PayloadUpstream items
26+
* - The session sync processor has no downstream nodes.
2427
*/
2528
export const makeClientSessionSyncProcessor = ({
2629
schema,
@@ -37,7 +40,7 @@ export const makeClientSessionSyncProcessor = ({
3740
clientSession: ClientSession
3841
runtime: Runtime.Runtime<Scope.Scope>
3942
materializeEvent: (
40-
eventDecoded: LiveStoreEvent.PartialAnyDecoded,
43+
eventDecoded: LiveStoreEvent.AnyDecoded,
4144
options: { withChangeset: boolean; materializerHashLeader: Option.Option<number> },
4245
) => Effect.Effect<{
4346
writeTables: Set<string>
@@ -82,7 +85,10 @@ export const makeClientSessionSyncProcessor = ({
8285
let baseEventSequenceNumber = syncStateRef.current.localHead
8386
const encodedEventDefs = batch.map(({ name, args }) => {
8487
const eventDef = getEventDef(schema, name)
85-
const nextNumPair = EventSequenceNumber.nextPair(baseEventSequenceNumber, eventDef.eventDef.options.clientOnly)
88+
const nextNumPair = EventSequenceNumber.nextPair({
89+
seqNum: baseEventSequenceNumber,
90+
isClient: eventDef.eventDef.options.clientOnly,
91+
})
8692
baseEventSequenceNumber = nextNumPair.seqNum
8793
return new LiveStoreEvent.EncodedWithMeta(
8894
Schema.encodeUnknownSync(eventSchema)({
@@ -106,13 +112,13 @@ export const makeClientSessionSyncProcessor = ({
106112
)
107113

108114
if (mergeResult._tag === 'unexpected-error') {
109-
return yield* Effect.die(new Error(`Unexpected error in client-session-sync-processor: ${mergeResult.cause}`))
115+
return yield* new SyncError({ cause: mergeResult.message })
110116
}
111117

112118
if (TRACE_VERBOSE) yield* Effect.annotateCurrentSpan({ mergeResult: JSON.stringify(mergeResult) })
113119

114120
if (mergeResult._tag !== 'advance') {
115-
return yield* Effect.die(new Error(`Expected advance, got ${mergeResult._tag}`))
121+
return yield* new SyncError({ cause: `Expected advance, got ${mergeResult._tag}` })
116122
}
117123

118124
syncStateRef.current = mergeResult.newSyncState
@@ -182,18 +188,11 @@ export const makeClientSessionSyncProcessor = ({
182188

183189
yield* FiberHandle.run(leaderPushingFiberHandle, backgroundLeaderPushing)
184190

185-
const getMergeCounter = () =>
186-
clientSession.sqliteDb.select<{ mergeCounter: number }>(
187-
sql`SELECT mergeCounter FROM ${SystemTables.LEADER_MERGE_COUNTER_TABLE} WHERE id = 0`,
188-
)[0]?.mergeCounter ?? 0
189-
190191
// NOTE We need to lazily call `.pull` as we want the cursor to be updated
191192
yield* Stream.suspend(() =>
192-
clientSession.leaderThread.events.pull({
193-
cursor: { mergeCounter: getMergeCounter(), eventNum: syncStateRef.current.localHead },
194-
}),
193+
clientSession.leaderThread.events.pull({ cursor: syncStateRef.current.upstreamHead }),
195194
).pipe(
196-
Stream.tap(({ payload, mergeCounter: leaderMergeCounter }) =>
195+
Stream.tap(({ payload }) =>
197196
Effect.gen(function* () {
198197
// yield* Effect.logDebug('ClientSessionSyncProcessor:pull', payload)
199198

@@ -209,13 +208,13 @@ export const makeClientSessionSyncProcessor = ({
209208
})
210209

211210
if (mergeResult._tag === 'unexpected-error') {
212-
return yield* Effect.fail(mergeResult.cause)
211+
return yield* new SyncError({ cause: mergeResult.message })
213212
} else if (mergeResult._tag === 'reject') {
214213
return shouldNeverHappen('Unexpected reject in client-session-sync-processor', mergeResult)
215214
}
216215

217216
syncStateRef.current = mergeResult.newSyncState
218-
syncStateUpdateQueue.offer(mergeResult.newSyncState).pipe(Effect.runSync)
217+
yield* syncStateUpdateQueue.offer(mergeResult.newSyncState)
219218

220219
if (mergeResult._tag === 'rebase') {
221220
span.addEvent('merge:pull:rebase', {
@@ -224,7 +223,7 @@ export const makeClientSessionSyncProcessor = ({
224223
newEventsCount: mergeResult.newEvents.length,
225224
rollbackCount: mergeResult.rollbackEvents.length,
226225
res: TRACE_VERBOSE ? JSON.stringify(mergeResult) : undefined,
227-
leaderMergeCounter,
226+
rebaseGeneration: mergeResult.newSyncState.localHead.rebaseGeneration,
228227
})
229228

230229
debugInfo.rebaseCount++
@@ -241,7 +240,6 @@ export const makeClientSessionSyncProcessor = ({
241240
'merge:pull:rebase: rollback',
242241
mergeResult.rollbackEvents.length,
243242
...mergeResult.rollbackEvents.slice(0, 10).map((_) => _.toJSON()),
244-
{ leaderMergeCounter },
245243
).pipe(Effect.provide(runtime), Effect.runSync)
246244
}
247245

@@ -261,7 +259,6 @@ export const makeClientSessionSyncProcessor = ({
261259
payload: TRACE_VERBOSE ? JSON.stringify(payload) : undefined,
262260
newEventsCount: mergeResult.newEvents.length,
263261
res: TRACE_VERBOSE ? JSON.stringify(mergeResult) : undefined,
264-
leaderMergeCounter,
265262
})
266263

267264
debugInfo.advanceCount++
@@ -338,7 +335,7 @@ export interface ClientSessionSyncProcessor {
338335
{
339336
writeTables: Set<string>
340337
},
341-
never
338+
SyncError
342339
>
343340
boot: Effect.Effect<void, UnexpectedError, Scope.Scope>
344341
/**

packages/@livestore/livestore/src/store/store.ts

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,7 @@
11
import {
2+
type Bindable,
23
type ClientSession,
34
type ClientSessionSyncProcessor,
4-
type ParamsObject,
5-
type PreparedBindValues,
6-
type QueryBuilder,
7-
UnexpectedError,
8-
} from '@livestore/common'
9-
import {
105
Devtools,
116
getDurationMsFromSpan,
127
getExecStatementsFromMaterializer,
@@ -16,9 +11,12 @@ import {
1611
isQueryBuilder,
1712
liveStoreVersion,
1813
makeClientSessionSyncProcessor,
14+
type PreparedBindValues,
1915
prepareBindValues,
16+
type QueryBuilder,
2017
QueryBuilderAstSymbol,
2118
replaceSessionIdSymbol,
19+
UnexpectedError,
2220
} from '@livestore/common'
2321
import type { LiveStoreSchema } from '@livestore/common/schema'
2422
import { getEventDef, LiveStoreEvent, SystemTables } from '@livestore/common/schema'
@@ -178,6 +176,8 @@ export class Store<TSchema extends LiveStoreSchema = LiveStoreSchema.Any, TConte
178176
for (const table of writeTables) {
179177
writeTablesForEvent.add(table)
180178
}
179+
180+
this.sqliteDbWrapper.debug.head = eventDecoded.seqNum
181181
}
182182
}
183183

@@ -441,15 +441,19 @@ export class Store<TSchema extends LiveStoreSchema = LiveStoreSchema.Any, TConte
441441
| LiveQuery<TResult>
442442
| LiveQueryDef<TResult>
443443
| SignalDef<TResult>
444-
| { query: string; bindValues: ParamsObject },
444+
| { query: string; bindValues: Bindable; schema?: Schema.Schema<TResult> },
445445
options?: { otelContext?: otel.Context; debugRefreshReason?: RefreshReason },
446446
): TResult => {
447447
this.checkShutdown('query')
448448

449449
if (typeof query === 'object' && 'query' in query && 'bindValues' in query) {
450-
return this.sqliteDbWrapper.cachedSelect(query.query, prepareBindValues(query.bindValues, query.query), {
450+
const res = this.sqliteDbWrapper.cachedSelect(query.query, prepareBindValues(query.bindValues, query.query), {
451451
otelContext: options?.otelContext,
452452
}) as any
453+
if (query.schema) {
454+
return Schema.decodeSync(query.schema)(res)
455+
}
456+
return res
453457
} else if (isQueryBuilder(query)) {
454458
const ast = query[QueryBuilderAstSymbol]
455459
if (ast._tag === 'RowQuery') {
@@ -626,14 +630,14 @@ export class Store<TSchema extends LiveStoreSchema = LiveStoreSchema.Any, TConte
626630
const otelContext = otel.trace.setSpan(otel.context.active(), span)
627631

628632
try {
633+
// Materialize events to state
629634
const { writeTables } = (() => {
630635
try {
631636
const materializeEvents = () => {
632637
return Runtime.runSync(this.effectContext.runtime, this.syncProcessor.push(events))
633638
}
634639

635640
if (events.length > 1) {
636-
// TODO: what to do about leader transaction here?
637641
return this.sqliteDbWrapper.txn(materializeEvents)
638642
} else {
639643
return materializeEvents()

0 commit comments

Comments
 (0)