summaryrefslogtreecommitdiffhomepage
path: root/packages
diff options
context:
space:
mode:
authorKit Langton <[email protected]>2026-04-15 22:15:46 -0400
committerGitHub <[email protected]>2026-04-16 02:15:46 +0000
commita653a4b8871e5d58c56f588e4cd3b2001f8bc6a1 (patch)
tree7ee8b5150ccd3051abfa9fe04dc2421907809614 /packages
parentf7edffc11aeceeab1000026cfcd7063671a1e7bb (diff)
downloadopencode-a653a4b8871e5d58c56f588e4cd3b2001f8bc6a1.tar.gz
opencode-a653a4b8871e5d58c56f588e4cd3b2001f8bc6a1.zip
feat: unwrap usync namespace to flat exports + barrel (#22716)
Diffstat (limited to 'packages')
-rw-r--r--packages/opencode/src/sync/index.ts283
-rw-r--r--packages/opencode/src/sync/sync-event.ts280
2 files changed, 281 insertions, 282 deletions
diff --git a/packages/opencode/src/sync/index.ts b/packages/opencode/src/sync/index.ts
index e89d57e18..a6dec180b 100644
--- a/packages/opencode/src/sync/index.ts
+++ b/packages/opencode/src/sync/index.ts
@@ -1,282 +1 @@
-import z from "zod"
-import type { ZodObject } from "zod"
-import { Database, eq } from "@/storage/db"
-import { GlobalBus } from "@/bus/global"
-import { Bus as ProjectBus } from "@/bus"
-import { BusEvent } from "@/bus/bus-event"
-import { Instance } from "@/project/instance"
-import { EventSequenceTable, EventTable } from "./event.sql"
-import { WorkspaceContext } from "@/control-plane/workspace-context"
-import { EventID } from "./schema"
-import { Flag } from "@/flag/flag"
-
-export namespace SyncEvent {
- export type Definition = {
- type: string
- version: number
- aggregate: string
- schema: z.ZodObject
-
- // This is temporary and only exists for compatibility with bus
- // event definitions
- properties: z.ZodObject
- }
-
- export type Event<Def extends Definition = Definition> = {
- id: string
- seq: number
- aggregateID: string
- data: z.infer<Def["schema"]>
- }
-
- export type SerializedEvent<Def extends Definition = Definition> = Event<Def> & { type: string }
-
- type ProjectorFunc = (db: Database.TxOrDb, data: unknown) => void
-
- export const registry = new Map<string, Definition>()
- let projectors: Map<Definition, ProjectorFunc> | undefined
- const versions = new Map<string, number>()
- let frozen = false
- let convertEvent: (type: string, event: Event["data"]) => Promise<Record<string, unknown>> | Record<string, unknown>
-
- export function reset() {
- frozen = false
- projectors = undefined
- convertEvent = (_, data) => data
- }
-
- export function init(input: { projectors: Array<[Definition, ProjectorFunc]>; convertEvent?: typeof convertEvent }) {
- projectors = new Map(input.projectors)
-
- // Install all the latest event defs to the bus. We only ever emit
- // latest versions from code, and keep around old versions for
- // replaying. Replaying does not go through the bus, and it
- // simplifies the bus to only use unversioned latest events
- for (let [type, version] of versions.entries()) {
- let def = registry.get(versionedType(type, version))!
-
- BusEvent.define(def.type, def.properties || def.schema)
- }
-
- // Freeze the system so it clearly errors if events are defined
- // after `init` which would cause bugs
- frozen = true
- convertEvent = input.convertEvent || ((_, data) => data)
- }
-
- export function versionedType<A extends string>(type: A): A
- export function versionedType<A extends string, B extends number>(type: A, version: B): `${A}/${B}`
- export function versionedType(type: string, version?: number) {
- return version ? `${type}.${version}` : type
- }
-
- export function define<
- Type extends string,
- Agg extends string,
- Schema extends ZodObject<Record<Agg, z.ZodType<string>>>,
- BusSchema extends ZodObject = Schema,
- >(input: { type: Type; version: number; aggregate: Agg; schema: Schema; busSchema?: BusSchema }) {
- if (frozen) {
- throw new Error("Error defining sync event: sync system has been frozen")
- }
-
- const def = {
- type: input.type,
- version: input.version,
- aggregate: input.aggregate,
- schema: input.schema,
- properties: input.busSchema ? input.busSchema : input.schema,
- }
-
- versions.set(def.type, Math.max(def.version, versions.get(def.type) || 0))
-
- registry.set(versionedType(def.type, def.version), def)
-
- return def
- }
-
- export function project<Def extends Definition>(
- def: Def,
- func: (db: Database.TxOrDb, data: Event<Def>["data"]) => void,
- ): [Definition, ProjectorFunc] {
- return [def, func as ProjectorFunc]
- }
-
- function process<Def extends Definition>(def: Def, event: Event<Def>, options: { publish: boolean }) {
- if (projectors == null) {
- throw new Error("No projectors available. Call `SyncEvent.init` to install projectors")
- }
-
- const projector = projectors.get(def)
- if (!projector) {
- throw new Error(`Projector not found for event: ${def.type}`)
- }
-
- // idempotent: need to ignore any events already logged
-
- Database.transaction((tx) => {
- projector(tx, event.data)
-
- if (Flag.OPENCODE_EXPERIMENTAL_WORKSPACES) {
- tx.insert(EventSequenceTable)
- .values({
- aggregate_id: event.aggregateID,
- seq: event.seq,
- })
- .onConflictDoUpdate({
- target: EventSequenceTable.aggregate_id,
- set: { seq: event.seq },
- })
- .run()
- tx.insert(EventTable)
- .values({
- id: event.id,
- seq: event.seq,
- aggregate_id: event.aggregateID,
- type: versionedType(def.type, def.version),
- data: event.data as Record<string, unknown>,
- })
- .run()
- }
-
- Database.effect(() => {
- if (options?.publish) {
- const result = convertEvent(def.type, event.data)
- if (result instanceof Promise) {
- result.then((data) => {
- ProjectBus.publish({ type: def.type, properties: def.schema }, data)
- })
- } else {
- ProjectBus.publish({ type: def.type, properties: def.schema }, result)
- }
-
- GlobalBus.emit("event", {
- directory: Instance.directory,
- project: Instance.project.id,
- workspace: WorkspaceContext.workspaceID,
- payload: {
- type: "sync",
- name: versionedType(def.type, def.version),
- ...event,
- },
- })
- }
- })
- })
- }
-
- // TODO:
- //
- // * Support applying multiple events at one time. One transaction,
- // and it validets all the sequence ids
- // * when loading events from db, apply zod validation to ensure shape
-
- export function replay(event: SerializedEvent, options?: { publish: boolean }) {
- const def = registry.get(event.type)
- if (!def) {
- throw new Error(`Unknown event type: ${event.type}`)
- }
-
- const row = Database.use((db) =>
- db
- .select({ seq: EventSequenceTable.seq })
- .from(EventSequenceTable)
- .where(eq(EventSequenceTable.aggregate_id, event.aggregateID))
- .get(),
- )
-
- const latest = row?.seq ?? -1
- if (event.seq <= latest) {
- return
- }
-
- const expected = latest + 1
- if (event.seq !== expected) {
- throw new Error(`Sequence mismatch for aggregate "${event.aggregateID}": expected ${expected}, got ${event.seq}`)
- }
-
- process(def, event, { publish: !!options?.publish })
- }
-
- export function replayAll(events: SerializedEvent[], options?: { publish: boolean }) {
- const source = events[0]?.aggregateID
- if (!source) return
- if (events.some((item) => item.aggregateID !== source)) {
- throw new Error("Replay events must belong to the same session")
- }
- const start = events[0].seq
- for (const [i, item] of events.entries()) {
- const seq = start + i
- if (item.seq !== seq) {
- throw new Error(`Replay sequence mismatch at index ${i}: expected ${seq}, got ${item.seq}`)
- }
- }
- for (const item of events) {
- replay(item, options)
- }
- return source
- }
-
- export function run<Def extends Definition>(def: Def, data: Event<Def>["data"], options?: { publish?: boolean }) {
- const agg = (data as Record<string, string>)[def.aggregate]
- // This should never happen: we've enforced it via typescript in
- // the definition
- if (agg == null) {
- throw new Error(`SyncEvent.run: "${def.aggregate}" required but not found: ${JSON.stringify(data)}`)
- }
-
- if (def.version !== versions.get(def.type)) {
- throw new Error(`SyncEvent.run: running old versions of events is not allowed: ${def.type}`)
- }
-
- const { publish = true } = options || {}
-
- // Note that this is an "immediate" transaction which is critical.
- // We need to make sure we can safely read and write with nothing
- // else changing the data from under us
- Database.transaction(
- (tx) => {
- const id = EventID.ascending()
- const row = tx
- .select({ seq: EventSequenceTable.seq })
- .from(EventSequenceTable)
- .where(eq(EventSequenceTable.aggregate_id, agg))
- .get()
- const seq = row?.seq != null ? row.seq + 1 : 0
-
- const event = { id, seq, aggregateID: agg, data }
- process(def, event, { publish })
- },
- {
- behavior: "immediate",
- },
- )
- }
-
- export function remove(aggregateID: string) {
- Database.transaction((tx) => {
- tx.delete(EventSequenceTable).where(eq(EventSequenceTable.aggregate_id, aggregateID)).run()
- tx.delete(EventTable).where(eq(EventTable.aggregate_id, aggregateID)).run()
- })
- }
-
- export function payloads() {
- return registry
- .entries()
- .map(([type, def]) => {
- return z
- .object({
- type: z.literal("sync"),
- name: z.literal(type),
- id: z.string(),
- seq: z.number(),
- aggregateID: z.literal(def.aggregate),
- data: def.schema,
- })
- .meta({
- ref: "SyncEvent" + "." + def.type,
- })
- })
- .toArray()
- }
-}
+export * as SyncEvent from "./sync-event"
diff --git a/packages/opencode/src/sync/sync-event.ts b/packages/opencode/src/sync/sync-event.ts
new file mode 100644
index 000000000..2b1eb0981
--- /dev/null
+++ b/packages/opencode/src/sync/sync-event.ts
@@ -0,0 +1,280 @@
+import z from "zod"
+import type { ZodObject } from "zod"
+import { Database, eq } from "@/storage/db"
+import { GlobalBus } from "@/bus/global"
+import { Bus as ProjectBus } from "@/bus"
+import { BusEvent } from "@/bus/bus-event"
+import { Instance } from "@/project/instance"
+import { EventSequenceTable, EventTable } from "./event.sql"
+import { WorkspaceContext } from "@/control-plane/workspace-context"
+import { EventID } from "./schema"
+import { Flag } from "@/flag/flag"
+
+export type Definition = {
+ type: string
+ version: number
+ aggregate: string
+ schema: z.ZodObject
+
+ // This is temporary and only exists for compatibility with bus
+ // event definitions
+ properties: z.ZodObject
+}
+
+export type Event<Def extends Definition = Definition> = {
+ id: string
+ seq: number
+ aggregateID: string
+ data: z.infer<Def["schema"]>
+}
+
+export type SerializedEvent<Def extends Definition = Definition> = Event<Def> & { type: string }
+
+type ProjectorFunc = (db: Database.TxOrDb, data: unknown) => void
+
+export const registry = new Map<string, Definition>()
+let projectors: Map<Definition, ProjectorFunc> | undefined
+const versions = new Map<string, number>()
+let frozen = false
+let convertEvent: (type: string, event: Event["data"]) => Promise<Record<string, unknown>> | Record<string, unknown>
+
+export function reset() {
+ frozen = false
+ projectors = undefined
+ convertEvent = (_, data) => data
+}
+
+export function init(input: { projectors: Array<[Definition, ProjectorFunc]>; convertEvent?: typeof convertEvent }) {
+ projectors = new Map(input.projectors)
+
+ // Install all the latest event defs to the bus. We only ever emit
+ // latest versions from code, and keep around old versions for
+ // replaying. Replaying does not go through the bus, and it
+ // simplifies the bus to only use unversioned latest events
+ for (let [type, version] of versions.entries()) {
+ let def = registry.get(versionedType(type, version))!
+
+ BusEvent.define(def.type, def.properties || def.schema)
+ }
+
+ // Freeze the system so it clearly errors if events are defined
+ // after `init` which would cause bugs
+ frozen = true
+ convertEvent = input.convertEvent || ((_, data) => data)
+}
+
+export function versionedType<A extends string>(type: A): A
+export function versionedType<A extends string, B extends number>(type: A, version: B): `${A}/${B}`
+export function versionedType(type: string, version?: number) {
+ return version ? `${type}.${version}` : type
+}
+
+export function define<
+ Type extends string,
+ Agg extends string,
+ Schema extends ZodObject<Record<Agg, z.ZodType<string>>>,
+ BusSchema extends ZodObject = Schema,
+>(input: { type: Type; version: number; aggregate: Agg; schema: Schema; busSchema?: BusSchema }) {
+ if (frozen) {
+ throw new Error("Error defining sync event: sync system has been frozen")
+ }
+
+ const def = {
+ type: input.type,
+ version: input.version,
+ aggregate: input.aggregate,
+ schema: input.schema,
+ properties: input.busSchema ? input.busSchema : input.schema,
+ }
+
+ versions.set(def.type, Math.max(def.version, versions.get(def.type) || 0))
+
+ registry.set(versionedType(def.type, def.version), def)
+
+ return def
+}
+
+export function project<Def extends Definition>(
+ def: Def,
+ func: (db: Database.TxOrDb, data: Event<Def>["data"]) => void,
+): [Definition, ProjectorFunc] {
+ return [def, func as ProjectorFunc]
+}
+
+function process<Def extends Definition>(def: Def, event: Event<Def>, options: { publish: boolean }) {
+ if (projectors == null) {
+ throw new Error("No projectors available. Call `SyncEvent.init` to install projectors")
+ }
+
+ const projector = projectors.get(def)
+ if (!projector) {
+ throw new Error(`Projector not found for event: ${def.type}`)
+ }
+
+ // idempotent: need to ignore any events already logged
+
+ Database.transaction((tx) => {
+ projector(tx, event.data)
+
+ if (Flag.OPENCODE_EXPERIMENTAL_WORKSPACES) {
+ tx.insert(EventSequenceTable)
+ .values({
+ aggregate_id: event.aggregateID,
+ seq: event.seq,
+ })
+ .onConflictDoUpdate({
+ target: EventSequenceTable.aggregate_id,
+ set: { seq: event.seq },
+ })
+ .run()
+ tx.insert(EventTable)
+ .values({
+ id: event.id,
+ seq: event.seq,
+ aggregate_id: event.aggregateID,
+ type: versionedType(def.type, def.version),
+ data: event.data as Record<string, unknown>,
+ })
+ .run()
+ }
+
+ Database.effect(() => {
+ if (options?.publish) {
+ const result = convertEvent(def.type, event.data)
+ if (result instanceof Promise) {
+ result.then((data) => {
+ ProjectBus.publish({ type: def.type, properties: def.schema }, data)
+ })
+ } else {
+ ProjectBus.publish({ type: def.type, properties: def.schema }, result)
+ }
+
+ GlobalBus.emit("event", {
+ directory: Instance.directory,
+ project: Instance.project.id,
+ workspace: WorkspaceContext.workspaceID,
+ payload: {
+ type: "sync",
+ name: versionedType(def.type, def.version),
+ ...event,
+ },
+ })
+ }
+ })
+ })
+}
+
+// TODO:
+//
+// * Support applying multiple events at one time. One transaction,
+// and it validets all the sequence ids
+// * when loading events from db, apply zod validation to ensure shape
+
+export function replay(event: SerializedEvent, options?: { publish: boolean }) {
+ const def = registry.get(event.type)
+ if (!def) {
+ throw new Error(`Unknown event type: ${event.type}`)
+ }
+
+ const row = Database.use((db) =>
+ db
+ .select({ seq: EventSequenceTable.seq })
+ .from(EventSequenceTable)
+ .where(eq(EventSequenceTable.aggregate_id, event.aggregateID))
+ .get(),
+ )
+
+ const latest = row?.seq ?? -1
+ if (event.seq <= latest) {
+ return
+ }
+
+ const expected = latest + 1
+ if (event.seq !== expected) {
+ throw new Error(`Sequence mismatch for aggregate "${event.aggregateID}": expected ${expected}, got ${event.seq}`)
+ }
+
+ process(def, event, { publish: !!options?.publish })
+}
+
+export function replayAll(events: SerializedEvent[], options?: { publish: boolean }) {
+ const source = events[0]?.aggregateID
+ if (!source) return
+ if (events.some((item) => item.aggregateID !== source)) {
+ throw new Error("Replay events must belong to the same session")
+ }
+ const start = events[0].seq
+ for (const [i, item] of events.entries()) {
+ const seq = start + i
+ if (item.seq !== seq) {
+ throw new Error(`Replay sequence mismatch at index ${i}: expected ${seq}, got ${item.seq}`)
+ }
+ }
+ for (const item of events) {
+ replay(item, options)
+ }
+ return source
+}
+
+export function run<Def extends Definition>(def: Def, data: Event<Def>["data"], options?: { publish?: boolean }) {
+ const agg = (data as Record<string, string>)[def.aggregate]
+ // This should never happen: we've enforced it via typescript in
+ // the definition
+ if (agg == null) {
+ throw new Error(`SyncEvent.run: "${def.aggregate}" required but not found: ${JSON.stringify(data)}`)
+ }
+
+ if (def.version !== versions.get(def.type)) {
+ throw new Error(`SyncEvent.run: running old versions of events is not allowed: ${def.type}`)
+ }
+
+ const { publish = true } = options || {}
+
+ // Note that this is an "immediate" transaction which is critical.
+ // We need to make sure we can safely read and write with nothing
+ // else changing the data from under us
+ Database.transaction(
+ (tx) => {
+ const id = EventID.ascending()
+ const row = tx
+ .select({ seq: EventSequenceTable.seq })
+ .from(EventSequenceTable)
+ .where(eq(EventSequenceTable.aggregate_id, agg))
+ .get()
+ const seq = row?.seq != null ? row.seq + 1 : 0
+
+ const event = { id, seq, aggregateID: agg, data }
+ process(def, event, { publish })
+ },
+ {
+ behavior: "immediate",
+ },
+ )
+}
+
+export function remove(aggregateID: string) {
+ Database.transaction((tx) => {
+ tx.delete(EventSequenceTable).where(eq(EventSequenceTable.aggregate_id, aggregateID)).run()
+ tx.delete(EventTable).where(eq(EventTable.aggregate_id, aggregateID)).run()
+ })
+}
+
+export function payloads() {
+ return registry
+ .entries()
+ .map(([type, def]) => {
+ return z
+ .object({
+ type: z.literal("sync"),
+ name: z.literal(type),
+ id: z.string(),
+ seq: z.number(),
+ aggregateID: z.literal(def.aggregate),
+ data: def.schema,
+ })
+ .meta({
+ ref: "SyncEvent" + "." + def.type,
+ })
+ })
+ .toArray()
+}