summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorKit Langton <[email protected]>2026-04-30 16:45:26 -0400
committerGitHub <[email protected]>2026-04-30 16:45:26 -0400
commitfbcbd24063886a175ea41be1e0b7155cdb544a7b (patch)
treec91e7390a6588060510bcbda6621bd146d3a2820
parent3250b814ce8a3523898d52fa68ee0e5e7ddb129f (diff)
downloadopencode-fbcbd24063886a175ea41be1e0b7155cdb544a7b.tar.gz
opencode-fbcbd24063886a175ea41be1e0b7155cdb544a7b.zip
Add SyncEvent service (#25158)
-rw-r--r--packages/opencode/src/control-plane/workspace.ts91
-rw-r--r--packages/opencode/src/effect/app-runtime.ts2
-rw-r--r--packages/opencode/src/server/routes/instance/httpapi/handlers/sync.ts3
-rw-r--r--packages/opencode/src/server/routes/instance/httpapi/server.ts2
-rw-r--r--packages/opencode/src/server/routes/instance/sync.ts2
-rw-r--r--packages/opencode/src/session/revert.ts6
-rw-r--r--packages/opencode/src/session/session.ts56
-rw-r--r--packages/opencode/src/share/session.ts8
-rw-r--r--packages/opencode/src/sync/index.ts204
-rw-r--r--packages/opencode/test/control-plane/workspace.test.ts77
-rw-r--r--packages/opencode/test/sync/index.test.ts323
11 files changed, 427 insertions, 347 deletions
diff --git a/packages/opencode/src/control-plane/workspace.ts b/packages/opencode/src/control-plane/workspace.ts
index fe8046ba9..7f9d078bb 100644
--- a/packages/opencode/src/control-plane/workspace.ts
+++ b/packages/opencode/src/control-plane/workspace.ts
@@ -169,6 +169,7 @@ export const layer = Layer.effect(
const auth = yield* Auth.Service
const session = yield* Session.Service
const http = yield* HttpClient.HttpClient
+ const sync = yield* SyncEvent.Service
const connections = new Map<WorkspaceID, ConnectionStatus>()
const syncFibers = yield* FiberMap.make<WorkspaceID, void, SyncLoopError>()
@@ -307,25 +308,30 @@ export const layer = Layer.effect(
events: events.length,
})
- yield* Effect.sync(() =>
- WorkspaceContext.provide({
+ yield* Effect.promise(async () => {
+ await WorkspaceContext.provide({
workspaceID: space.id,
- fn: () => {
- for (const event of events) {
- SyncEvent.replay(
- {
- id: event.id,
- aggregateID: event.aggregate_id,
- seq: event.seq,
- type: event.type,
- data: event.data,
- },
- { publish: true },
- )
- }
+ async fn() {
+ await Effect.runPromise(
+ Effect.forEach(
+ events,
+ (event) =>
+ sync.replay(
+ {
+ id: event.id,
+ aggregateID: event.aggregate_id,
+ seq: event.seq,
+ type: event.type,
+ data: event.data,
+ },
+ { publish: true },
+ ),
+ { discard: true },
+ ),
+ )
},
- }),
- )
+ })
+ })
})
const syncWorkspaceLoop = Effect.fn("Workspace.syncWorkspaceLoop")(function* (space: Info) {
@@ -361,16 +367,28 @@ export const layer = Layer.effect(
setStatus(space.id, "connected")
yield* parseSSE(stream, (evt) =>
- Effect.sync(() => {
- try {
- if (!evt || typeof evt !== "object" || !("payload" in evt)) return
- const payload = evt.payload as { type?: string; syncEvent?: SyncEvent.SerializedEvent }
- if (payload.type === "server.heartbeat") return
-
- if (payload.type === "sync" && payload.syncEvent) {
- SyncEvent.replay(payload.syncEvent)
- }
+ Effect.gen(function* () {
+ if (!evt || typeof evt !== "object" || !("payload" in evt)) return
+ const payload = evt.payload as { type?: string; syncEvent?: SyncEvent.SerializedEvent }
+ if (payload.type === "server.heartbeat") return
+
+ if (payload.type === "sync" && payload.syncEvent) {
+ const failed = yield* sync.replay(payload.syncEvent).pipe(
+ Effect.as(false),
+ Effect.catchCause((error) =>
+ Effect.sync(() => {
+ log.info("failed to replay global event", {
+ workspaceID: space.id,
+ error,
+ })
+ return true
+ }),
+ ),
+ )
+ if (failed) return
+ }
+ try {
const event = evt as { directory?: string; project?: string; payload: unknown }
GlobalBus.emit("event", {
directory: event.directory,
@@ -378,10 +396,10 @@ export const layer = Layer.effect(
workspace: space.id,
payload: event.payload,
})
- } catch (err) {
+ } catch (error) {
log.info("failed to replay global event", {
workspaceID: space.id,
- error: err,
+ error,
})
}
}),
@@ -516,14 +534,12 @@ export const layer = Layer.effect(
const adaptor = getAdaptor(space.projectID, space.type)
const target = yield* Effect.promise(() => Promise.resolve(adaptor.target(space)))
- yield* Effect.sync(() =>
- SyncEvent.run(Session.Event.Updated, {
- sessionID: input.sessionID,
- info: {
- workspaceID: input.workspaceID,
- },
- }),
- )
+ yield* sync.run(Session.Event.Updated, {
+ sessionID: input.sessionID,
+ info: {
+ workspaceID: input.workspaceID,
+ },
+ })
const rows = yield* db((db) =>
db
@@ -593,7 +609,7 @@ export const layer = Layer.effect(
})
if (target.type === "local") {
- SyncEvent.replayAll(events)
+ yield* sync.replayAll(events)
log.info("session restore batch replayed locally", {
workspaceID: input.workspaceID,
sessionID: input.sessionID,
@@ -812,6 +828,7 @@ export const layer = Layer.effect(
export const defaultLayer = layer.pipe(
Layer.provide(Auth.defaultLayer),
Layer.provide(Session.defaultLayer),
+ Layer.provide(SyncEvent.defaultLayer),
Layer.provide(FetchHttpClient.layer),
)
diff --git a/packages/opencode/src/effect/app-runtime.ts b/packages/opencode/src/effect/app-runtime.ts
index 84be17068..06969ff9d 100644
--- a/packages/opencode/src/effect/app-runtime.ts
+++ b/packages/opencode/src/effect/app-runtime.ts
@@ -47,6 +47,7 @@ import { Pty } from "@/pty"
import { Installation } from "@/installation"
import { ShareNext } from "@/share/share-next"
import { SessionShare } from "@/share/session"
+import { SyncEvent } from "@/sync"
import { Npm } from "@opencode-ai/core/npm"
import { memoMap } from "@opencode-ai/core/effect/memo-map"
@@ -97,6 +98,7 @@ export const AppLayer = Layer.mergeAll(
Installation.defaultLayer,
ShareNext.defaultLayer,
SessionShare.defaultLayer,
+ SyncEvent.defaultLayer,
).pipe(Layer.provideMerge(Observability.layer))
const rt = ManagedRuntime.make(AppLayer, { memoMap })
diff --git a/packages/opencode/src/server/routes/instance/httpapi/handlers/sync.ts b/packages/opencode/src/server/routes/instance/httpapi/handlers/sync.ts
index fbe124993..f4a2f315c 100644
--- a/packages/opencode/src/server/routes/instance/httpapi/handlers/sync.ts
+++ b/packages/opencode/src/server/routes/instance/httpapi/handlers/sync.ts
@@ -21,6 +21,7 @@ export const syncHandlers = HttpApiBuilder.group(InstanceHttpApi, "sync", (handl
Effect.gen(function* () {
const workspace = yield* Workspace.Service
const scope = yield* Scope.Scope
+ const sync = yield* SyncEvent.Service
const start = Effect.fn("SyncHttpApi.start")(function* () {
yield* workspace
@@ -45,7 +46,7 @@ export const syncHandlers = HttpApiBuilder.group(InstanceHttpApi, "sync", (handl
last: events.at(-1)?.seq,
directory: ctx.payload.directory,
})
- SyncEvent.replayAll(events)
+ yield* sync.replayAll(events)
log.info("sync replay complete", {
sessionID: source,
events: events.length,
diff --git a/packages/opencode/src/server/routes/instance/httpapi/server.ts b/packages/opencode/src/server/routes/instance/httpapi/server.ts
index 62fa18743..f53ddb3ec 100644
--- a/packages/opencode/src/server/routes/instance/httpapi/server.ts
+++ b/packages/opencode/src/server/routes/instance/httpapi/server.ts
@@ -31,6 +31,7 @@ import { SessionSummary } from "@/session/summary"
import { Todo } from "@/session/todo"
import { SessionShare } from "@/share/session"
import { Skill } from "@/skill"
+import { SyncEvent } from "@/sync"
import { ToolRegistry } from "@/tool/registry"
import { lazy } from "@/util/lazy"
import { Vcs } from "@/project/vcs"
@@ -147,6 +148,7 @@ export const routes = Layer.mergeAll(rootApiRoutes, instanceRoutes).pipe(
SessionRunState.defaultLayer,
SessionStatus.defaultLayer,
SessionSummary.defaultLayer,
+ SyncEvent.defaultLayer,
Skill.defaultLayer,
Todo.defaultLayer,
ToolRegistry.defaultLayer,
diff --git a/packages/opencode/src/server/routes/instance/sync.ts b/packages/opencode/src/server/routes/instance/sync.ts
index bb816ecc4..b7bf413d4 100644
--- a/packages/opencode/src/server/routes/instance/sync.ts
+++ b/packages/opencode/src/server/routes/instance/sync.ts
@@ -94,7 +94,7 @@ export const SyncRoutes = lazy(() =>
last: events.at(-1)?.seq,
directory: body.directory,
})
- SyncEvent.replayAll(events)
+ await AppRuntime.runPromise(SyncEvent.use.replayAll(events))
log.info("sync replay complete", {
sessionID: source,
diff --git a/packages/opencode/src/session/revert.ts b/packages/opencode/src/session/revert.ts
index da9952ccb..58d69a204 100644
--- a/packages/opencode/src/session/revert.ts
+++ b/packages/opencode/src/session/revert.ts
@@ -38,6 +38,7 @@ export const layer = Layer.effect(
const bus = yield* Bus.Service
const summary = yield* SessionSummary.Service
const state = yield* SessionRunState.Service
+ const sync = yield* SyncEvent.Service
const revert = Effect.fn("SessionRevert.revert")(function* (input: RevertInput) {
yield* state.assertNotBusy(input.sessionID)
@@ -121,7 +122,7 @@ export const layer = Layer.effect(
remove.push(msg)
}
for (const msg of remove) {
- SyncEvent.run(MessageV2.Event.Removed, {
+ yield* sync.run(MessageV2.Event.Removed, {
sessionID,
messageID: msg.info.id,
})
@@ -133,7 +134,7 @@ export const layer = Layer.effect(
const removeParts = target.parts.slice(idx)
target.parts = target.parts.slice(0, idx)
for (const part of removeParts) {
- SyncEvent.run(MessageV2.Event.PartRemoved, {
+ yield* sync.run(MessageV2.Event.PartRemoved, {
sessionID,
messageID: target.info.id,
partID: part.id,
@@ -156,6 +157,7 @@ export const defaultLayer = Layer.suspend(() =>
Layer.provide(Storage.defaultLayer),
Layer.provide(Bus.layer),
Layer.provide(SessionSummary.defaultLayer),
+ Layer.provide(SyncEvent.defaultLayer),
),
)
diff --git a/packages/opencode/src/session/session.ts b/packages/opencode/src/session/session.ts
index 72c4d241e..5534976e3 100644
--- a/packages/opencode/src/session/session.ts
+++ b/packages/opencode/src/session/session.ts
@@ -443,11 +443,12 @@ export type Patch = Types.DeepMutable<SyncEvent.Event<typeof Event.Updated>["dat
const db = <T>(fn: (d: Parameters<typeof Database.use>[0] extends (trx: infer D) => any ? D : never) => T) =>
Effect.sync(() => Database.use(fn))
-export const layer: Layer.Layer<Service, never, Bus.Service | Storage.Service> = Layer.effect(
+export const layer: Layer.Layer<Service, never, Bus.Service | Storage.Service | SyncEvent.Service> = Layer.effect(
Service,
Effect.gen(function* () {
const bus = yield* Bus.Service
const storage = yield* Storage.Service
+ const sync = yield* SyncEvent.Service
const createNext = Effect.fn("Session.createNext")(function* (input: {
id?: SessionID
@@ -477,7 +478,7 @@ export const layer: Layer.Layer<Service, never, Bus.Service | Storage.Service> =
}
log.info("created", result)
- yield* Effect.sync(() => SyncEvent.run(Event.Created, { sessionID: result.id, info: result }))
+ yield* sync.run(Event.Created, { sessionID: result.id, info: result })
if (!Flag.OPENCODE_EXPERIMENTAL_WORKSPACES) {
// This only exist for backwards compatibility. We should not be
@@ -525,10 +526,8 @@ export const layer: Layer.Layer<Service, never, Bus.Service | Storage.Service> =
Effect.catchCause(() => Effect.succeed(false)),
)
- yield* Effect.sync(() => {
- SyncEvent.run(Event.Deleted, { sessionID, info: session }, { publish: hasInstance })
- SyncEvent.remove(sessionID)
- })
+ yield* sync.run(Event.Deleted, { sessionID, info: session }, { publish: hasInstance })
+ yield* sync.remove(sessionID)
} catch (e) {
log.error(e)
}
@@ -536,19 +535,17 @@ export const layer: Layer.Layer<Service, never, Bus.Service | Storage.Service> =
const updateMessage = <T extends MessageV2.Info>(msg: T): Effect.Effect<T> =>
Effect.gen(function* () {
- yield* Effect.sync(() => SyncEvent.run(MessageV2.Event.Updated, { sessionID: msg.sessionID, info: msg }))
+ yield* sync.run(MessageV2.Event.Updated, { sessionID: msg.sessionID, info: msg })
return msg
}).pipe(Effect.withSpan("Session.updateMessage"))
const updatePart = <T extends MessageV2.Part>(part: T): Effect.Effect<T> =>
Effect.gen(function* () {
- yield* Effect.sync(() =>
- SyncEvent.run(MessageV2.Event.PartUpdated, {
- sessionID: part.sessionID,
- part: structuredClone(part),
- time: Date.now(),
- }),
- )
+ yield* sync.run(MessageV2.Event.PartUpdated, {
+ sessionID: part.sessionID,
+ part: structuredClone(part),
+ time: Date.now(),
+ })
return part
}).pipe(Effect.withSpan("Session.updatePart"))
@@ -635,8 +632,7 @@ export const layer: Layer.Layer<Service, never, Bus.Service | Storage.Service> =
return session
})
- const patch = (sessionID: SessionID, info: Patch) =>
- Effect.sync(() => SyncEvent.run(Event.Updated, { sessionID, info }))
+ const patch = (sessionID: SessionID, info: Patch) => sync.run(Event.Updated, { sessionID, info })
const touch = Effect.fn("Session.touch")(function* (sessionID: SessionID) {
yield* patch(sessionID, { time: { updated: Date.now() } })
@@ -693,12 +689,10 @@ export const layer: Layer.Layer<Service, never, Bus.Service | Storage.Service> =
sessionID: SessionID
messageID: MessageID
}) {
- yield* Effect.sync(() =>
- SyncEvent.run(MessageV2.Event.Removed, {
- sessionID: input.sessionID,
- messageID: input.messageID,
- }),
- )
+ yield* sync.run(MessageV2.Event.Removed, {
+ sessionID: input.sessionID,
+ messageID: input.messageID,
+ })
return input.messageID
})
@@ -707,13 +701,11 @@ export const layer: Layer.Layer<Service, never, Bus.Service | Storage.Service> =
messageID: MessageID
partID: PartID
}) {
- yield* Effect.sync(() =>
- SyncEvent.run(MessageV2.Event.PartRemoved, {
- sessionID: input.sessionID,
- messageID: input.messageID,
- partID: input.partID,
- }),
- )
+ yield* sync.run(MessageV2.Event.PartRemoved, {
+ sessionID: input.sessionID,
+ messageID: input.messageID,
+ partID: input.partID,
+ })
return input.partID
})
@@ -764,7 +756,11 @@ export const layer: Layer.Layer<Service, never, Bus.Service | Storage.Service> =
}),
)
-export const defaultLayer = layer.pipe(Layer.provide(Bus.layer), Layer.provide(Storage.defaultLayer))
+export const defaultLayer = layer.pipe(
+ Layer.provide(Bus.layer),
+ Layer.provide(Storage.defaultLayer),
+ Layer.provide(SyncEvent.defaultLayer),
+)
export function* list(input?: {
directory?: string
diff --git a/packages/opencode/src/share/session.ts b/packages/opencode/src/share/session.ts
index 99e46a009..7e4de204e 100644
--- a/packages/opencode/src/share/session.ts
+++ b/packages/opencode/src/share/session.ts
@@ -21,20 +21,19 @@ export const layer = Layer.effect(
const session = yield* Session.Service
const shareNext = yield* ShareNext.Service
const scope = yield* Scope.Scope
+ const sync = yield* SyncEvent.Service
const share = Effect.fn("SessionShare.share")(function* (sessionID: SessionID) {
const conf = yield* cfg.get()
if (conf.share === "disabled") throw new Error("Sharing is disabled in configuration")
const result = yield* shareNext.create(sessionID)
- yield* Effect.sync(() =>
- SyncEvent.run(Session.Event.Updated, { sessionID, info: { share: { url: result.url } } }),
- )
+ yield* sync.run(Session.Event.Updated, { sessionID, info: { share: { url: result.url } } })
return result
})
const unshare = Effect.fn("SessionShare.unshare")(function* (sessionID: SessionID) {
yield* shareNext.remove(sessionID)
- yield* Effect.sync(() => SyncEvent.run(Session.Event.Updated, { sessionID, info: { share: { url: null } } }))
+ yield* sync.run(Session.Event.Updated, { sessionID, info: { share: { url: null } } })
})
const create = Effect.fn("SessionShare.create")(function* (input?: Session.CreateInput) {
@@ -54,6 +53,7 @@ export const defaultLayer = layer.pipe(
Layer.provide(ShareNext.defaultLayer),
Layer.provide(Session.defaultLayer),
Layer.provide(Config.defaultLayer),
+ Layer.provide(SyncEvent.defaultLayer),
)
export * as SessionShare from "./session"
diff --git a/packages/opencode/src/sync/index.ts b/packages/opencode/src/sync/index.ts
index 67bc9b9e7..ebf7543af 100644
--- a/packages/opencode/src/sync/index.ts
+++ b/packages/opencode/src/sync/index.ts
@@ -9,9 +9,11 @@ import { EventSequenceTable, EventTable } from "./event.sql"
import { WorkspaceContext } from "@/control-plane/workspace-context"
import { EventID } from "./schema"
import { Flag } from "@opencode-ai/core/flag/flag"
-import { Schema as EffectSchema } from "effect"
+import { Context, Effect, Layer, Schema as EffectSchema } from "effect"
import { zodObject } from "@/util/effect-zod"
import type { DeepMutable } from "@/util/schema"
+import { makeRuntime } from "@/effect/run-service"
+import { serviceUse } from "@/effect/service-use"
// Keep `Event["data"]` mutable because projectors mutate the persisted shape
// when writing to the database. Bus payloads (`Properties`) stay readonly —
@@ -46,6 +48,125 @@ export type SerializedEvent<Def extends Definition = Definition> = Event<Def> &
type ProjectorFunc = (db: Database.TxOrDb, data: unknown) => void
type ConvertEvent = (type: string, data: Event["data"]) => unknown | Promise<unknown>
+export interface Interface {
+ readonly run: <Def extends Definition>(
+ def: Def,
+ data: Event<Def>["data"],
+ options?: { publish?: boolean },
+ ) => Effect.Effect<void>
+ readonly replay: (event: SerializedEvent, options?: { publish: boolean }) => Effect.Effect<void>
+ readonly replayAll: (events: SerializedEvent[], options?: { publish: boolean }) => Effect.Effect<string | undefined>
+ readonly remove: (aggregateID: string) => Effect.Effect<void>
+}
+
+export class Service extends Context.Service<Service, Interface>()("@opencode/SyncEvent") {}
+
+export const layer = Layer.effect(Service)(
+ Effect.gen(function* () {
+ const replay: Interface["replay"] = Effect.fn("SyncEvent.replay")(function* (event, options) {
+ const def = registry.get(event.type)
+ if (!def) {
+ throw new Error(`Unknown event type: ${event.type}`)
+ }
+
+ const row = Database.use((db) =>
+ db
+ .select({ seq: EventSequenceTable.seq })
+ .from(EventSequenceTable)
+ .where(eq(EventSequenceTable.aggregate_id, event.aggregateID))
+ .get(),
+ )
+
+ const latest = row?.seq ?? -1
+ if (event.seq <= latest) return
+
+ const expected = latest + 1
+ if (event.seq !== expected) {
+ throw new Error(
+ `Sequence mismatch for aggregate "${event.aggregateID}": expected ${expected}, got ${event.seq}`,
+ )
+ }
+
+ process(def, event, { publish: !!options?.publish })
+ })
+
+ const replayAll: Interface["replayAll"] = Effect.fn("SyncEvent.replayAll")(function* (events, options) {
+ const source = events[0]?.aggregateID
+ if (!source) return undefined
+ if (events.some((item) => item.aggregateID !== source)) {
+ throw new Error("Replay events must belong to the same session")
+ }
+ const start = events[0].seq
+ for (const [i, item] of events.entries()) {
+ const seq = start + i
+ if (item.seq !== seq) {
+ throw new Error(`Replay sequence mismatch at index ${i}: expected ${seq}, got ${item.seq}`)
+ }
+ }
+ for (const item of events) {
+ yield* replay(item, options)
+ }
+ return source
+ })
+
+ const run: Interface["run"] = Effect.fn("SyncEvent.run")(function* (def, data, options) {
+ const agg = (data as Record<string, string>)[def.aggregate]
+ // This should never happen: we've enforced it via typescript in
+ // the definition
+ if (agg == null) {
+ throw new Error(`SyncEvent.run: "${def.aggregate}" required but not found: ${JSON.stringify(data)}`)
+ }
+
+ if (def.version !== versions.get(def.type)) {
+ throw new Error(`SyncEvent.run: running old versions of events is not allowed: ${def.type}`)
+ }
+
+ const { publish = true } = options || {}
+
+ // Note that this is an "immediate" transaction which is critical.
+ // We need to make sure we can safely read and write with nothing
+ // else changing the data from under us
+ Database.transaction(
+ (tx) => {
+ const id = EventID.ascending()
+ const row = tx
+ .select({ seq: EventSequenceTable.seq })
+ .from(EventSequenceTable)
+ .where(eq(EventSequenceTable.aggregate_id, agg))
+ .get()
+ const seq = row?.seq != null ? row.seq + 1 : 0
+
+ const event = { id, seq, aggregateID: agg, data }
+ process(def, event, { publish })
+ },
+ {
+ behavior: "immediate",
+ },
+ )
+ })
+
+ const remove: Interface["remove"] = Effect.fn("SyncEvent.remove")(function* (aggregateID) {
+ Database.transaction((tx) => {
+ tx.delete(EventSequenceTable).where(eq(EventSequenceTable.aggregate_id, aggregateID)).run()
+ tx.delete(EventTable).where(eq(EventTable.aggregate_id, aggregateID)).run()
+ })
+ })
+
+ return Service.of({
+ run,
+ replay,
+ replayAll,
+ remove,
+ })
+ }),
+)
+
+export const defaultLayer = layer
+
+export const use = serviceUse(Service)
+
+const runtime = makeRuntime(Service, defaultLayer)
+
export const registry = new Map<string, Definition>()
let projectors: Map<Definition, ProjectorFunc> | undefined
const versions = new Map<string, number>()
@@ -186,92 +307,19 @@ function process<Def extends Definition>(def: Def, event: Event<Def>, options: {
}
export function replay(event: SerializedEvent, options?: { publish: boolean }) {
- const def = registry.get(event.type)
- if (!def) {
- throw new Error(`Unknown event type: ${event.type}`)
- }
-
- const row = Database.use((db) =>
- db
- .select({ seq: EventSequenceTable.seq })
- .from(EventSequenceTable)
- .where(eq(EventSequenceTable.aggregate_id, event.aggregateID))
- .get(),
- )
-
- const latest = row?.seq ?? -1
- if (event.seq <= latest) {
- return
- }
-
- const expected = latest + 1
- if (event.seq !== expected) {
- throw new Error(`Sequence mismatch for aggregate "${event.aggregateID}": expected ${expected}, got ${event.seq}`)
- }
-
- process(def, event, { publish: !!options?.publish })
+ return runtime.runSync((sync) => sync.replay(event, options))
}
export function replayAll(events: SerializedEvent[], options?: { publish: boolean }) {
- const source = events[0]?.aggregateID
- if (!source) return
- if (events.some((item) => item.aggregateID !== source)) {
- throw new Error("Replay events must belong to the same session")
- }
- const start = events[0].seq
- for (const [i, item] of events.entries()) {
- const seq = start + i
- if (item.seq !== seq) {
- throw new Error(`Replay sequence mismatch at index ${i}: expected ${seq}, got ${item.seq}`)
- }
- }
- for (const item of events) {
- replay(item, options)
- }
- return source
+ return runtime.runSync((sync) => sync.replayAll(events, options))
}
export function run<Def extends Definition>(def: Def, data: Event<Def>["data"], options?: { publish?: boolean }) {
- const agg = (data as Record<string, string>)[def.aggregate]
- // This should never happen: we've enforced it via typescript in
- // the definition
- if (agg == null) {
- throw new Error(`SyncEvent.run: "${def.aggregate}" required but not found: ${JSON.stringify(data)}`)
- }
-
- if (def.version !== versions.get(def.type)) {
- throw new Error(`SyncEvent.run: running old versions of events is not allowed: ${def.type}`)
- }
-
- const { publish = true } = options || {}
-
- // Note that this is an "immediate" transaction which is critical.
- // We need to make sure we can safely read and write with nothing
- // else changing the data from under us
- Database.transaction(
- (tx) => {
- const id = EventID.ascending()
- const row = tx
- .select({ seq: EventSequenceTable.seq })
- .from(EventSequenceTable)
- .where(eq(EventSequenceTable.aggregate_id, agg))
- .get()
- const seq = row?.seq != null ? row.seq + 1 : 0
-
- const event = { id, seq, aggregateID: agg, data }
- process(def, event, { publish })
- },
- {
- behavior: "immediate",
- },
- )
+ return runtime.runSync((sync) => sync.run(def, data, options))
}
export function remove(aggregateID: string) {
- Database.transaction((tx) => {
- tx.delete(EventSequenceTable).where(eq(EventSequenceTable.aggregate_id, aggregateID)).run()
- tx.delete(EventTable).where(eq(EventTable.aggregate_id, aggregateID)).run()
- })
+ return runtime.runSync((sync) => sync.remove(aggregateID))
}
export function payloads() {
diff --git a/packages/opencode/test/control-plane/workspace.test.ts b/packages/opencode/test/control-plane/workspace.test.ts
index 6e68730a9..594789b20 100644
--- a/packages/opencode/test/control-plane/workspace.test.ts
+++ b/packages/opencode/test/control-plane/workspace.test.ts
@@ -1,4 +1,4 @@
-import { afterEach, beforeEach, describe, expect, mock, spyOn, test } from "bun:test"
+import { afterEach, beforeEach, describe, expect, mock, test } from "bun:test"
import fs from "node:fs/promises"
import Http from "node:http"
import path from "node:path"
@@ -1426,48 +1426,41 @@ describe("workspace-old sessionRestore", () => {
})
})
- test("local restore replays batches without fetch and emits progress", async () => {
- await withInstance(async (dir) => {
- const captured = captureGlobalEvents()
- let fetchCallCount = 0
- const replayAll = spyOn(SyncEvent, "replayAll")
- try {
- using server = Bun.serve({
- port: 0,
- fetch() {
- fetchCallCount++
- return Response.json({ ok: true })
- },
- })
- const type = unique("restore-local")
- const info = workspaceInfo(Instance.project.id, type, { directory: dir })
- insertWorkspace(info)
- registerAdaptor(Instance.project.id, type, localAdaptor(dir).adaptor)
- const session = await AppRuntime.runPromise(
- SessionNs.Service.use((svc) => svc.create({ title: "restore local" })),
- )
- replaceSessionEvents(session.id, 20)
-
- expect(await restoreWorkspaceSession({ workspaceID: info.id, sessionID: session.id })).toEqual({ total: 3 })
+ it.live("local restore replays batches and emits progress", () =>
+ provideTmpdirInstance(
+ (dir) =>
+ Effect.gen(function* () {
+ const workspace = yield* WorkspaceOld.Service
+ const sessionSvc = yield* SessionNs.Service
+ const captured = captureGlobalEvents()
+ try {
+ const type = unique("restore-local")
+ const info = workspaceInfo(Instance.project.id, type, { directory: dir })
+ insertWorkspace(info)
+ registerAdaptor(Instance.project.id, type, localAdaptor(dir).adaptor)
+ const session = yield* sessionSvc.create({ title: "restore local" })
+ replaceSessionEvents(session.id, 20)
- expect(fetchCallCount).toBe(0)
- expect(replayAll).toHaveBeenCalledTimes(3)
- expect(replayAll.mock.calls.map((call) => call[0].length)).toEqual([10, 10, 1])
- expect((await AppRuntime.runPromise(SessionNs.Service.use((svc) => svc.get(session.id)))).workspaceID).toBe(
- info.id,
- )
- expect(eventRows(session.id).map((row) => row.seq)).toEqual(Array.from({ length: 21 }, (_, i) => i))
- expect(
- captured.events
- .filter((event) => event.workspace === info.id && event.payload.type === WorkspaceOld.Event.Restore.type)
- .map((event) => event.payload.properties.step),
- ).toEqual([0, 1, 2, 3])
- await removeWorkspace(info.id)
- } finally {
- captured.dispose()
- }
- })
- })
+ expect(yield* workspace.sessionRestore({ workspaceID: info.id, sessionID: session.id })).toEqual({
+ total: 3,
+ })
+ expect((yield* sessionSvc.get(session.id)).workspaceID).toBe(info.id)
+ expect(eventRows(session.id).map((row) => row.seq)).toEqual(Array.from({ length: 21 }, (_, i) => i))
+ expect(
+ captured.events
+ .filter(
+ (event) => event.workspace === info.id && event.payload.type === WorkspaceOld.Event.Restore.type,
+ )
+ .map((event) => event.payload.properties.step),
+ ).toEqual([0, 1, 2, 3])
+ yield* workspace.remove(info.id)
+ } finally {
+ captured.dispose()
+ }
+ }),
+ { git: true },
+ ),
+ )
it.live("session restore includes real message and part events in sequence order", () => {
const replay: FetchCall[] = []
diff --git a/packages/opencode/test/sync/index.test.ts b/packages/opencode/test/sync/index.test.ts
index 32a08715c..0afbb1831 100644
--- a/packages/opencode/test/sync/index.test.ts
+++ b/packages/opencode/test/sync/index.test.ts
@@ -1,16 +1,18 @@
-import { describe, test, expect, beforeEach, afterEach, afterAll } from "bun:test"
-import { tmpdir } from "../fixture/fixture"
-import { Schema } from "effect"
+import { describe, expect, beforeEach, afterEach, afterAll } from "bun:test"
+import { provideTmpdirInstance } from "../fixture/fixture"
+import { Effect, Layer, Schema } from "effect"
+import { CrossSpawnSpawner } from "@opencode-ai/core/cross-spawn-spawner"
import { Bus } from "../../src/bus"
-import { Instance } from "../../src/project/instance"
import { SyncEvent } from "../../src/sync"
import { Database } from "@/storage/db"
import { EventTable } from "../../src/sync/event.sql"
-import { Identifier } from "../../src/id/id"
+import { MessageID } from "../../src/session/schema"
import { Flag } from "@opencode-ai/core/flag/flag"
import { initProjectors } from "../../src/server/projectors"
+import { testEffect } from "../lib/effect"
const original = Flag.OPENCODE_EXPERIMENTAL_WORKSPACES
+const it = testEffect(Layer.mergeAll(SyncEvent.defaultLayer, CrossSpawnSpawner.defaultLayer))
beforeEach(() => {
Database.close()
@@ -22,19 +24,6 @@ afterEach(() => {
Flag.OPENCODE_EXPERIMENTAL_WORKSPACES = original
})
-function withInstance(fn: () => void | Promise<void>) {
- return async () => {
- await using tmp = await tmpdir()
-
- await Instance.provide({
- directory: tmp.path,
- fn: async () => {
- await fn()
- },
- })
- }
-}
-
describe("SyncEvent", () => {
function setup() {
SyncEvent.reset()
@@ -59,179 +48,209 @@ describe("SyncEvent", () => {
return { Created, Sent }
}
+ function expectDefect<A, E, R>(effect: Effect.Effect<A, E, R>, pattern: RegExp) {
+ return Effect.gen(function* () {
+ const exit = yield* Effect.exit(effect)
+ if (exit._tag === "Success") throw new Error("Expected effect to fail")
+ expect(String(exit.cause)).toMatch(pattern)
+ })
+ }
+
afterAll(() => {
SyncEvent.reset()
initProjectors()
})
describe("run", () => {
- test(
+ it.live(
"inserts event row",
- withInstance(() => {
- const { Created } = setup()
- SyncEvent.run(Created, { id: "evt_1", name: "first" })
- const rows = Database.use((db) => db.select().from(EventTable).all())
- expect(rows).toHaveLength(1)
- expect(rows[0].type).toBe("item.created.1")
- expect(rows[0].aggregate_id).toBe("evt_1")
- }),
+ provideTmpdirInstance(() =>
+ Effect.gen(function* () {
+ const { Created } = setup()
+ yield* SyncEvent.use.run(Created, { id: "evt_1", name: "first" })
+ const rows = Database.use((db) => db.select().from(EventTable).all())
+ expect(rows).toHaveLength(1)
+ expect(rows[0].type).toBe("item.created.1")
+ expect(rows[0].aggregate_id).toBe("evt_1")
+ }),
+ ),
)
- test(
+ it.live(
"increments seq per aggregate",
- withInstance(() => {
- const { Created } = setup()
- SyncEvent.run(Created, { id: "evt_1", name: "first" })
- SyncEvent.run(Created, { id: "evt_1", name: "second" })
- const rows = Database.use((db) => db.select().from(EventTable).all())
- expect(rows).toHaveLength(2)
- expect(rows[1].seq).toBe(rows[0].seq + 1)
- }),
+ provideTmpdirInstance(() =>
+ Effect.gen(function* () {
+ const { Created } = setup()
+ yield* SyncEvent.use.run(Created, { id: "evt_1", name: "first" })
+ yield* SyncEvent.use.run(Created, { id: "evt_1", name: "second" })
+ const rows = Database.use((db) => db.select().from(EventTable).all())
+ expect(rows).toHaveLength(2)
+ expect(rows[1].seq).toBe(rows[0].seq + 1)
+ }),
+ ),
)
- test(
+ it.live(
"uses custom aggregate field from agg()",
- withInstance(() => {
- const { Sent } = setup()
- SyncEvent.run(Sent, { item_id: "evt_1", to: "james" })
- const rows = Database.use((db) => db.select().from(EventTable).all())
- expect(rows).toHaveLength(1)
- expect(rows[0].aggregate_id).toBe("evt_1")
- }),
+ provideTmpdirInstance(() =>
+ Effect.gen(function* () {
+ const { Sent } = setup()
+ yield* SyncEvent.use.run(Sent, { item_id: "evt_1", to: "james" })
+ const rows = Database.use((db) => db.select().from(EventTable).all())
+ expect(rows).toHaveLength(1)
+ expect(rows[0].aggregate_id).toBe("evt_1")
+ }),
+ ),
)
- test(
+ it.live(
"emits events",
- withInstance(async () => {
- const { Created } = setup()
- const events: Array<{
- type: string
- properties: { id: string; name: string }
- }> = []
- const received = new Promise<void>((resolve) => {
- Bus.subscribeAll((event) => {
+ provideTmpdirInstance(() =>
+ Effect.gen(function* () {
+ const { Created } = setup()
+ const events: Array<{
+ type: string
+ properties: { id: string; name: string }
+ }> = []
+ let resolve = () => {}
+ const received = new Promise<void>((done) => {
+ resolve = done
+ })
+ const dispose = Bus.subscribeAll((event) => {
events.push(event)
resolve()
})
- })
-
- SyncEvent.run(Created, { id: "evt_1", name: "test" })
-
- await received
- expect(events).toHaveLength(1)
- expect(events[0]).toEqual({
- type: "item.created",
- properties: {
- id: "evt_1",
- name: "test",
- },
- })
- }),
+ try {
+ yield* SyncEvent.use.run(Created, { id: "evt_1", name: "test" })
+ yield* Effect.promise(() => received)
+ expect(events).toHaveLength(1)
+ expect(events[0]).toEqual({
+ type: "item.created",
+ properties: {
+ id: "evt_1",
+ name: "test",
+ },
+ })
+ } finally {
+ dispose()
+ }
+ }),
+ ),
)
})
describe("replay", () => {
- test(
+ it.live(
"inserts event from external payload",
- withInstance(() => {
- const id = Identifier.descending("message")
- SyncEvent.replay({
- id: "evt_1",
- type: "item.created.1",
- seq: 0,
- aggregateID: id,
- data: { id, name: "replayed" },
- })
- const rows = Database.use((db) => db.select().from(EventTable).all())
- expect(rows).toHaveLength(1)
- expect(rows[0].aggregate_id).toBe(id)
- }),
+ provideTmpdirInstance(() =>
+ Effect.gen(function* () {
+ const id = MessageID.ascending()
+ yield* SyncEvent.use.replay({
+ id: "evt_1",
+ type: "item.created.1",
+ seq: 0,
+ aggregateID: id,
+ data: { id, name: "replayed" },
+ })
+ const rows = Database.use((db) => db.select().from(EventTable).all())
+ expect(rows).toHaveLength(1)
+ expect(rows[0].aggregate_id).toBe(id)
+ }),
+ ),
)
- test(
+ it.live(
"throws on sequence mismatch",
- withInstance(() => {
- const id = Identifier.descending("message")
- SyncEvent.replay({
- id: "evt_1",
- type: "item.created.1",
- seq: 0,
- aggregateID: id,
- data: { id, name: "first" },
- })
- expect(() =>
- SyncEvent.replay({
+ provideTmpdirInstance(() =>
+ Effect.gen(function* () {
+ const id = MessageID.ascending()
+ yield* SyncEvent.use.replay({
id: "evt_1",
type: "item.created.1",
- seq: 5,
+ seq: 0,
aggregateID: id,
- data: { id, name: "bad" },
- }),
- ).toThrow(/Sequence mismatch/)
- }),
+ data: { id, name: "first" },
+ })
+ yield* expectDefect(
+ SyncEvent.use.replay({
+ id: "evt_1",
+ type: "item.created.1",
+ seq: 5,
+ aggregateID: id,
+ data: { id, name: "bad" },
+ }),
+ /Sequence mismatch/,
+ )
+ }),
+ ),
)
- test(
+ it.live(
"throws on unknown event type",
- withInstance(() => {
- expect(() =>
- SyncEvent.replay({
- id: "evt_1",
- type: "unknown.event.1",
- seq: 0,
- aggregateID: "x",
- data: {},
- }),
- ).toThrow(/Unknown event type/)
- }),
+ provideTmpdirInstance(() =>
+ Effect.gen(function* () {
+ yield* expectDefect(
+ SyncEvent.use.replay({
+ id: "evt_1",
+ type: "unknown.event.1",
+ seq: 0,
+ aggregateID: "x",
+ data: {},
+ }),
+ /Unknown event type/,
+ )
+ }),
+ ),
)
- test(
+ it.live(
"replayAll accepts later chunks after the first batch",
- withInstance(() => {
- const { Created } = setup()
- const id = Identifier.descending("message")
+ provideTmpdirInstance(() =>
+ Effect.gen(function* () {
+ const { Created } = setup()
+ const id = MessageID.ascending()
- const one = SyncEvent.replayAll([
- {
- id: "evt_1",
- type: SyncEvent.versionedType(Created.type, Created.version),
- seq: 0,
- aggregateID: id,
- data: { id, name: "first" },
- },
- {
- id: "evt_2",
- type: SyncEvent.versionedType(Created.type, Created.version),
- seq: 1,
- aggregateID: id,
- data: { id, name: "second" },
- },
- ])
-
- const two = SyncEvent.replayAll([
- {
- id: "evt_3",
- type: SyncEvent.versionedType(Created.type, Created.version),
- seq: 2,
- aggregateID: id,
- data: { id, name: "third" },
- },
- {
- id: "evt_4",
- type: SyncEvent.versionedType(Created.type, Created.version),
- seq: 3,
- aggregateID: id,
- data: { id, name: "fourth" },
- },
- ])
+ const one = yield* SyncEvent.use.replayAll([
+ {
+ id: "evt_1",
+ type: SyncEvent.versionedType(Created.type, Created.version),
+ seq: 0,
+ aggregateID: id,
+ data: { id, name: "first" },
+ },
+ {
+ id: "evt_2",
+ type: SyncEvent.versionedType(Created.type, Created.version),
+ seq: 1,
+ aggregateID: id,
+ data: { id, name: "second" },
+ },
+ ])
+
+ const two = yield* SyncEvent.use.replayAll([
+ {
+ id: "evt_3",
+ type: SyncEvent.versionedType(Created.type, Created.version),
+ seq: 2,
+ aggregateID: id,
+ data: { id, name: "third" },
+ },
+ {
+ id: "evt_4",
+ type: SyncEvent.versionedType(Created.type, Created.version),
+ seq: 3,
+ aggregateID: id,
+ data: { id, name: "fourth" },
+ },
+ ])
- expect(one).toBe(id)
- expect(two).toBe(id)
+ expect(one).toBe(id)
+ expect(two).toBe(id)
- const rows = Database.use((db) => db.select().from(EventTable).all())
- expect(rows.map((row) => row.seq)).toEqual([0, 1, 2, 3])
- }),
+ const rows = Database.use((db) => db.select().from(EventTable).all())
+ expect(rows.map((row) => row.seq)).toEqual([0, 1, 2, 3])
+ }),
+ ),
)
})
})