Skip to content

Commit 13daf16

Browse files
committed
refactor: migrate ClientSessionSyncProcessor.push to Effect
1 parent a4528ce commit 13daf16

File tree

4 files changed

+94
-39
lines changed

4 files changed

+94
-39
lines changed

packages/@livestore/common/src/sync/ClientSessionSyncProcessor.ts

Lines changed: 49 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,11 @@ import { Option, type Runtime, type Scope } from '@livestore/utils/effect'
44
import { BucketQueue, Effect, FiberHandle, Queue, Schema, Stream, Subscribable } from '@livestore/utils/effect'
55
import * as otel from '@opentelemetry/api'
66

7-
import { type ClientSession, SyncError, type UnexpectedError } from '../adapter-types.js'
7+
import type { ClientSession, UnexpectedError } from '../adapter-types.js'
88
import * as EventSequenceNumber from '../schema/EventSequenceNumber.js'
99
import * as LiveStoreEvent from '../schema/LiveStoreEvent.js'
10-
import { getEventDef, type LiveStoreSchema } from '../schema/mod.js'
10+
import { getEventDef, type LiveStoreSchema, SystemTables } from '../schema/mod.js'
11+
import { sql } from '../util.js'
1112
import * as SyncState from './syncstate.js'
1213

1314
/**
@@ -20,10 +21,6 @@ import * as SyncState from './syncstate.js'
2021
* - We might need to make the rebase behaviour configurable e.g. to let users manually trigger a rebase
2122
*
2223
* Longer term we should evalutate whether we can unify the ClientSessionSyncProcessor with the LeaderSyncProcessor.
23-
*
24-
* The session and leader sync processor are different in the following ways:
25-
* - The leader sync processor pulls regular LiveStore events, while the session sync processor pulls SyncState.PayloadUpstream items
26-
* - The session sync processor has no downstream nodes.
2724
*/
2825
export const makeClientSessionSyncProcessor = ({
2926
schema,
@@ -40,7 +37,7 @@ export const makeClientSessionSyncProcessor = ({
4037
clientSession: ClientSession
4138
runtime: Runtime.Runtime<Scope.Scope>
4239
materializeEvent: (
43-
eventDecoded: LiveStoreEvent.AnyDecoded,
40+
eventDecoded: LiveStoreEvent.PartialAnyDecoded,
4441
options: { otelContext: otel.Context; withChangeset: boolean; materializerHashLeader: Option.Option<number> },
4542
) => {
4643
writeTables: Set<string>
@@ -79,16 +76,16 @@ export const makeClientSessionSyncProcessor = ({
7976
/** We're queuing push requests to reduce the number of messages sent to the leader by batching them */
8077
const leaderPushQueue = BucketQueue.make<LiveStoreEvent.EncodedWithMeta>().pipe(Effect.runSync)
8178

82-
const push: ClientSessionSyncProcessor['push'] = (batch, { otelContext }) => {
79+
const push: ClientSessionSyncProcessor['push'] = Effect.fn('client-session-sync-processor:push')(function* (
80+
batch,
81+
{ otelContext },
82+
) {
8383
// TODO validate batch
8484

8585
let baseEventSequenceNumber = syncStateRef.current.localHead
8686
const encodedEventDefs = batch.map(({ name, args }) => {
8787
const eventDef = getEventDef(schema, name)
88-
const nextNumPair = EventSequenceNumber.nextPair({
89-
seqNum: baseEventSequenceNumber,
90-
isClient: eventDef.eventDef.options.clientOnly,
91-
})
88+
const nextNumPair = EventSequenceNumber.nextPair(baseEventSequenceNumber, eventDef.eventDef.options.clientOnly)
9289
baseEventSequenceNumber = nextNumPair.seqNum
9390
return new LiveStoreEvent.EncodedWithMeta(
9491
Schema.encodeUnknownSync(eventSchema)({
@@ -100,29 +97,29 @@ export const makeClientSessionSyncProcessor = ({
10097
}),
10198
)
10299
})
103-
104-
const mergeResult = SyncState.merge({
105-
syncState: syncStateRef.current,
106-
payload: { _tag: 'local-push', newEvents: encodedEventDefs },
107-
isClientEvent,
108-
isEqualEvent: LiveStoreEvent.isEqualEncoded,
109-
})
100+
yield* Effect.annotateCurrentSpan({ batchSize: encodedEventDefs.length })
101+
102+
const mergeResult = yield* Effect.sync(() =>
103+
SyncState.merge({
104+
syncState: syncStateRef.current,
105+
payload: { _tag: 'local-push', newEvents: encodedEventDefs },
106+
isClientEvent,
107+
isEqualEvent: LiveStoreEvent.isEqualEncoded,
108+
}),
109+
)
110110

111111
if (mergeResult._tag === 'unexpected-error') {
112-
return shouldNeverHappen('Unexpected error in client-session-sync-processor', mergeResult.message)
112+
return yield* Effect.die(new Error(`Unexpected error in client-session-sync-processor: ${mergeResult.cause}`))
113113
}
114114

115-
span.addEvent('local-push', {
116-
batchSize: encodedEventDefs.length,
117-
mergeResult: TRACE_VERBOSE ? JSON.stringify(mergeResult) : undefined,
118-
})
115+
if (TRACE_VERBOSE) yield* Effect.annotateCurrentSpan({ mergeResult: JSON.stringify(mergeResult) })
119116

120117
if (mergeResult._tag !== 'advance') {
121-
return shouldNeverHappen(`Expected advance, got ${mergeResult._tag}`)
118+
return yield* Effect.die(new Error(`Expected advance, got ${mergeResult._tag}`))
122119
}
123120

124121
syncStateRef.current = mergeResult.newSyncState
125-
syncStateUpdateQueue.offer(mergeResult.newSyncState).pipe(Effect.runSync)
122+
yield* syncStateUpdateQueue.offer(mergeResult.newSyncState)
126123

127124
// Materialize events to state
128125
const writeTables = new Set<string>()
@@ -141,16 +138,18 @@ export const makeClientSessionSyncProcessor = ({
141138
for (const table of newWriteTables) {
142139
writeTables.add(table)
143140
}
144-
event.meta.sessionChangeset = sessionChangeset
145-
event.meta.materializerHashSession = materializerHash
141+
yield* Effect.sync(() => {
142+
event.meta.sessionChangeset = sessionChangeset
143+
event.meta.materializerHashSession = materializerHash
144+
})
146145
}
147146

148147
// Trigger push to leader
149148
// console.debug('pushToLeader', encodedEventDefs.length, ...encodedEventDefs.map((_) => _.toJSON()))
150-
BucketQueue.offerAll(leaderPushQueue, encodedEventDefs).pipe(Effect.runSync)
149+
yield* BucketQueue.offerAll(leaderPushQueue, encodedEventDefs)
151150

152151
return { writeTables }
153-
}
152+
})
154153

155154
const debugInfo = {
156155
rebaseCount: 0,
@@ -190,11 +189,18 @@ export const makeClientSessionSyncProcessor = ({
190189

191190
yield* FiberHandle.run(leaderPushingFiberHandle, backgroundLeaderPushing)
192191

192+
const getMergeCounter = () =>
193+
clientSession.sqliteDb.select<{ mergeCounter: number }>(
194+
sql`SELECT mergeCounter FROM ${SystemTables.LEADER_MERGE_COUNTER_TABLE} WHERE id = 0`,
195+
)[0]?.mergeCounter ?? 0
196+
193197
// NOTE We need to lazily call `.pull` as we want the cursor to be updated
194198
yield* Stream.suspend(() =>
195-
clientSession.leaderThread.events.pull({ cursor: syncStateRef.current.upstreamHead }),
199+
clientSession.leaderThread.events.pull({
200+
cursor: { mergeCounter: getMergeCounter(), eventNum: syncStateRef.current.localHead },
201+
}),
196202
).pipe(
197-
Stream.tap(({ payload }) =>
203+
Stream.tap(({ payload, mergeCounter: leaderMergeCounter }) =>
198204
Effect.gen(function* () {
199205
// yield* Effect.logDebug('ClientSessionSyncProcessor:pull', payload)
200206

@@ -210,13 +216,13 @@ export const makeClientSessionSyncProcessor = ({
210216
})
211217

212218
if (mergeResult._tag === 'unexpected-error') {
213-
return yield* new SyncError({ cause: mergeResult.message })
219+
return yield* Effect.fail(mergeResult.cause)
214220
} else if (mergeResult._tag === 'reject') {
215221
return shouldNeverHappen('Unexpected reject in client-session-sync-processor', mergeResult)
216222
}
217223

218224
syncStateRef.current = mergeResult.newSyncState
219-
yield* syncStateUpdateQueue.offer(mergeResult.newSyncState)
225+
syncStateUpdateQueue.offer(mergeResult.newSyncState).pipe(Effect.runSync)
220226

221227
if (mergeResult._tag === 'rebase') {
222228
span.addEvent('merge:pull:rebase', {
@@ -225,7 +231,7 @@ export const makeClientSessionSyncProcessor = ({
225231
newEventsCount: mergeResult.newEvents.length,
226232
rollbackCount: mergeResult.rollbackEvents.length,
227233
res: TRACE_VERBOSE ? JSON.stringify(mergeResult) : undefined,
228-
rebaseGeneration: mergeResult.newSyncState.localHead.rebaseGeneration,
234+
leaderMergeCounter,
229235
})
230236

231237
debugInfo.rebaseCount++
@@ -242,6 +248,7 @@ export const makeClientSessionSyncProcessor = ({
242248
'merge:pull:rebase: rollback',
243249
mergeResult.rollbackEvents.length,
244250
...mergeResult.rollbackEvents.slice(0, 10).map((_) => _.toJSON()),
251+
{ leaderMergeCounter },
245252
).pipe(Effect.provide(runtime), Effect.runSync)
246253
}
247254

@@ -261,6 +268,7 @@ export const makeClientSessionSyncProcessor = ({
261268
payload: TRACE_VERBOSE ? JSON.stringify(payload) : undefined,
262269
newEventsCount: mergeResult.newEvents.length,
263270
res: TRACE_VERBOSE ? JSON.stringify(mergeResult) : undefined,
271+
leaderMergeCounter,
264272
})
265273

266274
debugInfo.advanceCount++
@@ -337,9 +345,12 @@ export interface ClientSessionSyncProcessor {
337345
push: (
338346
batch: ReadonlyArray<LiveStoreEvent.PartialAnyDecoded>,
339347
options: { otelContext: otel.Context },
340-
) => {
341-
writeTables: Set<string>
342-
}
348+
) => Effect.Effect<
349+
{
350+
writeTables: Set<string>
351+
},
352+
never
353+
>
343354
boot: Effect.Effect<void, UnexpectedError, Scope.Scope>
344355
/**
345356
* Only used for debugging / observability.

packages/@livestore/livestore/src/live-queries/__snapshots__/db-query.test.ts.snap

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -409,6 +409,12 @@ exports[`otel > otel 3`] = `
409409
",
410410
},
411411
},
412+
{
413+
"_name": "client-session-sync-processor:push",
414+
"attributes": {
415+
"batchSize": 1,
416+
},
417+
},
412418
{
413419
"_name": "@livestore/common:LeaderSyncProcessor:push",
414420
"attributes": {
@@ -684,6 +690,12 @@ exports[`otel > with thunks 7`] = `
684690
",
685691
},
686692
},
693+
{
694+
"_name": "client-session-sync-processor:push",
695+
"attributes": {
696+
"batchSize": 1,
697+
},
698+
},
687699
{
688700
"_name": "@livestore/common:LeaderSyncProcessor:push",
689701
"attributes": {
@@ -793,6 +805,12 @@ exports[`otel > with thunks with query builder and without labels 3`] = `
793805
",
794806
},
795807
},
808+
{
809+
"_name": "client-session-sync-processor:push",
810+
"attributes": {
811+
"batchSize": 1,
812+
},
813+
},
796814
{
797815
"_name": "@livestore/common:LeaderSyncProcessor:push",
798816
"attributes": {

packages/@livestore/livestore/src/store/store.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -593,7 +593,9 @@ export class Store<TSchema extends LiveStoreSchema = LiveStoreSchema, TContext =
593593
// Materialize events to state
594594
const { writeTables } = (() => {
595595
try {
596-
const materializeEvents = () => this.syncProcessor.push(events, { otelContext })
596+
const materializeEvents = () => {
597+
return Runtime.runSync(this.effectContext.runtime, this.syncProcessor.push(events, { otelContext }))
598+
}
597599

598600
if (events.length > 1) {
599601
return this.sqliteDbWrapper.txn(materializeEvents)

packages/@livestore/react/src/__snapshots__/useClientDocument.test.tsx.snap

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,18 @@ exports[`useClientDocument > otel > should update the data based on component ke
2020
",
2121
},
2222
},
23+
{
24+
"_name": "client-session-sync-processor:push",
25+
"attributes": {
26+
"batchSize": 1,
27+
},
28+
},
29+
{
30+
"_name": "client-session-sync-processor:push",
31+
"attributes": {
32+
"batchSize": 1,
33+
},
34+
},
2335
{
2436
"_name": "@livestore/common:LeaderSyncProcessor:push",
2537
"attributes": {
@@ -246,6 +258,18 @@ exports[`useClientDocument > otel > should update the data based on component ke
246258
",
247259
},
248260
},
261+
{
262+
"_name": "client-session-sync-processor:push",
263+
"attributes": {
264+
"batchSize": 1,
265+
},
266+
},
267+
{
268+
"_name": "client-session-sync-processor:push",
269+
"attributes": {
270+
"batchSize": 1,
271+
},
272+
},
249273
{
250274
"_name": "@livestore/common:LeaderSyncProcessor:push",
251275
"attributes": {

0 commit comments

Comments
 (0)