summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--packages/opencode/src/bus/bus.ts191
-rw-r--r--packages/opencode/src/bus/index.ts194
2 files changed, 193 insertions, 192 deletions
diff --git a/packages/opencode/src/bus/bus.ts b/packages/opencode/src/bus/bus.ts
deleted file mode 100644
index beac80992..000000000
--- a/packages/opencode/src/bus/bus.ts
+++ /dev/null
@@ -1,191 +0,0 @@
-import z from "zod"
-import { Effect, Exit, Layer, PubSub, Scope, Context, Stream } from "effect"
-import { EffectBridge } from "@/effect"
-import { Log } from "../util"
-import { BusEvent } from "./bus-event"
-import { GlobalBus } from "./global"
-import { InstanceState } from "@/effect"
-import { makeRuntime } from "@/effect/run-service"
-
-const log = Log.create({ service: "bus" })
-
-export const InstanceDisposed = BusEvent.define(
- "server.instance.disposed",
- z.object({
- directory: z.string(),
- }),
-)
-
-type Payload<D extends BusEvent.Definition = BusEvent.Definition> = {
- type: D["type"]
- properties: z.infer<D["properties"]>
-}
-
-type State = {
- wildcard: PubSub.PubSub<Payload>
- typed: Map<string, PubSub.PubSub<Payload>>
-}
-
-export interface Interface {
- readonly publish: <D extends BusEvent.Definition>(
- def: D,
- properties: z.output<D["properties"]>,
- ) => Effect.Effect<void>
- readonly subscribe: <D extends BusEvent.Definition>(def: D) => Stream.Stream<Payload<D>>
- readonly subscribeAll: () => Stream.Stream<Payload>
- readonly subscribeCallback: <D extends BusEvent.Definition>(
- def: D,
- callback: (event: Payload<D>) => unknown,
- ) => Effect.Effect<() => void>
- readonly subscribeAllCallback: (callback: (event: any) => unknown) => Effect.Effect<() => void>
-}
-
-export class Service extends Context.Service<Service, Interface>()("@opencode/Bus") {}
-
-export const layer = Layer.effect(
- Service,
- Effect.gen(function* () {
- const state = yield* InstanceState.make<State>(
- Effect.fn("Bus.state")(function* (ctx) {
- const wildcard = yield* PubSub.unbounded<Payload>()
- const typed = new Map<string, PubSub.PubSub<Payload>>()
-
- yield* Effect.addFinalizer(() =>
- Effect.gen(function* () {
- // Publish InstanceDisposed before shutting down so subscribers see it
- yield* PubSub.publish(wildcard, {
- type: InstanceDisposed.type,
- properties: { directory: ctx.directory },
- })
- yield* PubSub.shutdown(wildcard)
- for (const ps of typed.values()) {
- yield* PubSub.shutdown(ps)
- }
- }),
- )
-
- return { wildcard, typed }
- }),
- )
-
- function getOrCreate<D extends BusEvent.Definition>(state: State, def: D) {
- return Effect.gen(function* () {
- let ps = state.typed.get(def.type)
- if (!ps) {
- ps = yield* PubSub.unbounded<Payload>()
- state.typed.set(def.type, ps)
- }
- return ps as unknown as PubSub.PubSub<Payload<D>>
- })
- }
-
- function publish<D extends BusEvent.Definition>(def: D, properties: z.output<D["properties"]>) {
- return Effect.gen(function* () {
- const s = yield* InstanceState.get(state)
- const payload: Payload = { type: def.type, properties }
- log.info("publishing", { type: def.type })
-
- const ps = s.typed.get(def.type)
- if (ps) yield* PubSub.publish(ps, payload)
- yield* PubSub.publish(s.wildcard, payload)
-
- const dir = yield* InstanceState.directory
- const context = yield* InstanceState.context
- const workspace = yield* InstanceState.workspaceID
-
- GlobalBus.emit("event", {
- directory: dir,
- project: context.project.id,
- workspace,
- payload,
- })
- })
- }
-
- function subscribe<D extends BusEvent.Definition>(def: D): Stream.Stream<Payload<D>> {
- log.info("subscribing", { type: def.type })
- return Stream.unwrap(
- Effect.gen(function* () {
- const s = yield* InstanceState.get(state)
- const ps = yield* getOrCreate(s, def)
- return Stream.fromPubSub(ps)
- }),
- ).pipe(Stream.ensuring(Effect.sync(() => log.info("unsubscribing", { type: def.type }))))
- }
-
- function subscribeAll(): Stream.Stream<Payload> {
- log.info("subscribing", { type: "*" })
- return Stream.unwrap(
- Effect.gen(function* () {
- const s = yield* InstanceState.get(state)
- return Stream.fromPubSub(s.wildcard)
- }),
- ).pipe(Stream.ensuring(Effect.sync(() => log.info("unsubscribing", { type: "*" }))))
- }
-
- function on<T>(pubsub: PubSub.PubSub<T>, type: string, callback: (event: T) => unknown) {
- return Effect.gen(function* () {
- log.info("subscribing", { type })
- const bridge = yield* EffectBridge.make()
- const scope = yield* Scope.make()
- const subscription = yield* Scope.provide(scope)(PubSub.subscribe(pubsub))
-
- yield* Scope.provide(scope)(
- Stream.fromSubscription(subscription).pipe(
- Stream.runForEach((msg) =>
- Effect.tryPromise({
- try: () => Promise.resolve().then(() => callback(msg)),
- catch: (cause) => {
- log.error("subscriber failed", { type, cause })
- },
- }).pipe(Effect.ignore),
- ),
- Effect.forkScoped,
- ),
- )
-
- return () => {
- log.info("unsubscribing", { type })
- bridge.fork(Scope.close(scope, Exit.void))
- }
- })
- }
-
- const subscribeCallback = Effect.fn("Bus.subscribeCallback")(function* <D extends BusEvent.Definition>(
- def: D,
- callback: (event: Payload<D>) => unknown,
- ) {
- const s = yield* InstanceState.get(state)
- const ps = yield* getOrCreate(s, def)
- return yield* on(ps, def.type, callback)
- })
-
- const subscribeAllCallback = Effect.fn("Bus.subscribeAllCallback")(function* (callback: (event: any) => unknown) {
- const s = yield* InstanceState.get(state)
- return yield* on(s.wildcard, "*", callback)
- })
-
- return Service.of({ publish, subscribe, subscribeAll, subscribeCallback, subscribeAllCallback })
- }),
-)
-
-export const defaultLayer = layer
-
-const { runPromise, runSync } = makeRuntime(Service, layer)
-
-// runSync is safe here because the subscribe chain (InstanceState.get, PubSub.subscribe,
-// Scope.make, Effect.forkScoped) is entirely synchronous. If any step becomes async, this will throw.
-export async function publish<D extends BusEvent.Definition>(def: D, properties: z.output<D["properties"]>) {
- return runPromise((svc) => svc.publish(def, properties))
-}
-
-export function subscribe<D extends BusEvent.Definition>(
- def: D,
- callback: (event: { type: D["type"]; properties: z.infer<D["properties"]> }) => unknown,
-) {
- return runSync((svc) => svc.subscribeCallback(def, callback))
-}
-
-export function subscribeAll(callback: (event: any) => unknown) {
- return runSync((svc) => svc.subscribeAllCallback(callback))
-}
diff --git a/packages/opencode/src/bus/index.ts b/packages/opencode/src/bus/index.ts
index 3c21d7c7d..8a9579b59 100644
--- a/packages/opencode/src/bus/index.ts
+++ b/packages/opencode/src/bus/index.ts
@@ -1 +1,193 @@
-export * as Bus from "./bus"
+import z from "zod"
+import { Effect, Exit, Layer, PubSub, Scope, Context, Stream } from "effect"
+import { EffectBridge } from "@/effect"
+import { Log } from "../util"
+import { BusEvent } from "./bus-event"
+import { GlobalBus } from "./global"
+import { InstanceState } from "@/effect"
+import { makeRuntime } from "@/effect/run-service"
+
+const log = Log.create({ service: "bus" })
+
+export const InstanceDisposed = BusEvent.define(
+ "server.instance.disposed",
+ z.object({
+ directory: z.string(),
+ }),
+)
+
+type Payload<D extends BusEvent.Definition = BusEvent.Definition> = {
+ type: D["type"]
+ properties: z.infer<D["properties"]>
+}
+
+type State = {
+ wildcard: PubSub.PubSub<Payload>
+ typed: Map<string, PubSub.PubSub<Payload>>
+}
+
+export interface Interface {
+ readonly publish: <D extends BusEvent.Definition>(
+ def: D,
+ properties: z.output<D["properties"]>,
+ ) => Effect.Effect<void>
+ readonly subscribe: <D extends BusEvent.Definition>(def: D) => Stream.Stream<Payload<D>>
+ readonly subscribeAll: () => Stream.Stream<Payload>
+ readonly subscribeCallback: <D extends BusEvent.Definition>(
+ def: D,
+ callback: (event: Payload<D>) => unknown,
+ ) => Effect.Effect<() => void>
+ readonly subscribeAllCallback: (callback: (event: any) => unknown) => Effect.Effect<() => void>
+}
+
+export class Service extends Context.Service<Service, Interface>()("@opencode/Bus") {}
+
+export const layer = Layer.effect(
+ Service,
+ Effect.gen(function* () {
+ const state = yield* InstanceState.make<State>(
+ Effect.fn("Bus.state")(function* (ctx) {
+ const wildcard = yield* PubSub.unbounded<Payload>()
+ const typed = new Map<string, PubSub.PubSub<Payload>>()
+
+ yield* Effect.addFinalizer(() =>
+ Effect.gen(function* () {
+ // Publish InstanceDisposed before shutting down so subscribers see it
+ yield* PubSub.publish(wildcard, {
+ type: InstanceDisposed.type,
+ properties: { directory: ctx.directory },
+ })
+ yield* PubSub.shutdown(wildcard)
+ for (const ps of typed.values()) {
+ yield* PubSub.shutdown(ps)
+ }
+ }),
+ )
+
+ return { wildcard, typed }
+ }),
+ )
+
+ function getOrCreate<D extends BusEvent.Definition>(state: State, def: D) {
+ return Effect.gen(function* () {
+ let ps = state.typed.get(def.type)
+ if (!ps) {
+ ps = yield* PubSub.unbounded<Payload>()
+ state.typed.set(def.type, ps)
+ }
+ return ps as unknown as PubSub.PubSub<Payload<D>>
+ })
+ }
+
+ function publish<D extends BusEvent.Definition>(def: D, properties: z.output<D["properties"]>) {
+ return Effect.gen(function* () {
+ const s = yield* InstanceState.get(state)
+ const payload: Payload = { type: def.type, properties }
+ log.info("publishing", { type: def.type })
+
+ const ps = s.typed.get(def.type)
+ if (ps) yield* PubSub.publish(ps, payload)
+ yield* PubSub.publish(s.wildcard, payload)
+
+ const dir = yield* InstanceState.directory
+ const context = yield* InstanceState.context
+ const workspace = yield* InstanceState.workspaceID
+
+ GlobalBus.emit("event", {
+ directory: dir,
+ project: context.project.id,
+ workspace,
+ payload,
+ })
+ })
+ }
+
+ function subscribe<D extends BusEvent.Definition>(def: D): Stream.Stream<Payload<D>> {
+ log.info("subscribing", { type: def.type })
+ return Stream.unwrap(
+ Effect.gen(function* () {
+ const s = yield* InstanceState.get(state)
+ const ps = yield* getOrCreate(s, def)
+ return Stream.fromPubSub(ps)
+ }),
+ ).pipe(Stream.ensuring(Effect.sync(() => log.info("unsubscribing", { type: def.type }))))
+ }
+
+ function subscribeAll(): Stream.Stream<Payload> {
+ log.info("subscribing", { type: "*" })
+ return Stream.unwrap(
+ Effect.gen(function* () {
+ const s = yield* InstanceState.get(state)
+ return Stream.fromPubSub(s.wildcard)
+ }),
+ ).pipe(Stream.ensuring(Effect.sync(() => log.info("unsubscribing", { type: "*" }))))
+ }
+
+ function on<T>(pubsub: PubSub.PubSub<T>, type: string, callback: (event: T) => unknown) {
+ return Effect.gen(function* () {
+ log.info("subscribing", { type })
+ const bridge = yield* EffectBridge.make()
+ const scope = yield* Scope.make()
+ const subscription = yield* Scope.provide(scope)(PubSub.subscribe(pubsub))
+
+ yield* Scope.provide(scope)(
+ Stream.fromSubscription(subscription).pipe(
+ Stream.runForEach((msg) =>
+ Effect.tryPromise({
+ try: () => Promise.resolve().then(() => callback(msg)),
+ catch: (cause) => {
+ log.error("subscriber failed", { type, cause })
+ },
+ }).pipe(Effect.ignore),
+ ),
+ Effect.forkScoped,
+ ),
+ )
+
+ return () => {
+ log.info("unsubscribing", { type })
+ bridge.fork(Scope.close(scope, Exit.void))
+ }
+ })
+ }
+
+ const subscribeCallback = Effect.fn("Bus.subscribeCallback")(function* <D extends BusEvent.Definition>(
+ def: D,
+ callback: (event: Payload<D>) => unknown,
+ ) {
+ const s = yield* InstanceState.get(state)
+ const ps = yield* getOrCreate(s, def)
+ return yield* on(ps, def.type, callback)
+ })
+
+ const subscribeAllCallback = Effect.fn("Bus.subscribeAllCallback")(function* (callback: (event: any) => unknown) {
+ const s = yield* InstanceState.get(state)
+ return yield* on(s.wildcard, "*", callback)
+ })
+
+ return Service.of({ publish, subscribe, subscribeAll, subscribeCallback, subscribeAllCallback })
+ }),
+)
+
+export const defaultLayer = layer
+
+const { runPromise, runSync } = makeRuntime(Service, layer)
+
+// runSync is safe here because the subscribe chain (InstanceState.get, PubSub.subscribe,
+// Scope.make, Effect.forkScoped) is entirely synchronous. If any step becomes async, this will throw.
+export async function publish<D extends BusEvent.Definition>(def: D, properties: z.output<D["properties"]>) {
+ return runPromise((svc) => svc.publish(def, properties))
+}
+
+export function subscribe<D extends BusEvent.Definition>(
+ def: D,
+ callback: (event: { type: D["type"]; properties: z.infer<D["properties"]> }) => unknown,
+) {
+ return runSync((svc) => svc.subscribeCallback(def, callback))
+}
+
+export function subscribeAll(callback: (event: any) => unknown) {
+ return runSync((svc) => svc.subscribeAllCallback(callback))
+}
+
+export * as Bus from "."