Skip to content

Commit 6e169e9

Browse files
committed
refactor: migrate ClientSessionSyncProcessor.push to Effect
1 parent f4d248e commit 6e169e9

File tree

4 files changed

+75
-23
lines changed

4 files changed

+75
-23
lines changed

packages/@livestore/common/src/sync/ClientSessionSyncProcessor.ts

Lines changed: 30 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,10 @@ export const makeClientSessionSyncProcessor = ({
7676
/** We're queuing push requests to reduce the number of messages sent to the leader by batching them */
7777
const leaderPushQueue = BucketQueue.make<LiveStoreEvent.EncodedWithMeta>().pipe(Effect.runSync)
7878

79-
const push: ClientSessionSyncProcessor['push'] = (batch, { otelContext }) => {
79+
const push: ClientSessionSyncProcessor['push'] = Effect.fn('client-session-sync-processor:push')(function* (
80+
batch,
81+
{ otelContext },
82+
) {
8083
// TODO validate batch
8184

8285
let baseEventSequenceNumber = syncStateRef.current.localHead
@@ -94,29 +97,29 @@ export const makeClientSessionSyncProcessor = ({
9497
}),
9598
)
9699
})
97-
98-
const mergeResult = SyncState.merge({
99-
syncState: syncStateRef.current,
100-
payload: { _tag: 'local-push', newEvents: encodedEventDefs },
101-
isClientEvent,
102-
isEqualEvent: LiveStoreEvent.isEqualEncoded,
103-
})
100+
yield* Effect.annotateCurrentSpan({ batchSize: encodedEventDefs.length })
101+
102+
const mergeResult = yield* Effect.sync(() =>
103+
SyncState.merge({
104+
syncState: syncStateRef.current,
105+
payload: { _tag: 'local-push', newEvents: encodedEventDefs },
106+
isClientEvent,
107+
isEqualEvent: LiveStoreEvent.isEqualEncoded,
108+
}),
109+
)
104110

105111
if (mergeResult._tag === 'unexpected-error') {
106-
return shouldNeverHappen('Unexpected error in client-session-sync-processor', mergeResult.cause)
112+
return yield* Effect.die(new Error(`Unexpected error in client-session-sync-processor: ${mergeResult.cause}`))
107113
}
108114

109-
span.addEvent('local-push', {
110-
batchSize: encodedEventDefs.length,
111-
mergeResult: TRACE_VERBOSE ? JSON.stringify(mergeResult) : undefined,
112-
})
115+
if (TRACE_VERBOSE) yield* Effect.annotateCurrentSpan({ mergeResult: JSON.stringify(mergeResult) })
113116

114117
if (mergeResult._tag !== 'advance') {
115-
return shouldNeverHappen(`Expected advance, got ${mergeResult._tag}`)
118+
return yield* Effect.die(new Error(`Expected advance, got ${mergeResult._tag}`))
116119
}
117120

118121
syncStateRef.current = mergeResult.newSyncState
119-
syncStateUpdateQueue.offer(mergeResult.newSyncState).pipe(Effect.runSync)
122+
yield* syncStateUpdateQueue.offer(mergeResult.newSyncState)
120123

121124
// Materialize events to state
122125
const writeTables = new Set<string>()
@@ -135,16 +138,18 @@ export const makeClientSessionSyncProcessor = ({
135138
for (const table of newWriteTables) {
136139
writeTables.add(table)
137140
}
138-
event.meta.sessionChangeset = sessionChangeset
139-
event.meta.materializerHashSession = materializerHash
141+
yield* Effect.sync(() => {
142+
event.meta.sessionChangeset = sessionChangeset
143+
event.meta.materializerHashSession = materializerHash
144+
})
140145
}
141146

142147
// Trigger push to leader
143148
// console.debug('pushToLeader', encodedEventDefs.length, ...encodedEventDefs.map((_) => _.toJSON()))
144-
BucketQueue.offerAll(leaderPushQueue, encodedEventDefs).pipe(Effect.runSync)
149+
yield* BucketQueue.offerAll(leaderPushQueue, encodedEventDefs)
145150

146151
return { writeTables }
147-
}
152+
})
148153

