summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorKit Langton <[email protected]>2026-04-23 12:43:08 -0400
committerGitHub <[email protected]>2026-04-23 12:43:08 -0400
commitc50d65b4d6b956cad44663d9d31b7b5eb01c8e57 (patch)
tree3a4984f610445de82d3390d63827e4b9be835255
parent353532b1c16bd5677efd2c352f502178f0c5094c (diff)
downloadopencode-c50d65b4d6b956cad44663d9d31b7b5eb01c8e57.tar.gz
opencode-c50d65b4d6b956cad44663d9d31b7b5eb01c8e57.zip
refactor(sync): make session events schema-first (#24019)
-rw-r--r--packages/opencode/src/bus/index.ts24
-rw-r--r--packages/opencode/src/server/projectors.ts3
-rw-r--r--packages/opencode/src/session/message-v2.ts44
-rw-r--r--packages/opencode/src/session/projectors.ts2
-rw-r--r--packages/opencode/src/session/session.ts65
-rw-r--r--packages/opencode/src/share/share-next.ts2
-rw-r--r--packages/opencode/src/sync/index.ts44
-rw-r--r--packages/opencode/test/sync/index.test.ts6
-rw-r--r--packages/sdk/js/src/v2/gen/types.gen.ts32
9 files changed, 141 insertions, 81 deletions
diff --git a/packages/opencode/src/bus/index.ts b/packages/opencode/src/bus/index.ts
index 8a9579b59..a2f9f5ccb 100644
--- a/packages/opencode/src/bus/index.ts
+++ b/packages/opencode/src/bus/index.ts
@@ -1,5 +1,5 @@
import z from "zod"
-import { Effect, Exit, Layer, PubSub, Scope, Context, Stream } from "effect"
+import { Effect, Exit, Layer, PubSub, Scope, Context, Stream, Schema as EffectSchema, Types } from "effect"
import { EffectBridge } from "@/effect"
import { Log } from "../util"
import { BusEvent } from "./bus-event"
@@ -9,6 +9,12 @@ import { makeRuntime } from "@/effect/run-service"
const log = Log.create({ service: "bus" })
+type BusProperties<D extends BusEvent.Definition = BusEvent.Definition> = D extends {
+ effectProperties: infer Properties extends EffectSchema.Top
+}
+ ? Types.DeepMutable<EffectSchema.Schema.Type<Properties>>
+ : z.infer<D["properties"]>
+
export const InstanceDisposed = BusEvent.define(
"server.instance.disposed",
z.object({
@@ -18,7 +24,7 @@ export const InstanceDisposed = BusEvent.define(
type Payload<D extends BusEvent.Definition = BusEvent.Definition> = {
type: D["type"]
- properties: z.infer<D["properties"]>
+ properties: BusProperties<D>
}
type State = {
@@ -29,7 +35,7 @@ type State = {
export interface Interface {
readonly publish: <D extends BusEvent.Definition>(
def: D,
- properties: z.output<D["properties"]>,
+ properties: BusProperties<D>,
) => Effect.Effect<void>
readonly subscribe: <D extends BusEvent.Definition>(def: D) => Stream.Stream<Payload<D>>
readonly subscribeAll: () => Stream.Stream<Payload>
@@ -79,7 +85,10 @@ export const layer = Layer.effect(
})
}
- function publish<D extends BusEvent.Definition>(def: D, properties: z.output<D["properties"]>) {
+ function publish<D extends BusEvent.Definition>(
+ def: D,
+ properties: BusProperties<D>,
+ ) {
return Effect.gen(function* () {
const s = yield* InstanceState.get(state)
const payload: Payload = { type: def.type, properties }
@@ -175,13 +184,16 @@ const { runPromise, runSync } = makeRuntime(Service, layer)
// runSync is safe here because the subscribe chain (InstanceState.get, PubSub.subscribe,
// Scope.make, Effect.forkScoped) is entirely synchronous. If any step becomes async, this will throw.
-export async function publish<D extends BusEvent.Definition>(def: D, properties: z.output<D["properties"]>) {
+export async function publish<D extends BusEvent.Definition>(
+ def: D,
+ properties: BusProperties<D>,
+) {
return runPromise((svc) => svc.publish(def, properties))
}
export function subscribe<D extends BusEvent.Definition>(
def: D,
- callback: (event: { type: D["type"]; properties: z.infer<D["properties"]> }) => unknown,
+ callback: (event: Payload<D>) => unknown,
) {
return runSync((svc) => svc.subscribeCallback(def, callback))
}
diff --git a/packages/opencode/src/server/projectors.ts b/packages/opencode/src/server/projectors.ts
index cfecce526..18c273d58 100644
--- a/packages/opencode/src/server/projectors.ts
+++ b/packages/opencode/src/server/projectors.ts
@@ -1,4 +1,3 @@
-import z from "zod"
import sessionProjectors from "../session/projectors"
import { SyncEvent } from "@/sync"
import { Session } from "@/session"
@@ -10,7 +9,7 @@ export function initProjectors() {
projectors: sessionProjectors,
convertEvent: (type, data) => {
if (type === "session.updated") {
- const id = (data as z.infer<typeof Session.Event.Updated.schema>).sessionID
+ const id = (data as SyncEvent.Event<typeof Session.Event.Updated>["data"]).sessionID
const row = Database.use((db) => db.select().from(SessionTable).where(eq(SessionTable.id, id)).get())
if (!row) return data
diff --git a/packages/opencode/src/session/message-v2.ts b/packages/opencode/src/session/message-v2.ts
index 980dd4da8..a4b25d95e 100644
--- a/packages/opencode/src/session/message-v2.ts
+++ b/packages/opencode/src/session/message-v2.ts
@@ -576,34 +576,46 @@ export const Info = Object.assign(_Info, {
})
export type Info = User | Assistant
+const UpdatedEventSchema = Schema.Struct({
+ sessionID: SessionID,
+ info: _Info,
+})
+
+const RemovedEventSchema = Schema.Struct({
+ sessionID: SessionID,
+ messageID: MessageID,
+})
+
+const PartUpdatedEventSchema = Schema.Struct({
+ sessionID: SessionID,
+ part: _Part,
+ time: Schema.Number,
+})
+
+const PartRemovedEventSchema = Schema.Struct({
+ sessionID: SessionID,
+ messageID: MessageID,
+ partID: PartID,
+})
+
export const Event = {
Updated: SyncEvent.define({
type: "message.updated",
version: 1,
aggregate: "sessionID",
- schema: z.object({
- sessionID: SessionID.zod,
- info: Info.zod,
- }),
+ schema: UpdatedEventSchema,
}),
Removed: SyncEvent.define({
type: "message.removed",
version: 1,
aggregate: "sessionID",
- schema: z.object({
- sessionID: SessionID.zod,
- messageID: MessageID.zod,
- }),
+ schema: RemovedEventSchema,
}),
PartUpdated: SyncEvent.define({
type: "message.part.updated",
version: 1,
aggregate: "sessionID",
- schema: z.object({
- sessionID: SessionID.zod,
- part: Part.zod,
- time: z.number(),
- }),
+ schema: PartUpdatedEventSchema,
}),
PartDelta: BusEvent.define(
"message.part.delta",
@@ -619,11 +631,7 @@ export const Event = {
type: "message.part.removed",
version: 1,
aggregate: "sessionID",
- schema: z.object({
- sessionID: SessionID.zod,
- messageID: MessageID.zod,
- partID: PartID.zod,
- }),
+ schema: PartRemovedEventSchema,
}),
}
diff --git a/packages/opencode/src/session/projectors.ts b/packages/opencode/src/session/projectors.ts
index 7b3b0a7f2..3a5fd0d8c 100644
--- a/packages/opencode/src/session/projectors.ts
+++ b/packages/opencode/src/session/projectors.ts
@@ -71,7 +71,7 @@ export default [
const info = data.info
const row = db
.update(SessionTable)
- .set(toPartialRow(info))
+ .set(toPartialRow(info as Session.Patch))
.where(eq(SessionTable.id, data.sessionID))
.returning()
.get()
diff --git a/packages/opencode/src/session/session.ts b/packages/opencode/src/session/session.ts
index d2bdbccb7..1e046fdf7 100644
--- a/packages/opencode/src/session/session.ts
+++ b/packages/opencode/src/session/session.ts
@@ -15,7 +15,6 @@ import { PartTable, SessionTable } from "./session.sql"
import { ProjectTable } from "../project/project.sql"
import { Storage } from "@/storage"
import { Log } from "../util"
-import { updateSchema } from "../util/update-schema"
import { MessageV2 } from "./message-v2"
import { Instance } from "../project/instance"
import { InstanceState } from "@/effect"
@@ -28,7 +27,7 @@ import type { Provider } from "@/provider"
import { Permission } from "@/permission"
import { Global } from "@/global"
import { Effect, Layer, Option, Context, Schema, Types } from "effect"
-import { zod, zodObject } from "@/util/effect-zod"
+import { zod } from "@/util/effect-zod"
import { withStatics } from "@/util/schema"
const log = Log.create({ service: "session" })
@@ -215,40 +214,62 @@ export const MessagesInput = Schema.Struct({
limit: Schema.optional(Schema.Number),
}).pipe(withStatics((s) => ({ zod: zod(s) })))
+const CreatedEventSchema = Schema.Struct({
+ sessionID: SessionID,
+ info: Info,
+})
+
+const UpdatedShare = Schema.Struct({
+ url: Schema.optional(Schema.NullOr(Schema.String)),
+})
+
+const UpdatedTime = Schema.Struct({
+ created: Schema.optional(Schema.NullOr(Schema.Number)),
+ updated: Schema.optional(Schema.NullOr(Schema.Number)),
+ compacting: Schema.optional(Schema.NullOr(Schema.Number)),
+ archived: Schema.optional(Schema.NullOr(Schema.Number)),
+})
+
+const UpdatedInfo = Schema.Struct({
+ id: Schema.optional(Schema.NullOr(SessionID)),
+ slug: Schema.optional(Schema.NullOr(Schema.String)),
+ projectID: Schema.optional(Schema.NullOr(ProjectID)),
+ workspaceID: Schema.optional(Schema.NullOr(WorkspaceID)),
+ directory: Schema.optional(Schema.NullOr(Schema.String)),
+ parentID: Schema.optional(Schema.NullOr(SessionID)),
+ summary: Schema.optional(Schema.NullOr(Summary)),
+ share: Schema.optional(UpdatedShare),
+ title: Schema.optional(Schema.NullOr(Schema.String)),
+ version: Schema.optional(Schema.NullOr(Schema.String)),
+ time: Schema.optional(UpdatedTime),
+ permission: Schema.optional(Schema.NullOr(Permission.Ruleset)),
+ revert: Schema.optional(Schema.NullOr(Revert)),
+})
+
+const UpdatedEventSchema = Schema.Struct({
+ sessionID: SessionID,
+ info: UpdatedInfo,
+})
+
export const Event = {
Created: SyncEvent.define({
type: "session.created",
version: 1,
aggregate: "sessionID",
- schema: z.object({
- sessionID: SessionID.zod,
- info: Info.zod,
- }),
+ schema: CreatedEventSchema,
}),
Updated: SyncEvent.define({
type: "session.updated",
version: 1,
aggregate: "sessionID",
- schema: z.object({
- sessionID: SessionID.zod,
- info: updateSchema(zodObject(Info)).extend({
- share: updateSchema(zodObject(Share)).optional(),
- time: updateSchema(zodObject(Time)).optional(),
- }),
- }),
- busSchema: z.object({
- sessionID: SessionID.zod,
- info: Info.zod,
- }),
+ schema: UpdatedEventSchema,
+ busSchema: CreatedEventSchema,
}),
Deleted: SyncEvent.define({
type: "session.deleted",
version: 1,
aggregate: "sessionID",
- schema: z.object({
- sessionID: SessionID.zod,
- info: Info.zod,
- }),
+ schema: CreatedEventSchema,
}),
Diff: BusEvent.define(
"session.diff",
@@ -394,7 +415,7 @@ export interface Interface {
export class Service extends Context.Service<Service, Interface>()("@opencode/Session") {}
-type Patch = z.infer<typeof Event.Updated.schema>["info"]
+export type Patch = Types.DeepMutable<SyncEvent.Event<typeof Event.Updated>["data"]["info"]>
const db = <T>(fn: (d: Parameters<typeof Database.use>[0] extends (trx: infer D) => any ? D : never) => T) =>
Effect.sync(() => Database.use(fn))
diff --git a/packages/opencode/src/share/share-next.ts b/packages/opencode/src/share/share-next.ts
index 2622f4f7f..f26a085c2 100644
--- a/packages/opencode/src/share/share-next.ts
+++ b/packages/opencode/src/share/share-next.ts
@@ -181,7 +181,7 @@ export const layer = Layer.effect(
yield* watch(Session.Event.Updated, (evt) =>
Effect.gen(function* () {
- const info = yield* session.get(evt.properties.sessionID)
+ const info = evt.properties.info
yield* sync(info.id, [{ type: "session", data: info }])
}),
)
diff --git a/packages/opencode/src/sync/index.ts b/packages/opencode/src/sync/index.ts
index 125d8c955..5a4078402 100644
--- a/packages/opencode/src/sync/index.ts
+++ b/packages/opencode/src/sync/index.ts
@@ -1,5 +1,4 @@
import z from "zod"
-import type { ZodObject } from "zod"
import { Database, eq } from "@/storage"
import { GlobalBus } from "@/bus/global"
import { Bus as ProjectBus } from "@/bus"
@@ -9,11 +8,16 @@ import { EventSequenceTable, EventTable } from "./event.sql"
import { WorkspaceContext } from "@/control-plane/workspace-context"
import { EventID } from "./schema"
import { Flag } from "@/flag/flag"
+import { Schema as EffectSchema, Types } from "effect"
+import { zodObject } from "@/util/effect-zod"
+import { isRecord } from "@/util/record"
-export type Definition = {
+export type Definition<Schema extends EffectSchema.Top = EffectSchema.Top, BusSchema extends EffectSchema.Top = Schema> = {
type: string
version: number
aggregate: string
+ effectSchema: Schema
+ effectProperties: BusSchema
schema: z.ZodObject
// This is temporary and only exists for compatibility with bus
@@ -25,9 +29,13 @@ export type Event<Def extends Definition = Definition> = {
id: string
seq: number
aggregateID: string
- data: z.infer<Def["schema"]>
+ data: Types.DeepMutable<EffectSchema.Schema.Type<Def["effectSchema"]>>
}
+export type Properties<Def extends Definition = Definition> = Types.DeepMutable<
+ EffectSchema.Schema.Type<Def["effectProperties"]>
+>
+
export type SerializedEvent<Def extends Definition = Definition> = Event<Def> & { type: string }
type ProjectorFunc = (db: Database.TxOrDb, data: unknown) => void
@@ -36,7 +44,12 @@ export const registry = new Map<string, Definition>()
let projectors: Map<Definition, ProjectorFunc> | undefined
const versions = new Map<string, number>()
let frozen = false
-let convertEvent: (type: string, event: Event["data"]) => Promise<Record<string, unknown>> | Record<string, unknown>
+let convertEvent: (type: string, event: Event["data"]) => Promise<unknown> | unknown
+
+function asRecord(input: unknown) {
+ if (isRecord(input)) return input
+ throw new Error(`SyncEvent.convertEvent must return an object, got: ${JSON.stringify(input)}`)
+}
export function reset() {
frozen = false
@@ -54,7 +67,7 @@ export function init(input: { projectors: Array<[Definition, ProjectorFunc]>; co
for (let [type, version] of versions.entries()) {
let def = registry.get(versionedType(type, version))!
- BusEvent.define(def.type, def.properties || def.schema)
+ BusEvent.define(def.type, def.properties)
}
// Freeze the system so it clearly errors if events are defined
@@ -72,19 +85,26 @@ export function versionedType(type: string, version?: number) {
export function define<
Type extends string,
Agg extends string,
- Schema extends ZodObject<Record<Agg, z.ZodType<string>>>,
- BusSchema extends ZodObject = Schema,
->(input: { type: Type; version: number; aggregate: Agg; schema: Schema; busSchema?: BusSchema }) {
+ Schema extends EffectSchema.Top,
+ BusSchema extends EffectSchema.Top = Schema,
+>(input: { type: Type; version: number; aggregate: Agg; schema: Schema; busSchema?: BusSchema }): Definition<
+ Schema,
+ BusSchema
+> {
if (frozen) {
throw new Error("Error defining sync event: sync system has been frozen")
}
+ const effectProperties = (input.busSchema ?? input.schema) as BusSchema
+
const def = {
type: input.type,
version: input.version,
aggregate: input.aggregate,
- schema: input.schema,
- properties: input.busSchema ? input.busSchema : input.schema,
+ effectSchema: input.schema,
+ effectProperties,
+ schema: zodObject(input.schema),
+ properties: zodObject(effectProperties),
}
versions.set(def.type, Math.max(def.version, versions.get(def.type) || 0))
@@ -143,10 +163,10 @@ function process<Def extends Definition>(def: Def, event: Event<Def>, options: {
const result = convertEvent(def.type, event.data)
if (result instanceof Promise) {
void result.then((data) => {
- void ProjectBus.publish({ type: def.type, properties: def.schema }, data)
+ void ProjectBus.publish({ type: def.type, properties: def.properties }, asRecord(data))
})
} else {
- void ProjectBus.publish({ type: def.type, properties: def.schema }, result)
+ void ProjectBus.publish({ type: def.type, properties: def.properties }, asRecord(result))
}
GlobalBus.emit("event", {
diff --git a/packages/opencode/test/sync/index.test.ts b/packages/opencode/test/sync/index.test.ts
index 866bcaa31..d50f0d7c9 100644
--- a/packages/opencode/test/sync/index.test.ts
+++ b/packages/opencode/test/sync/index.test.ts
@@ -1,6 +1,6 @@
import { describe, test, expect, beforeEach, afterEach, afterAll } from "bun:test"
import { tmpdir } from "../fixture/fixture"
-import z from "zod"
+import { Schema } from "effect"
import { Bus } from "../../src/bus"
import { Instance } from "../../src/project/instance"
import { SyncEvent } from "../../src/sync"
@@ -43,13 +43,13 @@ describe("SyncEvent", () => {
type: "item.created",
version: 1,
aggregate: "id",
- schema: z.object({ id: z.string(), name: z.string() }),
+ schema: Schema.Struct({ id: Schema.String, name: Schema.String }),
})
const Sent = SyncEvent.define({
type: "item.sent",
version: 1,
aggregate: "item_id",
- schema: z.object({ item_id: z.string(), to: z.string() }),
+ schema: Schema.Struct({ item_id: Schema.String, to: Schema.String }),
})
SyncEvent.init({
diff --git a/packages/sdk/js/src/v2/gen/types.gen.ts b/packages/sdk/js/src/v2/gen/types.gen.ts
index 1fcab2eda..d28ce2579 100644
--- a/packages/sdk/js/src/v2/gen/types.gen.ts
+++ b/packages/sdk/js/src/v2/gen/types.gen.ts
@@ -1058,31 +1058,31 @@ export type SyncEventSessionUpdated = {
data: {
sessionID: string
info: {
- id: string | null
- slug: string | null
- projectID: string | null
- workspaceID: string | null
- directory: string | null
- parentID: string | null
- summary: {
+ id?: string | null
+ slug?: string | null
+ projectID?: string | null
+ workspaceID?: string | null
+ directory?: string | null
+ parentID?: string | null
+ summary?: {
additions: number
deletions: number
files: number
diffs?: Array<SnapshotFileDiff>
} | null
share?: {
- url: string | null
+ url?: string | null
}
- title: string | null
- version: string | null
+ title?: string | null
+ version?: string | null
time?: {
- created: number | null
- updated: number | null
- compacting: number | null
- archived: number | null
+ created?: number | null
+ updated?: number | null
+ compacting?: number | null
+ archived?: number | null
}
- permission: PermissionRuleset | null
- revert: {
+ permission?: PermissionRuleset | null
+ revert?: {
messageID: string
partID?: string
snapshot?: string