149154
const debugInfo = {
150155
rebaseCount: 0,
@@ -340,9 +345,12 @@ export interface ClientSessionSyncProcessor {
340345
push: (
341346
batch: ReadonlyArray<LiveStoreEvent.PartialAnyDecoded>,
342347
options: { otelContext: otel.Context },
343-
) => {
344-
writeTables: Set<string>
345-
}
348+
) => Effect.Effect<
349+
{
350+
writeTables: Set<string>
351+
},
352+
never
353+
>
346354
boot: Effect.Effect<void, UnexpectedError, Scope.Scope>
347355
/**
348356
* Only used for debugging / observability.

packages/@livestore/livestore/src/live-queries/__snapshots__/db-query.test.ts.snap

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -409,6 +409,12 @@ exports[`otel > otel 3`] = `
409409
",
410410
},
411411
},
412+
{
413+
"_name": "client-session-sync-processor:push",
414+
"attributes": {
415+
"batchSize": 1,
416+
},
417+
},
412418
{
413419
"_name": "@livestore/common:LeaderSyncProcessor:push",
414420
"attributes": {
@@ -684,6 +690,12 @@ exports[`otel > with thunks 7`] = `
684690
",
685691
},
686692
},
693+
{
694+
"_name": "client-session-sync-processor:push",
695+
"attributes": {
696+
"batchSize": 1,
697+
},
698+
},
687699
{
688700
"_name": "@livestore/common:LeaderSyncProcessor:push",
689701
"attributes": {
@@ -793,6 +805,12 @@ exports[`otel > with thunks with query builder and without labels 3`] = `
793805
",
794806
},
795807
},
808+
{
809+
"_name": "client-session-sync-processor:push",
810+
"attributes": {
811+
"batchSize": 1,
812+
},
813+
},
796814
{
797815
"_name": "@livestore/common:LeaderSyncProcessor:push",
798816
"attributes": {

packages/@livestore/livestore/src/store/store.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -589,7 +589,9 @@ export class Store<TSchema extends LiveStoreSchema = LiveStoreSchema, TContext =
589589
// Materialize events to state
590590
const { writeTables } = (() => {
591591
try {
592-
const materializeEvents = () => this.syncProcessor.push(events, { otelContext })
592+
const materializeEvents = () => {
593+
return Runtime.runSync(this.effectContext.runtime, this.syncProcessor.push(events, { otelContext }))
594+
}
593595

594596
if (events.length > 1) {
595597
return this.sqliteDbWrapper.txn(materializeEvents)

packages/@livestore/react/src/__snapshots__/useClientDocument.test.tsx.snap

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,18 @@ exports[`useClientDocument > otel > should update the data based on component ke
2020
",
2121
},
2222
},
23+
{
24+
"_name": "client-session-sync-processor:push",
25+
"attributes": {
26+
"batchSize": 1,
27+
},
28+
},
29+
{
30+
"_name": "client-session-sync-processor:push",
31+
"attributes": {
32+
"batchSize": 1,
33+
},
34+
},
2335
{
2436
"_name": "@livestore/common:LeaderSyncProcessor:push",
2537
"attributes": {
@@ -246,6 +258,18 @@ exports[`useClientDocument > otel > should update the data based on component ke
246258
",
247259
},
248260
},
261+
{
262+
"_name": "client-session-sync-processor:push",
263+
"attributes": {
264+
"batchSize": 1,
265+
},
266+
},
267+
{
268+
"_name": "client-session-sync-processor:push",
269+
"attributes": {
270+
"batchSize": 1,
271+
},
272+
},
249273
{
250274
"_name": "@livestore/common:LeaderSyncProcessor:push",
251275
"attributes": {

0 commit comments

Comments
 (0)