summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorKit Langton <[email protected]>2026-03-25 20:19:24 -0400
committerGitHub <[email protected]>2026-03-25 20:19:24 -0400
commit97c15a087d34f40f4cc09c5c347fbc49b7c7af38 (patch)
treef8ec29b63c7c5dfa37ab44ddc4f39a2367c3cc3c
parentb90de755f9b9aa334077f771c67ad7c454896925 (diff)
downloadopencode-97c15a087d34f40f4cc09c5c347fbc49b7c7af38.tar.gz
opencode-97c15a087d34f40f4cc09c5c347fbc49b7c7af38.zip
effectify Bus service: migrate to Effect PubSub + InstanceState (#18579)
-rw-r--r--packages/opencode/AGENTS.md14
-rw-r--r--packages/opencode/specs/effect-migration.md49
-rw-r--r--packages/opencode/src/account/index.ts4
-rw-r--r--packages/opencode/src/agent/agent.ts4
-rw-r--r--packages/opencode/src/auth/index.ts4
-rw-r--r--packages/opencode/src/bus/index.ts229
-rw-r--r--packages/opencode/src/command/index.ts4
-rw-r--r--packages/opencode/src/effect/instance-context.ts14
-rw-r--r--packages/opencode/src/effect/run-service.ts12
-rw-r--r--packages/opencode/src/file/index.ts4
-rw-r--r--packages/opencode/src/file/time.ts4
-rw-r--r--packages/opencode/src/file/watcher.ts4
-rw-r--r--packages/opencode/src/format/index.ts95
-rw-r--r--packages/opencode/src/git/index.ts4
-rw-r--r--packages/opencode/src/installation/index.ts4
-rw-r--r--packages/opencode/src/permission/index.ts4
-rw-r--r--packages/opencode/src/plugin/index.ts33
-rw-r--r--packages/opencode/src/project/project.ts4
-rw-r--r--packages/opencode/src/project/vcs.ts43
-rw-r--r--packages/opencode/src/provider/auth.ts4
-rw-r--r--packages/opencode/src/pty/index.ts4
-rw-r--r--packages/opencode/src/question/index.ts4
-rw-r--r--packages/opencode/src/session/status.ts11
-rw-r--r--packages/opencode/src/skill/index.ts4
-rw-r--r--packages/opencode/src/snapshot/index.ts4
-rw-r--r--packages/opencode/src/tool/apply_patch.ts6
-rw-r--r--packages/opencode/src/tool/edit.ts11
-rw-r--r--packages/opencode/src/tool/registry.ts4
-rw-r--r--packages/opencode/src/tool/truncate.ts4
-rw-r--r--packages/opencode/src/tool/write.ts6
-rw-r--r--packages/opencode/src/worktree/index.ts4
-rw-r--r--packages/opencode/test/bus/bus-effect.test.ts164
-rw-r--r--packages/opencode/test/bus/bus-integration.test.ts87
-rw-r--r--packages/opencode/test/bus/bus.test.ts219
-rw-r--r--packages/opencode/test/effect/run-service.test.ts8
-rw-r--r--packages/opencode/test/file/watcher.test.ts32
-rw-r--r--packages/opencode/test/fixture/fixture.ts68
-rw-r--r--packages/opencode/test/fixture/instance.ts51
-rw-r--r--packages/opencode/test/format/format.test.ts332
-rw-r--r--packages/opencode/test/project/vcs.test.ts72
-rw-r--r--packages/opencode/test/sync/index.test.ts10
-rw-r--r--packages/opencode/test/tool/edit.test.ts7
42 files changed, 1120 insertions, 529 deletions
diff --git a/packages/opencode/AGENTS.md b/packages/opencode/AGENTS.md
index e2a0c918d..3e4c309ce 100644
--- a/packages/opencode/AGENTS.md
+++ b/packages/opencode/AGENTS.md
@@ -31,12 +31,14 @@ See `specs/effect-migration.md` for the compact pattern reference and examples.
- Use `Schema.Defect` instead of `unknown` for defect-like causes.
- In `Effect.gen` / `Effect.fn`, prefer `yield* new MyError(...)` over `yield* Effect.fail(new MyError(...))` for direct early-failure branches.
-## Runtime vs Instances
+## Runtime vs InstanceState
-- Use the shared runtime for process-wide services with one lifecycle for the whole app.
-- Use `src/effect/instances.ts` for per-directory or per-project services that need `InstanceContext`, per-instance state, or per-instance cleanup.
-- If two open directories should not share one copy of the service, it belongs in `Instances`.
-- Instance-scoped services should read context from `InstanceContext`, not `Instance.*` globals.
+- Use `makeRuntime` (from `src/effect/run-service.ts`) for all services. It returns `{ runPromise, runFork, runCallback }` backed by a shared `memoMap` that deduplicates layers.
+- Use `InstanceState` (from `src/effect/instance-state.ts`) for per-directory or per-project state that needs per-instance cleanup. It uses `ScopedCache` keyed by directory — each open project gets its own state, automatically cleaned up on disposal.
+- If two open directories should not share one copy of the service, it needs `InstanceState`.
+- Do the work directly in the `InstanceState.make` closure — `ScopedCache` handles run-once semantics. Don't add fibers, `ensure()` callbacks, or `started` flags on top.
+- Use `Effect.addFinalizer` or `Effect.acquireRelease` inside the `InstanceState.make` closure for cleanup (subscriptions, process teardown, etc.).
+- Use `Effect.forkScoped` inside the closure for background stream consumers — the fiber is interrupted when the instance is disposed.
## Preferred Effect services
@@ -51,7 +53,7 @@ See `specs/effect-migration.md` for the compact pattern reference and examples.
`Instance.bind(fn)` captures the current Instance AsyncLocalStorage context and restores it synchronously when called.
-Use it for native addon callbacks (`@parcel/watcher`, `node-pty`, native `fs.watch`, etc.) that need to call `Bus.publish`, `Instance.state()`, or anything that reads `Instance.directory`.
+Use it for native addon callbacks (`@parcel/watcher`, `node-pty`, native `fs.watch`, etc.) that need to call `Bus.publish` or anything that reads `Instance.directory`.
You do not need it for `setTimeout`, `Promise.then`, `EventEmitter.on`, or Effect fibers.
diff --git a/packages/opencode/specs/effect-migration.md b/packages/opencode/specs/effect-migration.md
index d98750eac..c95d131dc 100644
--- a/packages/opencode/specs/effect-migration.md
+++ b/packages/opencode/specs/effect-migration.md
@@ -6,7 +6,7 @@ Practical reference for new and migrated Effect code in `packages/opencode`.
Use `InstanceState` (from `src/effect/instance-state.ts`) for services that need per-directory state, per-instance cleanup, or project-bound background work. InstanceState uses a `ScopedCache` keyed by directory, so each open project gets its own copy of the state that is automatically cleaned up on disposal.
-Use `makeRunPromise` (from `src/effect/run-service.ts`) to create a per-service `ManagedRuntime` that lazily initializes and shares layers via a global `memoMap`.
+Use `makeRuntime` (from `src/effect/run-service.ts`) to create a per-service `ManagedRuntime` that lazily initializes and shares layers via a global `memoMap`. Returns `{ runPromise, runFork, runCallback }`.
- Global services (no per-directory state): Account, Auth, Installation, Truncate
- Instance-scoped (per-directory state via InstanceState): File, FileTime, FileWatcher, Format, Permission, Question, Skill, Snapshot, Vcs, ProviderAuth
@@ -46,7 +46,7 @@ export namespace Foo {
export const defaultLayer = layer.pipe(Layer.provide(FooDep.layer))
// Per-service runtime (inside the namespace)
- const runPromise = makeRunPromise(Service, defaultLayer)
+ const { runPromise } = makeRuntime(Service, defaultLayer)
// Async facade functions
export async function get(id: FooID) {
@@ -79,29 +79,34 @@ See `Auth.ZodInfo` for the canonical example.
The `InstanceState.make` init callback receives a `Scope`, so you can use `Effect.acquireRelease`, `Effect.addFinalizer`, and `Effect.forkScoped` inside it. Resources acquired this way are automatically cleaned up when the instance is disposed or invalidated by `ScopedCache`. This makes it the right place for:
-- **Subscriptions**: Use `Effect.acquireRelease` to subscribe and auto-unsubscribe:
+- **Subscriptions**: Yield `Bus.Service` at the layer level, then use `Stream` + `forkScoped` inside the init closure. The fiber is automatically interrupted when the instance scope closes:
```ts
-const cache =
- yield *
- InstanceState.make<State>(
- Effect.fn("Foo.state")(function* (ctx) {
- // ... load state ...
-
- yield* Effect.acquireRelease(
- Effect.sync(() =>
- Bus.subscribeAll((event) => {
- /* handle */
- }),
- ),
- (unsub) => Effect.sync(unsub),
+const bus = yield* Bus.Service
+
+const cache = yield* InstanceState.make<State>(
+ Effect.fn("Foo.state")(function* (ctx) {
+ // ... load state ...
+
+ yield* bus
+ .subscribeAll()
+ .pipe(
+ Stream.runForEach((event) => Effect.sync(() => { /* handle */ })),
+ Effect.forkScoped,
)
- return {
- /* state */
- }
- }),
- )
+ return { /* state */ }
+ }),
+)
+```
+
+- **Resource cleanup**: Use `Effect.acquireRelease` or `Effect.addFinalizer` for resources that need teardown (native watchers, process handles, etc.):
+
+```ts
+yield* Effect.acquireRelease(
+ Effect.sync(() => nativeAddon.watch(dir)),
+ (watcher) => Effect.sync(() => watcher.close()),
+)
```
- **Background fibers**: Use `Effect.forkScoped` — the fiber is interrupted on disposal.
@@ -165,7 +170,7 @@ Still open and likely worth migrating:
- [x] `ToolRegistry`
- [ ] `Pty`
- [x] `Worktree`
-- [ ] `Bus`
+- [x] `Bus`
- [x] `Command`
- [ ] `Config`
- [ ] `Session`
diff --git a/packages/opencode/src/account/index.ts b/packages/opencode/src/account/index.ts
index 0a8d3687a..82b166ef2 100644
--- a/packages/opencode/src/account/index.ts
+++ b/packages/opencode/src/account/index.ts
@@ -1,7 +1,7 @@
import { Clock, Duration, Effect, Layer, Option, Schema, SchemaGetter, ServiceMap } from "effect"
import { FetchHttpClient, HttpClient, HttpClientRequest, HttpClientResponse } from "effect/unstable/http"
-import { makeRunPromise } from "@/effect/run-service"
+import { makeRuntime } from "@/effect/run-service"
import { withTransientReadRetry } from "@/util/effect-http-client"
import { AccountRepo, type AccountRow } from "./repo"
import {
@@ -379,7 +379,7 @@ export namespace Account {
export const defaultLayer = layer.pipe(Layer.provide(AccountRepo.layer), Layer.provide(FetchHttpClient.layer))
- export const runPromise = makeRunPromise(Service, defaultLayer)
+ export const { runPromise } = makeRuntime(Service, defaultLayer)
export async function active(): Promise<Info | undefined> {
return Option.getOrUndefined(await runPromise((service) => service.active()))
diff --git a/packages/opencode/src/agent/agent.ts b/packages/opencode/src/agent/agent.ts
index 2ae18aaae..622537e3c 100644
--- a/packages/opencode/src/agent/agent.ts
+++ b/packages/opencode/src/agent/agent.ts
@@ -21,7 +21,7 @@ import { Plugin } from "@/plugin"
import { Skill } from "../skill"
import { Effect, ServiceMap, Layer } from "effect"
import { InstanceState } from "@/effect/instance-state"
-import { makeRunPromise } from "@/effect/run-service"
+import { makeRuntime } from "@/effect/run-service"
export namespace Agent {
export const Info = z
@@ -393,7 +393,7 @@ export namespace Agent {
export const defaultLayer = layer.pipe(Layer.provide(Auth.layer))
- const runPromise = makeRunPromise(Service, defaultLayer)
+ const { runPromise } = makeRuntime(Service, defaultLayer)
export async function get(agent: string) {
return runPromise((svc) => svc.get(agent))
diff --git a/packages/opencode/src/auth/index.ts b/packages/opencode/src/auth/index.ts
index 2238d57f5..2ccc1edff 100644
--- a/packages/opencode/src/auth/index.ts
+++ b/packages/opencode/src/auth/index.ts
@@ -1,6 +1,6 @@
import path from "path"
import { Effect, Layer, Record, Result, Schema, ServiceMap } from "effect"
-import { makeRunPromise } from "@/effect/run-service"
+import { makeRuntime } from "@/effect/run-service"
import { zod } from "@/util/effect-zod"
import { Global } from "../global"
import { Filesystem } from "../util/filesystem"
@@ -95,7 +95,7 @@ export namespace Auth {
}),
)
- const runPromise = makeRunPromise(Service, layer)
+ const { runPromise } = makeRuntime(Service, layer)
export async function get(providerID: string) {
return runPromise((service) => service.get(providerID))
diff --git a/packages/opencode/src/bus/index.ts b/packages/opencode/src/bus/index.ts
index 625f29662..db6327c82 100644
--- a/packages/opencode/src/bus/index.ts
+++ b/packages/opencode/src/bus/index.ts
@@ -1,12 +1,14 @@
import z from "zod"
+import { Effect, Exit, Layer, PubSub, Scope, ServiceMap, Stream } from "effect"
import { Log } from "../util/log"
import { Instance } from "../project/instance"
import { BusEvent } from "./bus-event"
import { GlobalBus } from "./global"
+import { InstanceState } from "@/effect/instance-state"
+import { makeRuntime } from "@/effect/run-service"
export namespace Bus {
const log = Log.create({ service: "bus" })
- type Subscription = (event: any) => void
export const InstanceDisposed = BusEvent.define(
"server.instance.disposed",
@@ -15,91 +17,168 @@ export namespace Bus {
}),
)
- const state = Instance.state(
- () => {
- const subscriptions = new Map<any, Subscription[]>()
+ type Payload<D extends BusEvent.Definition = BusEvent.Definition> = {
+ type: D["type"]
+ properties: z.infer<D["properties"]>
+ }
+
+ type State = {
+ wildcard: PubSub.PubSub<Payload>
+ typed: Map<string, PubSub.PubSub<Payload>>
+ }
+
+ export interface Interface {
+ readonly publish: <D extends BusEvent.Definition>(
+ def: D,
+ properties: z.output<D["properties"]>,
+ ) => Effect.Effect<void>
+ readonly subscribe: <D extends BusEvent.Definition>(def: D) => Stream.Stream<Payload<D>>
+ readonly subscribeAll: () => Stream.Stream<Payload>
+ readonly subscribeCallback: <D extends BusEvent.Definition>(
+ def: D,
+ callback: (event: Payload<D>) => unknown,
+ ) => Effect.Effect<() => void>
+ readonly subscribeAllCallback: (callback: (event: any) => unknown) => Effect.Effect<() => void>
+ }
+
+ export class Service extends ServiceMap.Service<Service, Interface>()("@opencode/Bus") {}
+
+ export const layer = Layer.effect(
+ Service,
+ Effect.gen(function* () {
+ const cache = yield* InstanceState.make<State>(
+ Effect.fn("Bus.state")(function* (ctx) {
+ const wildcard = yield* PubSub.unbounded<Payload>()
+ const typed = new Map<string, PubSub.PubSub<Payload>>()
+
+ yield* Effect.addFinalizer(() =>
+ Effect.gen(function* () {
+ // Publish InstanceDisposed before shutting down so subscribers see it
+ yield* PubSub.publish(wildcard, {
+ type: InstanceDisposed.type,
+ properties: { directory: ctx.directory },
+ })
+ yield* PubSub.shutdown(wildcard)
+ for (const ps of typed.values()) {
+ yield* PubSub.shutdown(ps)
+ }
+ }),
+ )
- return {
- subscriptions,
+ return { wildcard, typed }
+ }),
+ )
+
+ function getOrCreate<D extends BusEvent.Definition>(state: State, def: D) {
+ return Effect.gen(function* () {
+ let ps = state.typed.get(def.type)
+ if (!ps) {
+ ps = yield* PubSub.unbounded<Payload>()
+ state.typed.set(def.type, ps)
+ }
+ return ps as unknown as PubSub.PubSub<Payload<D>>
+ })
}
- },
- async (entry) => {
- const wildcard = entry.subscriptions.get("*")
- if (!wildcard) return
- const event = {
- type: InstanceDisposed.type,
- properties: {
- directory: Instance.directory,
- },
+
+ function publish<D extends BusEvent.Definition>(def: D, properties: z.output<D["properties"]>) {
+ return Effect.gen(function* () {
+ const state = yield* InstanceState.get(cache)
+ const payload: Payload = { type: def.type, properties }
+ log.info("publishing", { type: def.type })
+
+ const ps = state.typed.get(def.type)
+ if (ps) yield* PubSub.publish(ps, payload)
+ yield* PubSub.publish(state.wildcard, payload)
+
+ GlobalBus.emit("event", {
+ directory: Instance.directory,
+ payload,
+ })
+ })
}
- for (const sub of [...wildcard]) {
- sub(event)
+
+ function subscribe<D extends BusEvent.Definition>(def: D): Stream.Stream<Payload<D>> {
+ log.info("subscribing", { type: def.type })
+ return Stream.unwrap(
+ Effect.gen(function* () {
+ const state = yield* InstanceState.get(cache)
+ const ps = yield* getOrCreate(state, def)
+ return Stream.fromPubSub(ps)
+ }),
+ ).pipe(Stream.ensuring(Effect.sync(() => log.info("unsubscribing", { type: def.type }))))
}
- },
- )
- export async function publish<Definition extends BusEvent.Definition>(
- def: Definition,
- properties: z.output<Definition["properties"]>,
- ) {
- const payload = {
- type: def.type,
- properties,
- }
- log.info("publishing", {
- type: def.type,
- })
- const pending = []
- for (const key of [def.type, "*"]) {
- const match = [...(state().subscriptions.get(key) ?? [])]
- for (const sub of match) {
- pending.push(sub(payload))
+ function subscribeAll(): Stream.Stream<Payload> {
+ log.info("subscribing", { type: "*" })
+ return Stream.unwrap(
+ Effect.gen(function* () {
+ const state = yield* InstanceState.get(cache)
+ return Stream.fromPubSub(state.wildcard)
+ }),
+ ).pipe(Stream.ensuring(Effect.sync(() => log.info("unsubscribing", { type: "*" }))))
}
- }
- GlobalBus.emit("event", {
- directory: Instance.directory,
- payload,
- })
- return Promise.all(pending)
- }
- export function subscribe<Definition extends BusEvent.Definition>(
- def: Definition,
- callback: (event: { type: Definition["type"]; properties: z.infer<Definition["properties"]> }) => void,
- ) {
- return raw(def.type, callback)
- }
+ function on<T>(pubsub: PubSub.PubSub<T>, type: string, callback: (event: T) => unknown) {
+ return Effect.gen(function* () {
+ log.info("subscribing", { type })
+ const scope = yield* Scope.make()
+ const subscription = yield* Scope.provide(scope)(PubSub.subscribe(pubsub))
- export function once<Definition extends BusEvent.Definition>(
- def: Definition,
- callback: (event: {
- type: Definition["type"]
- properties: z.infer<Definition["properties"]>
- }) => "done" | undefined,
- ) {
- const unsub = subscribe(def, (event) => {
- if (callback(event)) unsub()
- })
+ yield* Scope.provide(scope)(
+ Stream.fromSubscription(subscription).pipe(
+ Stream.runForEach((msg) =>
+ Effect.tryPromise({
+ try: () => Promise.resolve().then(() => callback(msg)),
+ catch: (cause) => {
+ log.error("subscriber failed", { type, cause })
+ },
+ }).pipe(Effect.ignore),
+ ),
+ Effect.forkScoped,
+ ),
+ )
+
+ return () => {
+ log.info("unsubscribing", { type })
+ Effect.runFork(Scope.close(scope, Exit.void))
+ }
+ })
+ }
+
+ const subscribeCallback = Effect.fn("Bus.subscribeCallback")(function* <D extends BusEvent.Definition>(
+ def: D,
+ callback: (event: Payload<D>) => unknown,
+ ) {
+ const state = yield* InstanceState.get(cache)
+ const ps = yield* getOrCreate(state, def)
+ return yield* on(ps, def.type, callback)
+ })
+
+ const subscribeAllCallback = Effect.fn("Bus.subscribeAllCallback")(function* (callback: (event: any) => unknown) {
+ const state = yield* InstanceState.get(cache)
+ return yield* on(state.wildcard, "*", callback)
+ })
+
+ return Service.of({ publish, subscribe, subscribeAll, subscribeCallback, subscribeAllCallback })
+ }),
+ )
+
+ const { runPromise, runSync } = makeRuntime(Service, layer)
+
+ // runSync is safe here because the subscribe chain (InstanceState.get, PubSub.subscribe,
+ // Scope.make, Effect.forkScoped) is entirely synchronous. If any step becomes async, this will throw.
+ export async function publish<D extends BusEvent.Definition>(def: D, properties: z.output<D["properties"]>) {
+ return runPromise((svc) => svc.publish(def, properties))
}
- export function subscribeAll(callback: (event: any) => void) {
- return raw("*", callback)
+ export function subscribe<D extends BusEvent.Definition>(
+ def: D,
+ callback: (event: { type: D["type"]; properties: z.infer<D["properties"]> }) => unknown,
+ ) {
+ return runSync((svc) => svc.subscribeCallback(def, callback))
}
- function raw(type: string, callback: (event: any) => void) {
- log.info("subscribing", { type })
- const subscriptions = state().subscriptions
- let match = subscriptions.get(type) ?? []
- match.push(callback)
- subscriptions.set(type, match)
-
- return () => {
- log.info("unsubscribing", { type })
- const match = subscriptions.get(type)
- if (!match) return
- const index = match.indexOf(callback)
- if (index === -1) return
- match.splice(index, 1)
- }
+ export function subscribeAll(callback: (event: any) => unknown) {
+ return runSync((svc) => svc.subscribeAllCallback(callback))
}
}
diff --git a/packages/opencode/src/command/index.ts b/packages/opencode/src/command/index.ts
index ff9382610..a2407982a 100644
--- a/packages/opencode/src/command/index.ts
+++ b/packages/opencode/src/command/index.ts
@@ -1,6 +1,6 @@
import { BusEvent } from "@/bus/bus-event"
import { InstanceState } from "@/effect/instance-state"
-import { makeRunPromise } from "@/effect/run-service"
+import { makeRuntime } from "@/effect/run-service"
import { SessionID, MessageID } from "@/session/schema"
import { Effect, Layer, ServiceMap } from "effect"
import z from "zod"
@@ -173,7 +173,7 @@ export namespace Command {
}),
)
- const runPromise = makeRunPromise(Service, layer)
+ const { runPromise } = makeRuntime(Service, layer)
export async function get(name: string) {
return runPromise((svc) => svc.get(name))
diff --git a/packages/opencode/src/effect/instance-context.ts b/packages/opencode/src/effect/instance-context.ts
deleted file mode 100644
index fd4590190..000000000
--- a/packages/opencode/src/effect/instance-context.ts
+++ /dev/null
@@ -1,14 +0,0 @@
-import { ServiceMap } from "effect"
-import type { Project } from "@/project/project"
-
-export declare namespace InstanceContext {
- export interface Shape {
- readonly directory: string
- readonly worktree: string
- readonly project: Project.Info
- }
-}
-
-export class InstanceContext extends ServiceMap.Service<InstanceContext, InstanceContext.Shape>()(
- "opencode/InstanceContext",
-) {}
diff --git a/packages/opencode/src/effect/run-service.ts b/packages/opencode/src/effect/run-service.ts
index 226c276ea..76248ca88 100644
--- a/packages/opencode/src/effect/run-service.ts
+++ b/packages/opencode/src/effect/run-service.ts
@@ -3,11 +3,15 @@ import * as ServiceMap from "effect/ServiceMap"
export const memoMap = Layer.makeMemoMapUnsafe()
-export function makeRunPromise<I, S, E>(service: ServiceMap.Service<I, S>, layer: Layer.Layer<I, E>) {
+export function makeRuntime<I, S, E>(service: ServiceMap.Service<I, S>, layer: Layer.Layer<I, E>) {
let rt: ManagedRuntime.ManagedRuntime<I, E> | undefined
+ const getRuntime = () => (rt ??= ManagedRuntime.make(layer, { memoMap }))
- return <A, Err>(fn: (svc: S) => Effect.Effect<A, Err, I>, options?: Effect.RunOptions) => {
- rt ??= ManagedRuntime.make(layer, { memoMap })
- return rt.runPromise(service.use(fn), options)
+ return {
+ runSync: <A, Err>(fn: (svc: S) => Effect.Effect<A, Err, I>) => getRuntime().runSync(service.use(fn)),
+ runPromise: <A, Err>(fn: (svc: S) => Effect.Effect<A, Err, I>, options?: Effect.RunOptions) =>
+ getRuntime().runPromise(service.use(fn), options),
+ runFork: <A, Err>(fn: (svc: S) => Effect.Effect<A, Err, I>) => getRuntime().runFork(service.use(fn)),
+ runCallback: <A, Err>(fn: (svc: S) => Effect.Effect<A, Err, I>) => getRuntime().runCallback(service.use(fn)),
}
}
diff --git a/packages/opencode/src/file/index.ts b/packages/opencode/src/file/index.ts
index 7dc36e9c3..86f7bb0dc 100644
--- a/packages/opencode/src/file/index.ts
+++ b/packages/opencode/src/file/index.ts
@@ -1,6 +1,6 @@
import { BusEvent } from "@/bus/bus-event"
import { InstanceState } from "@/effect/instance-state"
-import { makeRunPromise } from "@/effect/run-service"
+import { makeRuntime } from "@/effect/run-service"
import { Git } from "@/git"
import { Effect, Fiber, Layer, Scope, ServiceMap } from "effect"
import { formatPatch, structuredPatch } from "diff"
@@ -688,7 +688,7 @@ export namespace File {
}),
)
- const runPromise = makeRunPromise(Service, layer)
+ const { runPromise } = makeRuntime(Service, layer)
export function init() {
return runPromise((svc) => svc.init())
diff --git a/packages/opencode/src/file/time.ts b/packages/opencode/src/file/time.ts
index 4962ef0c9..d33848000 100644
--- a/packages/opencode/src/file/time.ts
+++ b/packages/opencode/src/file/time.ts
@@ -1,6 +1,6 @@
import { DateTime, Effect, Layer, Semaphore, ServiceMap } from "effect"
import { InstanceState } from "@/effect/instance-state"
-import { makeRunPromise } from "@/effect/run-service"
+import { makeRuntime } from "@/effect/run-service"
import { Flag } from "@/flag/flag"
import type { SessionID } from "@/session/schema"
import { Filesystem } from "../util/filesystem"
@@ -108,7 +108,7 @@ export namespace FileTime {
}),
).pipe(Layer.orDie)
- const runPromise = makeRunPromise(Service, layer)
+ const { runPromise } = makeRuntime(Service, layer)
export function read(sessionID: SessionID, file: string) {
return runPromise((s) => s.read(sessionID, file))
diff --git a/packages/opencode/src/file/watcher.ts b/packages/opencode/src/file/watcher.ts
index ba7079143..42ece3582 100644
--- a/packages/opencode/src/file/watcher.ts
+++ b/packages/opencode/src/file/watcher.ts
@@ -8,7 +8,7 @@ import z from "zod"
import { Bus } from "@/bus"
import { BusEvent } from "@/bus/bus-event"
import { InstanceState } from "@/effect/instance-state"
-import { makeRunPromise } from "@/effect/run-service"
+import { makeRuntime } from "@/effect/run-service"
import { Flag } from "@/flag/flag"
import { Git } from "@/git"
import { Instance } from "@/project/instance"
@@ -159,7 +159,7 @@ export namespace FileWatcher {
}),
)
- const runPromise = makeRunPromise(Service, layer)
+ const { runPromise } = makeRuntime(Service, layer)
export function init() {
return runPromise((svc) => svc.init())
diff --git a/packages/opencode/src/format/index.ts b/packages/opencode/src/format/index.ts
index 39e0630cf..316ea5ba5 100644
--- a/packages/opencode/src/format/index.ts
+++ b/packages/opencode/src/format/index.ts
@@ -1,12 +1,10 @@
import { Effect, Layer, ServiceMap } from "effect"
import { InstanceState } from "@/effect/instance-state"
-import { makeRunPromise } from "@/effect/run-service"
+import { makeRuntime } from "@/effect/run-service"
import path from "path"
import { mergeDeep } from "remeda"
import z from "zod"
-import { Bus } from "../bus"
import { Config } from "../config/config"
-import { File } from "../file"
import { Instance } from "../project/instance"
import { Process } from "../util/process"
import { Log } from "../util/log"
@@ -29,6 +27,7 @@ export namespace Format {
export interface Interface {
readonly init: () => Effect.Effect<void>
readonly status: () => Effect.Effect<Status[]>
+ readonly file: (filepath: string) => Effect.Effect<void>
}
export class Service extends ServiceMap.Service<Service, Interface>()("@opencode/Format") {}
@@ -97,53 +96,46 @@ export namespace Format {
return checks.filter((x) => x.enabled).map((x) => x.item)
}
- yield* Effect.acquireRelease(
- Effect.sync(() =>
- Bus.subscribe(
- File.Event.Edited,
- Instance.bind(async (payload) => {
- const file = payload.properties.file
- log.info("formatting", { file })
- const ext = path.extname(file)
-
- for (const item of await getFormatter(ext)) {
- log.info("running", { command: item.command })
- try {
- const proc = Process.spawn(
- item.command.map((x) => x.replace("$FILE", file)),
- {
- cwd: Instance.directory,
- env: { ...process.env, ...item.environment },
- stdout: "ignore",
- stderr: "ignore",
- },
- )
- const exit = await proc.exited
- if (exit !== 0) {
- log.error("failed", {
- command: item.command,
- ...item.environment,
- })
- }
- } catch (error) {
- log.error("failed to format file", {
- error,
- command: item.command,
- ...item.environment,
- file,
- })
- }
- }
- }),
- ),
- ),
- (unsubscribe) => Effect.sync(unsubscribe),
- )
+ async function formatFile(filepath: string) {
+ log.info("formatting", { file: filepath })
+ const ext = path.extname(filepath)
+
+ for (const item of await getFormatter(ext)) {
+ log.info("running", { command: item.command })
+ try {
+ const proc = Process.spawn(
+ item.command.map((x) => x.replace("$FILE", filepath)),
+ {
+ cwd: Instance.directory,
+ env: { ...process.env, ...item.environment },
+ stdout: "ignore",
+ stderr: "ignore",
+ },
+ )
+ const exit = await proc.exited
+ if (exit !== 0) {
+ log.error("failed", {
+ command: item.command,
+ ...item.environment,
+ })
+ }
+ } catch (error) {
+ log.error("failed to format file", {
+ error,
+ command: item.command,
+ ...item.environment,
+ file: filepath,
+ })
+ }
+ }
+ }
+
log.info("init")
return {
formatters,
isEnabled,
+ formatFile,
}
}),
)
@@ -166,11 +158,16 @@ export namespace Format {
return result
})
- return Service.of({ init, status })
+ const file = Effect.fn("Format.file")(function* (filepath: string) {
+ const { formatFile } = yield* InstanceState.get(state)
+ yield* Effect.promise(() => formatFile(filepath))
+ })
+
+ return Service.of({ init, status, file })
}),
)
- const runPromise = makeRunPromise(Service, layer)
+ const { runPromise } = makeRuntime(Service, layer)
export async function init() {
return runPromise((s) => s.init())
@@ -179,4 +176,8 @@ export namespace Format {
export async function status() {
return runPromise((s) => s.status())
}
+
+ export async function file(filepath: string) {
+ return runPromise((s) => s.file(filepath))
+ }
}
diff --git a/packages/opencode/src/git/index.ts b/packages/opencode/src/git/index.ts
index 1442b8cb6..521643e9f 100644
--- a/packages/opencode/src/git/index.ts
+++ b/packages/opencode/src/git/index.ts
@@ -2,7 +2,7 @@ import { NodeFileSystem, NodePath } from "@effect/platform-node"
import * as CrossSpawnSpawner from "@/effect/cross-spawn-spawner"
import { Effect, Layer, ServiceMap, Stream } from "effect"
import { ChildProcess, ChildProcessSpawner } from "effect/unstable/process"
-import { makeRunPromise } from "@/effect/run-service"
+import { makeRuntime } from "@/effect/run-service"
export namespace Git {
const cfg = [
@@ -264,7 +264,7 @@ export namespace Git {
Layer.provide(NodePath.layer),
)
- const runPromise = makeRunPromise(Service, defaultLayer)
+ const { runPromise } = makeRuntime(Service, defaultLayer)
export function run(args: string[], opts: Options) {
return runPromise((git) => git.run(args, opts))
diff --git a/packages/opencode/src/installation/index.ts b/packages/opencode/src/installation/index.ts
index 912951a0b..76f3d0c9e 100644
--- a/packages/opencode/src/installation/index.ts
+++ b/packages/opencode/src/installation/index.ts
@@ -2,7 +2,7 @@ import { NodeFileSystem, NodePath } from "@effect/platform-node"
import { Effect, Layer, Schema, ServiceMap, Stream } from "effect"
import { FetchHttpClient, HttpClient, HttpClientRequest, HttpClientResponse } from "effect/unstable/http"
import * as CrossSpawnSpawner from "@/effect/cross-spawn-spawner"
-import { makeRunPromise } from "@/effect/run-service"
+import { makeRuntime } from "@/effect/run-service"
import { withTransientReadRetry } from "@/util/effect-http-client"
import { ChildProcess, ChildProcessSpawner } from "effect/unstable/process"
import path from "path"
@@ -346,7 +346,7 @@ export namespace Installation {
Layer.provide(NodePath.layer),
)
- const runPromise = makeRunPromise(Service, defaultLayer)
+ const { runPromise } = makeRuntime(Service, defaultLayer)
export async function info(): Promise<Info> {
return runPromise((svc) => svc.info())
diff --git a/packages/opencode/src/permission/index.ts b/packages/opencode/src/permission/index.ts
index 63e657018..1a7bd2c61 100644
--- a/packages/opencode/src/permission/index.ts
+++ b/packages/opencode/src/permission/index.ts
@@ -2,7 +2,7 @@ import { Bus } from "@/bus"
import { BusEvent } from "@/bus/bus-event"
import { Config } from "@/config/config"
import { InstanceState } from "@/effect/instance-state"
-import { makeRunPromise } from "@/effect/run-service"
+import { makeRuntime } from "@/effect/run-service"
import { ProjectID } from "@/project/schema"
import { Instance } from "@/project/instance"
import { MessageID, SessionID } from "@/session/schema"
@@ -306,7 +306,7 @@ export namespace Permission {
return result
}
- export const runPromise = makeRunPromise(Service, layer)
+ export const { runPromise } = makeRuntime(Service, layer)
export async function ask(input: z.infer<typeof AskInput>) {
return runPromise((s) => s.ask(input))
diff --git a/packages/opencode/src/plugin/index.ts b/packages/opencode/src/plugin/index.ts
index 09e991c5a..cd4b91799 100644
--- a/packages/opencode/src/plugin/index.ts
+++ b/packages/opencode/src/plugin/index.ts
@@ -11,9 +11,9 @@ import { NamedError } from "@opencode-ai/util/error"
import { CopilotAuthPlugin } from "./copilot"
import { gitlabAuthPlugin as GitlabAuthPlugin } from "opencode-gitlab-auth"
import { PoeAuthPlugin } from "opencode-poe-auth"
-import { Effect, Layer, ServiceMap } from "effect"
+import { Effect, Layer, ServiceMap, Stream } from "effect"
import { InstanceState } from "@/effect/instance-state"
-import { makeRunPromise } from "@/effect/run-service"
+import { makeRuntime } from "@/effect/run-service"
export namespace Plugin {
const log = Log.create({ service: "plugin" })
@@ -52,6 +52,8 @@ export namespace Plugin {
export const layer = Layer.effect(
Service,
Effect.gen(function* () {
+ const bus = yield* Bus.Service
+
const cache = yield* InstanceState.make<State>(
Effect.fn("Plugin.state")(function* (ctx) {
const hooks: Hooks[] = []
@@ -146,17 +148,19 @@ export namespace Plugin {
}
})
- // Subscribe to bus events, clean up when scope is closed
- yield* Effect.acquireRelease(
- Effect.sync(() =>
- Bus.subscribeAll(async (input) => {
- for (const hook of hooks) {
- hook["event"]?.({ event: input })
- }
- }),
- ),
- (unsub) => Effect.sync(unsub),
- )
+ // Subscribe to bus events, fiber interrupted when scope closes
+ yield* bus
+ .subscribeAll()
+ .pipe(
+ Stream.runForEach((input) =>
+ Effect.sync(() => {
+ for (const hook of hooks) {
+ hook["event"]?.({ event: input as any })
+ }
+ }),
+ ),
+ Effect.forkScoped,
+ )
return { hooks }
}),
@@ -192,7 +196,8 @@ export namespace Plugin {
}),
)
- const runPromise = makeRunPromise(Service, layer)
+ const defaultLayer = layer.pipe(Layer.provide(Bus.layer))
+ const { runPromise } = makeRuntime(Service, defaultLayer)
export async function trigger<
Name extends TriggerName,
diff --git a/packages/opencode/src/project/project.ts b/packages/opencode/src/project/project.ts
index 256be3695..b8de639e7 100644
--- a/packages/opencode/src/project/project.ts
+++ b/packages/opencode/src/project/project.ts
@@ -11,7 +11,7 @@ import { ProjectID } from "./schema"
import { Effect, Layer, Path, Scope, ServiceMap, Stream } from "effect"
import { ChildProcess, ChildProcessSpawner } from "effect/unstable/process"
import { NodeFileSystem, NodePath } from "@effect/platform-node"
-import { makeRunPromise } from "@/effect/run-service"
+import { makeRuntime } from "@/effect/run-service"
import { AppFileSystem } from "@/filesystem"
import * as CrossSpawnSpawner from "@/effect/cross-spawn-spawner"
@@ -462,7 +462,7 @@ export namespace Project {
Layer.provide(NodeFileSystem.layer),
Layer.provide(NodePath.layer),
)
- const runPromise = makeRunPromise(Service, defaultLayer)
+ const { runPromise } = makeRuntime(Service, defaultLayer)
// ---------------------------------------------------------------------------
// Promise-based API (delegates to Effect service via runPromise)
diff --git a/packages/opencode/src/project/vcs.ts b/packages/opencode/src/project/vcs.ts
index e3243ba8e..7df9dfb6f 100644
--- a/packages/opencode/src/project/vcs.ts
+++ b/packages/opencode/src/project/vcs.ts
@@ -1,9 +1,9 @@
-import { Effect, Layer, ServiceMap } from "effect"
+import { Effect, Layer, ServiceMap, Stream } from "effect"
import path from "path"
import { Bus } from "@/bus"
import { BusEvent } from "@/bus/bus-event"
import { InstanceState } from "@/effect/instance-state"
-import { makeRunPromise } from "@/effect/run-service"
+import { makeRuntime } from "@/effect/run-service"
import { AppFileSystem } from "@/filesystem"
import { FileWatcher } from "@/file/watcher"
import { Git } from "@/git"
@@ -139,11 +139,12 @@ export namespace Vcs {
export class Service extends ServiceMap.Service<Service, Interface>()("@opencode/Vcs") {}
- export const layer: Layer.Layer<Service, never, AppFileSystem.Service | Git.Service> = Layer.effect(
+ export const layer: Layer.Layer<Service, never, AppFileSystem.Service | Git.Service | Bus.Service> = Layer.effect(
Service,
Effect.gen(function* () {
const fs = yield* AppFileSystem.Service
const git = yield* Git.Service
+ const bus = yield* Bus.Service
const state = yield* InstanceState.make<State>(
Effect.fn("Vcs.state")((ctx) =>
Effect.gen(function* () {
@@ -158,22 +159,22 @@ export namespace Vcs {
const value = { current, root }
log.info("initialized", { branch: value.current, default_branch: value.root?.name })
- yield* Effect.acquireRelease(
- Effect.sync(() =>
- Bus.subscribe(
- FileWatcher.Event.Updated,
- Instance.bind(async (evt) => {
- if (!evt.properties.file.endsWith("HEAD")) return
- const next = await get()
- if (next === value.current) return
- log.info("branch changed", { from: value.current, to: next })
- value.current = next
- Bus.publish(Event.BranchUpdated, { branch: next })
+ yield* bus
+ .subscribe(FileWatcher.Event.Updated)
+ .pipe(
+ Stream.filter((evt) => evt.properties.file.endsWith("HEAD")),
+ Stream.runForEach((_evt) =>
+ Effect.gen(function* () {
+ const next = yield* Effect.promise(() => get())
+ if (next !== value.current) {
+ log.info("branch changed", { from: value.current, to: next })
+ value.current = next
+ yield* bus.publish(Event.BranchUpdated, { branch: next })
+ }
}),
),
- ),
- (unsubscribe) => Effect.sync(unsubscribe),
- )
+ Effect.forkScoped,
+ )
return value
}),
@@ -212,9 +213,13 @@ export namespace Vcs {
}),
)
- export const defaultLayer = layer.pipe(Layer.provide(Git.defaultLayer), Layer.provide(AppFileSystem.defaultLayer))
+ export const defaultLayer = layer.pipe(
+ Layer.provide(Git.defaultLayer),
+ Layer.provide(AppFileSystem.defaultLayer),
+ Layer.provide(Bus.layer),
+ )
- const runPromise = makeRunPromise(Service, defaultLayer)
+ const { runPromise } = makeRuntime(Service, defaultLayer)
export function init() {
return runPromise((svc) => svc.init())
diff --git a/packages/opencode/src/provider/auth.ts b/packages/opencode/src/provider/auth.ts
index 99184c48a..759f8803a 100644
--- a/packages/opencode/src/provider/auth.ts
+++ b/packages/opencode/src/provider/auth.ts
@@ -2,7 +2,7 @@ import type { AuthOuathResult, Hooks } from "@opencode-ai/plugin"
import { NamedError } from "@opencode-ai/util/error"
import { Auth } from "@/auth"
import { InstanceState } from "@/effect/instance-state"
-import { makeRunPromise } from "@/effect/run-service"
+import { makeRuntime } from "@/effect/run-service"
import { Plugin } from "../plugin"
import { ProviderID } from "./schema"
import { Array as Arr, Effect, Layer, Record, Result, ServiceMap } from "effect"
@@ -231,7 +231,7 @@ export namespace ProviderAuth {
export const defaultLayer = layer.pipe(Layer.provide(Auth.layer))
- const runPromise = makeRunPromise(Service, defaultLayer)
+ const { runPromise } = makeRuntime(Service, defaultLayer)
export async function methods() {
return runPromise((svc) => svc.methods())
diff --git a/packages/opencode/src/pty/index.ts b/packages/opencode/src/pty/index.ts
index f866b18ad..1ba87126b 100644
--- a/packages/opencode/src/pty/index.ts
+++ b/packages/opencode/src/pty/index.ts
@@ -1,7 +1,7 @@
import { BusEvent } from "@/bus/bus-event"
import { Bus } from "@/bus"
import { InstanceState } from "@/effect/instance-state"
-import { makeRunPromise } from "@/effect/run-service"
+import { makeRuntime } from "@/effect/run-service"
import { Instance } from "@/project/instance"
import { type IPty } from "bun-pty"
import z from "zod"
@@ -361,7 +361,7 @@ export namespace Pty {
}),
)
- const runPromise = makeRunPromise(Service, layer)
+ const { runPromise } = makeRuntime(Service, layer)
export async function list() {
return runPromise((svc) => svc.list())
diff --git a/packages/opencode/src/question/index.ts b/packages/opencode/src/question/index.ts
index a0d62d94b..f46cdd108 100644
--- a/packages/opencode/src/question/index.ts
+++ b/packages/opencode/src/question/index.ts
@@ -2,7 +2,7 @@ import { Deferred, Effect, Layer, Schema, ServiceMap } from "effect"
import { Bus } from "@/bus"
import { BusEvent } from "@/bus/bus-event"
import { InstanceState } from "@/effect/instance-state"
-import { makeRunPromise } from "@/effect/run-service"
+import { makeRuntime } from "@/effect/run-service"
import { SessionID, MessageID } from "@/session/schema"
import { Log } from "@/util/log"
import z from "zod"
@@ -197,7 +197,7 @@ export namespace Question {
}),
)
- const runPromise = makeRunPromise(Service, layer)
+ const { runPromise } = makeRuntime(Service, layer)
export async function ask(input: {
sessionID: SessionID
diff --git a/packages/opencode/src/session/status.ts b/packages/opencode/src/session/status.ts
index 462d5ded4..34a79eed1 100644
--- a/packages/opencode/src/session/status.ts
+++ b/packages/opencode/src/session/status.ts
@@ -1,7 +1,7 @@
import { BusEvent } from "@/bus/bus-event"
import { Bus } from "@/bus"
import { InstanceState } from "@/effect/instance-state"
-import { makeRunPromise } from "@/effect/run-service"
+import { makeRuntime } from "@/effect/run-service"
import { SessionID } from "./schema"
import { Effect, Layer, ServiceMap } from "effect"
import z from "zod"
@@ -55,6 +55,8 @@ export namespace SessionStatus {
export const layer = Layer.effect(
Service,
Effect.gen(function* () {
+ const bus = yield* Bus.Service
+
const state = yield* InstanceState.make(
Effect.fn("SessionStatus.state")(() => Effect.succeed(new Map<SessionID, Info>())),
)
@@ -70,9 +72,9 @@ export namespace SessionStatus {
const set = Effect.fn("SessionStatus.set")(function* (sessionID: SessionID, status: Info) {
const data = yield* InstanceState.get(state)
- yield* Effect.promise(() => Bus.publish(Event.Status, { sessionID, status }))
+ yield* bus.publish(Event.Status, { sessionID, status })
if (status.type === "idle") {
- yield* Effect.promise(() => Bus.publish(Event.Idle, { sessionID }))
+ yield* bus.publish(Event.Idle, { sessionID })
data.delete(sessionID)
return
}
@@ -83,7 +85,8 @@ export namespace SessionStatus {
}),
)
- const runPromise = makeRunPromise(Service, layer)
+ const defaultLayer = layer.pipe(Layer.provide(Bus.layer))
+ const { runPromise } = makeRuntime(Service, defaultLayer)
export async function get(sessionID: SessionID) {
return runPromise((svc) => svc.get(sessionID))
diff --git a/packages/opencode/src/skill/index.ts b/packages/opencode/src/skill/index.ts
index 43a22219e..239549a1a 100644
--- a/packages/opencode/src/skill/index.ts
+++ b/packages/opencode/src/skill/index.ts
@@ -7,7 +7,7 @@ import { NamedError } from "@opencode-ai/util/error"
import type { Agent } from "@/agent/agent"
import { Bus } from "@/bus"
import { InstanceState } from "@/effect/instance-state"
-import { makeRunPromise } from "@/effect/run-service"
+import { makeRuntime } from "@/effect/run-service"
import { Flag } from "@/flag/flag"
import { Global } from "@/global"
import { Permission } from "@/permission"
@@ -242,7 +242,7 @@ export namespace Skill {
return ["## Available Skills", ...list.map((skill) => `- **${skill.name}**: ${skill.description}`)].join("\n")
}
- const runPromise = makeRunPromise(Service, defaultLayer)
+ const { runPromise } = makeRuntime(Service, defaultLayer)
export async function get(name: string) {
return runPromise((skill) => skill.get(name))
diff --git a/packages/opencode/src/snapshot/index.ts b/packages/opencode/src/snapshot/index.ts
index 98a9d322c..d6bdf8a3c 100644
--- a/packages/opencode/src/snapshot/index.ts
+++ b/packages/opencode/src/snapshot/index.ts
@@ -5,7 +5,7 @@ import path from "path"
import z from "zod"
import * as CrossSpawnSpawner from "@/effect/cross-spawn-spawner"
import { InstanceState } from "@/effect/instance-state"
-import { makeRunPromise } from "@/effect/run-service"
+import { makeRuntime } from "@/effect/run-service"
import { AppFileSystem } from "@/filesystem"
import { Hash } from "@/util/hash"
import { Config } from "../config/config"
@@ -459,7 +459,7 @@ export namespace Snapshot {
Layer.provide(NodePath.layer),
)
- const runPromise = makeRunPromise(Service, defaultLayer)
+ const { runPromise } = makeRuntime(Service, defaultLayer)
export async function init() {
return runPromise((svc) => svc.init())
diff --git a/packages/opencode/src/tool/apply_patch.ts b/packages/opencode/src/tool/apply_patch.ts
index 06293b6eb..c23c0dd3d 100644
--- a/packages/opencode/src/tool/apply_patch.ts
+++ b/packages/opencode/src/tool/apply_patch.ts
@@ -13,6 +13,7 @@ import { LSP } from "../lsp"
import { Filesystem } from "../util/filesystem"
import DESCRIPTION from "./apply_patch.txt"
import { File } from "../file"
+import { Format } from "../format"
const PatchParams = z.object({
patchText: z.string().describe("The full patch text that describes all changes to be made"),
@@ -220,9 +221,8 @@ export const ApplyPatchTool = Tool.define("apply_patch", {
}
if (edited) {
- await Bus.publish(File.Event.Edited, {
- file: edited,
- })
+ await Format.file(edited)
+ Bus.publish(File.Event.Edited, { file: edited })
}
}
diff --git a/packages/opencode/src/tool/edit.ts b/packages/opencode/src/tool/edit.ts
index 1a7614fc1..554d547d0 100644
--- a/packages/opencode/src/tool/edit.ts
+++ b/packages/opencode/src/tool/edit.ts
@@ -12,6 +12,7 @@ import DESCRIPTION from "./edit.txt"
import { File } from "../file"
import { FileWatcher } from "../file/watcher"
import { Bus } from "../bus"
+import { Format } from "../format"
import { FileTime } from "../file/time"
import { Filesystem } from "../util/filesystem"
import { Instance } from "../project/instance"
@@ -71,9 +72,8 @@ export const EditTool = Tool.define("edit", {
},
})
await Filesystem.write(filePath, params.newString)
- await Bus.publish(File.Event.Edited, {
- file: filePath,
- })
+ await Format.file(filePath)
+ Bus.publish(File.Event.Edited, { file: filePath })
await Bus.publish(FileWatcher.Event.Updated, {
file: filePath,
event: existed ? "change" : "add",
@@ -108,9 +108,8 @@ export const EditTool = Tool.define("edit", {
})
await Filesystem.write(filePath, contentNew)
- await Bus.publish(File.Event.Edited, {
- file: filePath,
- })
+ await Format.file(filePath)
+ Bus.publish(File.Event.Edited, { file: filePath })
await Bus.publish(FileWatcher.Event.Updated, {
file: filePath,
event: "change",
diff --git a/packages/opencode/src/tool/registry.ts b/packages/opencode/src/tool/registry.ts
index 6381fcfbc..ada761fd5 100644
--- a/packages/opencode/src/tool/registry.ts
+++ b/packages/opencode/src/tool/registry.ts
@@ -31,7 +31,7 @@ import { Glob } from "../util/glob"
import { pathToFileURL } from "url"
import { Effect, Layer, ServiceMap } from "effect"
import { InstanceState } from "@/effect/instance-state"
-import { makeRunPromise } from "@/effect/run-service"
+import { makeRuntime } from "@/effect/run-service"
export namespace ToolRegistry {
const log = Log.create({ service: "tool.registry" })
@@ -198,7 +198,7 @@ export namespace ToolRegistry {
}),
)
- const runPromise = makeRunPromise(Service, layer)
+ const { runPromise } = makeRuntime(Service, layer)
export async function register(tool: Tool.Info) {
return runPromise((svc) => svc.register(tool))
diff --git a/packages/opencode/src/tool/truncate.ts b/packages/opencode/src/tool/truncate.ts
index fa1d0a4ae..5cddacefc 100644
--- a/packages/opencode/src/tool/truncate.ts
+++ b/packages/opencode/src/tool/truncate.ts
@@ -2,7 +2,7 @@ import { NodePath } from "@effect/platform-node"
import { Cause, Duration, Effect, Layer, Schedule, ServiceMap } from "effect"
import path from "path"
import type { Agent } from "../agent/agent"
-import { makeRunPromise } from "@/effect/run-service"
+import { makeRuntime } from "@/effect/run-service"
import { AppFileSystem } from "@/filesystem"
import { evaluate } from "@/permission/evaluate"
import { Identifier } from "../id/id"
@@ -136,7 +136,7 @@ export namespace Truncate {
export const defaultLayer = layer.pipe(Layer.provide(AppFileSystem.defaultLayer), Layer.provide(NodePath.layer))
- const runPromise = makeRunPromise(Service, defaultLayer)
+ const { runPromise } = makeRuntime(Service, defaultLayer)
export async function output(text: string, options: Options = {}, agent?: Agent.Info): Promise<Result> {
return runPromise((s) => s.output(text, options, agent))
diff --git a/packages/opencode/src/tool/write.ts b/packages/opencode/src/tool/write.ts
index 83474a543..6b134e525 100644
--- a/packages/opencode/src/tool/write.ts
+++ b/packages/opencode/src/tool/write.ts
@@ -7,6 +7,7 @@ import DESCRIPTION from "./write.txt"
import { Bus } from "../bus"
import { File } from "../file"
import { FileWatcher } from "../file/watcher"
+import { Format } from "../format"
import { FileTime } from "../file/time"
import { Filesystem } from "../util/filesystem"
import { Instance } from "../project/instance"
@@ -42,9 +43,8 @@ export const WriteTool = Tool.define("write", {
})
await Filesystem.write(filepath, params.content)
- await Bus.publish(File.Event.Edited, {
- file: filepath,
- })
+ await Format.file(filepath)
+ Bus.publish(File.Event.Edited, { file: filepath })
await Bus.publish(FileWatcher.Event.Updated, {
file: filepath,
event: exists ? "change" : "add",
diff --git a/packages/opencode/src/worktree/index.ts b/packages/opencode/src/worktree/index.ts
index 0a8f0f00a..41f11ca01 100644
--- a/packages/opencode/src/worktree/index.ts
+++ b/packages/opencode/src/worktree/index.ts
@@ -15,7 +15,7 @@ import { Git } from "@/git"
import { Effect, FileSystem, Layer, Path, Scope, ServiceMap, Stream } from "effect"
import { ChildProcess, ChildProcessSpawner } from "effect/unstable/process"
import { NodeFileSystem, NodePath } from "@effect/platform-node"
-import { makeRunPromise } from "@/effect/run-service"
+import { makeRuntime } from "@/effect/run-service"
import * as CrossSpawnSpawner from "@/effect/cross-spawn-spawner"
export namespace Worktree {
@@ -576,7 +576,7 @@ export namespace Worktree {
Layer.provide(NodeFileSystem.layer),
Layer.provide(NodePath.layer),
)
- const runPromise = makeRunPromise(Service, defaultLayer)
+ const { runPromise } = makeRuntime(Service, defaultLayer)
export async function makeWorktreeInfo(name?: string) {
return runPromise((svc) => svc.makeWorktreeInfo(name))
diff --git a/packages/opencode/test/bus/bus-effect.test.ts b/packages/opencode/test/bus/bus-effect.test.ts
new file mode 100644
index 000000000..642763e90
--- /dev/null
+++ b/packages/opencode/test/bus/bus-effect.test.ts
@@ -0,0 +1,164 @@
+import { NodeChildProcessSpawner, NodeFileSystem, NodePath } from "@effect/platform-node"
+import { describe, expect } from "bun:test"
+import { Deferred, Effect, Layer, Stream } from "effect"
+import z from "zod"
+import { Bus } from "../../src/bus"
+import { BusEvent } from "../../src/bus/bus-event"
+import { Instance } from "../../src/project/instance"
+import { provideInstance, provideTmpdirInstance, tmpdirScoped } from "../fixture/fixture"
+import { testEffect } from "../lib/effect"
+
+const TestEvent = {
+ Ping: BusEvent.define("test.effect.ping", z.object({ value: z.number() })),
+ Pong: BusEvent.define("test.effect.pong", z.object({ message: z.string() })),
+}
+
+const node = NodeChildProcessSpawner.layer.pipe(
+ Layer.provideMerge(Layer.mergeAll(NodeFileSystem.layer, NodePath.layer)),
+)
+
+const live = Layer.mergeAll(Bus.layer, node)
+
+const it = testEffect(live)
+
+describe("Bus (Effect-native)", () => {
+ it.effect("publish + subscribe stream delivers events", () =>
+ provideTmpdirInstance(() =>
+ Effect.gen(function* () {
+ const bus = yield* Bus.Service
+ const received: number[] = []
+ const done = yield* Deferred.make<void>()
+
+ yield* Stream.runForEach(bus.subscribe(TestEvent.Ping), (evt) =>
+ Effect.sync(() => {
+ received.push(evt.properties.value)
+ if (received.length === 2) Deferred.doneUnsafe(done, Effect.void)
+ }),
+ ).pipe(Effect.forkScoped)
+
+ yield* Effect.sleep("10 millis")
+ yield* bus.publish(TestEvent.Ping, { value: 1 })
+ yield* bus.publish(TestEvent.Ping, { value: 2 })
+ yield* Deferred.await(done)
+
+ expect(received).toEqual([1, 2])
+ }),
+ ),
+ )
+
+ it.effect("subscribe filters by event type", () =>
+ provideTmpdirInstance(() =>
+ Effect.gen(function* () {
+ const bus = yield* Bus.Service
+ const pings: number[] = []
+ const done = yield* Deferred.make<void>()
+
+ yield* Stream.runForEach(bus.subscribe(TestEvent.Ping), (evt) =>
+ Effect.sync(() => {
+ pings.push(evt.properties.value)
+ Deferred.doneUnsafe(done, Effect.void)
+ }),
+ ).pipe(Effect.forkScoped)
+
+ yield* Effect.sleep("10 millis")
+ yield* bus.publish(TestEvent.Pong, { message: "ignored" })
+ yield* bus.publish(TestEvent.Ping, { value: 42 })
+ yield* Deferred.await(done)
+
+ expect(pings).toEqual([42])
+ }),
+ ),
+ )
+
+ it.effect("subscribeAll receives all types", () =>
+ provideTmpdirInstance(() =>
+ Effect.gen(function* () {
+ const bus = yield* Bus.Service
+ const types: string[] = []
+ const done = yield* Deferred.make<void>()
+
+ yield* Stream.runForEach(bus.subscribeAll(), (evt) =>
+ Effect.sync(() => {
+ types.push(evt.type)
+ if (types.length === 2) Deferred.doneUnsafe(done, Effect.void)
+ }),
+ ).pipe(Effect.forkScoped)
+
+ yield* Effect.sleep("10 millis")
+ yield* bus.publish(TestEvent.Ping, { value: 1 })
+ yield* bus.publish(TestEvent.Pong, { message: "hi" })
+ yield* Deferred.await(done)
+
+ expect(types).toContain("test.effect.ping")
+ expect(types).toContain("test.effect.pong")
+ }),
+ ),
+ )
+
+ it.effect("multiple subscribers each receive the event", () =>
+ provideTmpdirInstance(() =>
+ Effect.gen(function* () {
+ const bus = yield* Bus.Service
+ const a: number[] = []
+ const b: number[] = []
+ const doneA = yield* Deferred.make<void>()
+ const doneB = yield* Deferred.make<void>()
+
+ yield* Stream.runForEach(bus.subscribe(TestEvent.Ping), (evt) =>
+ Effect.sync(() => {
+ a.push(evt.properties.value)
+ Deferred.doneUnsafe(doneA, Effect.void)
+ }),
+ ).pipe(Effect.forkScoped)
+
+ yield* Stream.runForEach(bus.subscribe(TestEvent.Ping), (evt) =>
+ Effect.sync(() => {
+ b.push(evt.properties.value)
+ Deferred.doneUnsafe(doneB, Effect.void)
+ }),
+ ).pipe(Effect.forkScoped)
+
+ yield* Effect.sleep("10 millis")
+ yield* bus.publish(TestEvent.Ping, { value: 99 })
+ yield* Deferred.await(doneA)
+ yield* Deferred.await(doneB)
+
+ expect(a).toEqual([99])
+ expect(b).toEqual([99])
+ }),
+ ),
+ )
+
+ it.effect("subscribeAll stream sees InstanceDisposed on disposal", () =>
+ Effect.gen(function* () {
+ const dir = yield* tmpdirScoped()
+ const types: string[] = []
+ const seen = yield* Deferred.make<void>()
+ const disposed = yield* Deferred.make<void>()
+
+ // Set up subscriber inside the instance
+ yield* Effect.gen(function* () {
+ const bus = yield* Bus.Service
+
+ yield* Stream.runForEach(bus.subscribeAll(), (evt) =>
+ Effect.sync(() => {
+ types.push(evt.type)
+ if (evt.type === TestEvent.Ping.type) Deferred.doneUnsafe(seen, Effect.void)
+ if (evt.type === Bus.InstanceDisposed.type) Deferred.doneUnsafe(disposed, Effect.void)
+ }),
+ ).pipe(Effect.forkScoped)
+
+ yield* Effect.sleep("10 millis")
+ yield* bus.publish(TestEvent.Ping, { value: 1 })
+ yield* Deferred.await(seen)
+ }).pipe(provideInstance(dir))
+
+ // Dispose from OUTSIDE the instance scope
+ yield* Effect.promise(() => Instance.disposeAll())
+ yield* Deferred.await(disposed).pipe(Effect.timeout("2 seconds"))
+
+ expect(types).toContain("test.effect.ping")
+ expect(types).toContain(Bus.InstanceDisposed.type)
+ }),
+ )
+})
diff --git a/packages/opencode/test/bus/bus-integration.test.ts b/packages/opencode/test/bus/bus-integration.test.ts
new file mode 100644
index 000000000..e42bd5299
--- /dev/null
+++ b/packages/opencode/test/bus/bus-integration.test.ts
@@ -0,0 +1,87 @@
+import { afterEach, describe, expect, test } from "bun:test"
+import z from "zod"
+import { Bus } from "../../src/bus"
+import { BusEvent } from "../../src/bus/bus-event"
+import { Instance } from "../../src/project/instance"
+import { tmpdir } from "../fixture/fixture"
+
+const TestEvent = BusEvent.define("test.integration", z.object({ value: z.number() }))
+
+function withInstance(directory: string, fn: () => Promise<void>) {
+ return Instance.provide({ directory, fn })
+}
+
+describe("Bus integration: acquireRelease subscriber pattern", () => {
+ afterEach(() => Instance.disposeAll())
+
+ test("subscriber via callback facade receives events and cleans up on unsub", async () => {
+ await using tmp = await tmpdir()
+ const received: number[] = []
+
+ await withInstance(tmp.path, async () => {
+ const unsub = Bus.subscribe(TestEvent, (evt) => {
+ received.push(evt.properties.value)
+ })
+ await Bun.sleep(10)
+ await Bus.publish(TestEvent, { value: 1 })
+ await Bus.publish(TestEvent, { value: 2 })
+ await Bun.sleep(10)
+
+ expect(received).toEqual([1, 2])
+
+ unsub()
+ await Bun.sleep(10)
+ await Bus.publish(TestEvent, { value: 3 })
+ await Bun.sleep(10)
+
+ expect(received).toEqual([1, 2])
+ })
+ })
+
+ test("subscribeAll receives events from multiple types", async () => {
+ await using tmp = await tmpdir()
+ const received: Array<{ type: string; value?: number }> = []
+
+ const OtherEvent = BusEvent.define("test.other", z.object({ value: z.number() }))
+
+ await withInstance(tmp.path, async () => {
+ Bus.subscribeAll((evt) => {
+ received.push({ type: evt.type, value: evt.properties.value })
+ })
+ await Bun.sleep(10)
+ await Bus.publish(TestEvent, { value: 10 })
+ await Bus.publish(OtherEvent, { value: 20 })
+ await Bun.sleep(10)
+ })
+
+ expect(received).toEqual([
+ { type: "test.integration", value: 10 },
+ { type: "test.other", value: 20 },
+ ])
+ })
+
+ test("subscriber cleanup on instance disposal interrupts the stream", async () => {
+ await using tmp = await tmpdir()
+ const received: number[] = []
+ let disposed = false
+
+ await withInstance(tmp.path, async () => {
+ Bus.subscribeAll((evt) => {
+ if (evt.type === Bus.InstanceDisposed.type) {
+ disposed = true
+ return
+ }
+ received.push(evt.properties.value)
+ })
+ await Bun.sleep(10)
+ await Bus.publish(TestEvent, { value: 1 })
+ await Bun.sleep(10)
+ })
+
+ await Instance.disposeAll()
+ await Bun.sleep(50)
+
+ expect(received).toEqual([1])
+ expect(disposed).toBe(true)
+ })
+})
diff --git a/packages/opencode/test/bus/bus.test.ts b/packages/opencode/test/bus/bus.test.ts
new file mode 100644
index 000000000..3df179787
--- /dev/null
+++ b/packages/opencode/test/bus/bus.test.ts
@@ -0,0 +1,219 @@
+import { afterEach, describe, expect, test } from "bun:test"
+import z from "zod"
+import { Bus } from "../../src/bus"
+import { BusEvent } from "../../src/bus/bus-event"
+import { Instance } from "../../src/project/instance"
+import { tmpdir } from "../fixture/fixture"
+
+const TestEvent = {
+ Ping: BusEvent.define("test.ping", z.object({ value: z.number() })),
+ Pong: BusEvent.define("test.pong", z.object({ message: z.string() })),
+}
+
+function withInstance(directory: string, fn: () => Promise<void>) {
+ return Instance.provide({ directory, fn })
+}
+
+describe("Bus", () => {
+ afterEach(() => Instance.disposeAll())
+
+ describe("publish + subscribe", () => {
+ test("subscriber is live immediately after subscribe returns", async () => {
+ await using tmp = await tmpdir()
+ const received: number[] = []
+
+ await withInstance(tmp.path, async () => {
+ Bus.subscribe(TestEvent.Ping, (evt) => {
+ received.push(evt.properties.value)
+ })
+ await Bus.publish(TestEvent.Ping, { value: 42 })
+ await Bun.sleep(10)
+ })
+
+ expect(received).toEqual([42])
+ })
+
+ test("subscriber receives matching events", async () => {
+ await using tmp = await tmpdir()
+ const received: number[] = []
+
+ await withInstance(tmp.path, async () => {
+ Bus.subscribe(TestEvent.Ping, (evt) => {
+ received.push(evt.properties.value)
+ })
+ // Give the subscriber fiber time to start consuming
+ await Bun.sleep(10)
+ await Bus.publish(TestEvent.Ping, { value: 42 })
+ await Bus.publish(TestEvent.Ping, { value: 99 })
+ // Give subscriber time to process
+ await Bun.sleep(10)
+ })
+
+ expect(received).toEqual([42, 99])
+ })
+
+ test("subscriber does not receive events of other types", async () => {
+ await using tmp = await tmpdir()
+ const pings: number[] = []
+
+ await withInstance(tmp.path, async () => {
+ Bus.subscribe(TestEvent.Ping, (evt) => {
+ pings.push(evt.properties.value)
+ })
+ await Bun.sleep(10)
+ await Bus.publish(TestEvent.Pong, { message: "hello" })
+ await Bus.publish(TestEvent.Ping, { value: 1 })
+ await Bun.sleep(10)
+ })
+
+ expect(pings).toEqual([1])
+ })
+
+ test("publish with no subscribers does not throw", async () => {
+ await using tmp = await tmpdir()
+
+ await withInstance(tmp.path, async () => {
+ await Bus.publish(TestEvent.Ping, { value: 1 })
+ })
+ })
+ })
+
+ describe("unsubscribe", () => {
+ test("unsubscribe stops delivery", async () => {
+ await using tmp = await tmpdir()
+ const received: number[] = []
+
+ await withInstance(tmp.path, async () => {
+ const unsub = Bus.subscribe(TestEvent.Ping, (evt) => {
+ received.push(evt.properties.value)
+ })
+ await Bun.sleep(10)
+ await Bus.publish(TestEvent.Ping, { value: 1 })
+ await Bun.sleep(10)
+ unsub()
+ await Bun.sleep(10)
+ await Bus.publish(TestEvent.Ping, { value: 2 })
+ await Bun.sleep(10)
+ })
+
+ expect(received).toEqual([1])
+ })
+ })
+
+ describe("subscribeAll", () => {
+ test("subscribeAll is live immediately after subscribe returns", async () => {
+ await using tmp = await tmpdir()
+ const received: string[] = []
+
+ await withInstance(tmp.path, async () => {
+ Bus.subscribeAll((evt) => {
+ received.push(evt.type)
+ })
+ await Bus.publish(TestEvent.Ping, { value: 1 })
+ await Bun.sleep(10)
+ })
+
+ expect(received).toEqual(["test.ping"])
+ })
+
+ test("receives all event types", async () => {
+ await using tmp = await tmpdir()
+ const received: string[] = []
+
+ await withInstance(tmp.path, async () => {
+ Bus.subscribeAll((evt) => {
+ received.push(evt.type)
+ })
+ await Bun.sleep(10)
+ await Bus.publish(TestEvent.Ping, { value: 1 })
+ await Bus.publish(TestEvent.Pong, { message: "hi" })
+ await Bun.sleep(10)
+ })
+
+ expect(received).toContain("test.ping")
+ expect(received).toContain("test.pong")
+ })
+ })
+
+ describe("multiple subscribers", () => {
+ test("all subscribers for same event type are called", async () => {
+ await using tmp = await tmpdir()
+ const a: number[] = []
+ const b: number[] = []
+
+ await withInstance(tmp.path, async () => {
+ Bus.subscribe(TestEvent.Ping, (evt) => {
+ a.push(evt.properties.value)
+ })
+ Bus.subscribe(TestEvent.Ping, (evt) => {
+ b.push(evt.properties.value)
+ })
+ await Bun.sleep(10)
+ await Bus.publish(TestEvent.Ping, { value: 7 })
+ await Bun.sleep(10)
+ })
+
+ expect(a).toEqual([7])
+ expect(b).toEqual([7])
+ })
+ })
+
+ describe("instance isolation", () => {
+ test("events in one directory do not reach subscribers in another", async () => {
+ await using tmpA = await tmpdir()
+ await using tmpB = await tmpdir()
+ const receivedA: number[] = []
+ const receivedB: number[] = []
+
+ await withInstance(tmpA.path, async () => {
+ Bus.subscribe(TestEvent.Ping, (evt) => {
+ receivedA.push(evt.properties.value)
+ })
+ await Bun.sleep(10)
+ })
+
+ await withInstance(tmpB.path, async () => {
+ Bus.subscribe(TestEvent.Ping, (evt) => {
+ receivedB.push(evt.properties.value)
+ })
+ await Bun.sleep(10)
+ })
+
+ await withInstance(tmpA.path, async () => {
+ await Bus.publish(TestEvent.Ping, { value: 1 })
+ await Bun.sleep(10)
+ })
+
+ await withInstance(tmpB.path, async () => {
+ await Bus.publish(TestEvent.Ping, { value: 2 })
+ await Bun.sleep(10)
+ })
+
+ expect(receivedA).toEqual([1])
+ expect(receivedB).toEqual([2])
+ })
+ })
+
+ describe("instance disposal", () => {
+ test("InstanceDisposed is delivered to wildcard subscribers before stream ends", async () => {
+ await using tmp = await tmpdir()
+ const received: string[] = []
+
+ await withInstance(tmp.path, async () => {
+ Bus.subscribeAll((evt) => {
+ received.push(evt.type)
+ })
+ await Bun.sleep(10)
+ await Bus.publish(TestEvent.Ping, { value: 1 })
+ await Bun.sleep(10)
+ })
+
+ // Instance.disposeAll triggers the finalizer which publishes InstanceDisposed
+ await Instance.disposeAll()
+ await Bun.sleep(50)
+
+ expect(received).toContain("test.ping")
+ expect(received).toContain(Bus.InstanceDisposed.type)
+ })
+ })
+})
diff --git a/packages/opencode/test/effect/run-service.test.ts b/packages/opencode/test/effect/run-service.test.ts
index c9f630585..b2004fb66 100644
--- a/packages/opencode/test/effect/run-service.test.ts
+++ b/packages/opencode/test/effect/run-service.test.ts
@@ -1,10 +1,10 @@
import { expect, test } from "bun:test"
import { Effect, Layer, ServiceMap } from "effect"
-import { makeRunPromise } from "../../src/effect/run-service"
+import { makeRuntime } from "../../src/effect/run-service"
class Shared extends ServiceMap.Service<Shared, { readonly id: number }>()("@test/Shared") {}
-test("makeRunPromise shares dependent layers through the shared memo map", async () => {
+test("makeRuntime shares dependent layers through the shared memo map", async () => {
let n = 0
const shared = Layer.effect(
@@ -37,8 +37,8 @@ test("makeRunPromise shares dependent layers through the shared memo map", async
}),
).pipe(Layer.provide(shared))
- const runOne = makeRunPromise(One, one)
- const runTwo = makeRunPromise(Two, two)
+ const { runPromise: runOne } = makeRuntime(One, one)
+ const { runPromise: runTwo } = makeRuntime(Two, two)
expect(await runOne((svc) => svc.get())).toBe(1)
expect(await runTwo((svc) => svc.get())).toBe(1)
diff --git a/packages/opencode/test/file/watcher.test.ts b/packages/opencode/test/file/watcher.test.ts
index 6658634e5..f98a580f6 100644
--- a/packages/opencode/test/file/watcher.test.ts
+++ b/packages/opencode/test/file/watcher.test.ts
@@ -2,9 +2,8 @@ import { $ } from "bun"
import { afterEach, describe, expect, test } from "bun:test"
import fs from "fs/promises"
import path from "path"
-import { Deferred, Effect, Option } from "effect"
+import { ConfigProvider, Deferred, Effect, Layer, ManagedRuntime, Option } from "effect"
import { tmpdir } from "../fixture/fixture"
-import { watcherConfigLayer, withServices } from "../fixture/instance"
import { Bus } from "../../src/bus"
import { FileWatcher } from "../../src/file/watcher"
import { Instance } from "../../src/project/instance"
@@ -16,20 +15,33 @@ const describeWatcher = FileWatcher.hasNativeBinding() && !process.env.CI ? desc
// Helpers
// ---------------------------------------------------------------------------
+const watcherConfigLayer = ConfigProvider.layer(
+ ConfigProvider.fromUnknown({
+ OPENCODE_EXPERIMENTAL_FILEWATCHER: "true",
+ OPENCODE_EXPERIMENTAL_DISABLE_FILEWATCHER: "false",
+ }),
+)
+
type WatcherEvent = { file: string; event: "add" | "change" | "unlink" }
/** Run `body` with a live FileWatcher service. */
function withWatcher<E>(directory: string, body: Effect.Effect<void, E>) {
- return withServices(
+ return Instance.provide({
directory,
- FileWatcher.layer,
- async (rt) => {
- await rt.runPromise(FileWatcher.Service.use((s) => s.init()))
- await Effect.runPromise(ready(directory))
- await Effect.runPromise(body)
+ fn: async () => {
+ const layer: Layer.Layer<FileWatcher.Service, never, never> = FileWatcher.layer.pipe(
+ Layer.provide(watcherConfigLayer),
+ )
+ const rt = ManagedRuntime.make(layer)
+ try {
+ await rt.runPromise(FileWatcher.Service.use((s) => s.init()))
+ await Effect.runPromise(ready(directory))
+ await Effect.runPromise(body)
+ } finally {
+ await rt.dispose()
+ }
},
- { provide: [watcherConfigLayer] },
- )
+ })
}
function listen(directory: string, check: (evt: WatcherEvent) => boolean, hit: (evt: WatcherEvent) => void) {
diff --git a/packages/opencode/test/fixture/fixture.ts b/packages/opencode/test/fixture/fixture.ts
index f2f864e8b..a36a3f9d8 100644
--- a/packages/opencode/test/fixture/fixture.ts
+++ b/packages/opencode/test/fixture/fixture.ts
@@ -2,7 +2,10 @@ import { $ } from "bun"
import * as fs from "fs/promises"
import os from "os"
import path from "path"
+import { Effect, FileSystem, ServiceMap } from "effect"
+import { ChildProcess, ChildProcessSpawner } from "effect/unstable/process"
import type { Config } from "../../src/config/config"
+import { Instance } from "../../src/project/instance"
// Strip null bytes from paths (defensive fix for CI environment issues)
function sanitizePath(p: string): string {
@@ -71,3 +74,68 @@ export async function tmpdir<T>(options?: TmpDirOptions<T>) {
}
return result
}
+
+/** Effectful scoped tmpdir. Cleaned up when the scope closes. Make sure these stay in sync */
+export function tmpdirScoped(options?: { git?: boolean; config?: Partial<Config.Info> }) {
+ return Effect.gen(function* () {
+ const fs = yield* FileSystem.FileSystem
+ const spawner = yield* ChildProcessSpawner.ChildProcessSpawner
+ const dir = yield* fs.makeTempDirectoryScoped({ prefix: "opencode-test-" })
+
+ const git = (...args: string[]) =>
+ spawner.spawn(ChildProcess.make("git", args, { cwd: dir })).pipe(Effect.flatMap((handle) => handle.exitCode))
+
+ if (options?.git) {
+ yield* git("init")
+ yield* git("config", "core.fsmonitor", "false")
+ yield* git("config", "user.email", "[email protected]")
+ yield* git("config", "user.name", "Test")
+ yield* git("commit", "--allow-empty", "-m", "root commit")
+ }
+
+ if (options?.config) {
+ yield* fs.writeFileString(
+ path.join(dir, "opencode.json"),
+ JSON.stringify({ $schema: "https://opencode.ai/config.json", ...options.config }),
+ )
+ }
+
+ return dir
+ })
+}
+
+export const provideInstance =
+ (directory: string) =>
+ <A, E, R>(self: Effect.Effect<A, E, R>): Effect.Effect<A, E, R> =>
+ Effect.servicesWith((services: ServiceMap.ServiceMap<R>) =>
+ Effect.promise<A>(async () =>
+ Instance.provide({
+ directory,
+ fn: () => Effect.runPromiseWith(services)(self),
+ }),
+ ),
+ )
+
+export function provideTmpdirInstance<A, E, R>(
+ self: (path: string) => Effect.Effect<A, E, R>,
+ options?: { git?: boolean; config?: Partial<Config.Info> },
+) {
+ return Effect.gen(function* () {
+ const path = yield* tmpdirScoped(options)
+ let provided = false
+
+ yield* Effect.addFinalizer(() =>
+ provided
+ ? Effect.promise(() =>
+ Instance.provide({
+ directory: path,
+ fn: () => Instance.dispose(),
+ }),
+ ).pipe(Effect.ignore)
+ : Effect.void,
+ )
+
+ provided = true
+ return yield* self(path).pipe(provideInstance(path))
+ })
+}
diff --git a/packages/opencode/test/fixture/instance.ts b/packages/opencode/test/fixture/instance.ts
deleted file mode 100644
index 67af82fc8..000000000
--- a/packages/opencode/test/fixture/instance.ts
+++ /dev/null
@@ -1,51 +0,0 @@
-import { ConfigProvider, Layer, ManagedRuntime } from "effect"
-import { InstanceContext } from "../../src/effect/instance-context"
-import { Instance } from "../../src/project/instance"
-
-/** ConfigProvider that enables the experimental file watcher. */
-export const watcherConfigLayer = ConfigProvider.layer(
- ConfigProvider.fromUnknown({
- OPENCODE_EXPERIMENTAL_FILEWATCHER: "true",
- OPENCODE_EXPERIMENTAL_DISABLE_FILEWATCHER: "false",
- }),
-)
-
-/**
- * Boot an Instance with the given service layers and run `body` with
- * the ManagedRuntime. Cleanup is automatic — the runtime is disposed
- * and Instance context is torn down when `body` completes.
- *
- * Layers may depend on InstanceContext (provided automatically).
- * Pass extra layers via `options.provide` (e.g. ConfigProvider.layer).
- */
-export function withServices<S>(
- directory: string,
- layer: Layer.Layer<S, any, InstanceContext>,
- body: (rt: ManagedRuntime.ManagedRuntime<S, never>) => Promise<void>,
- options?: { provide?: Layer.Layer<never>[] },
-) {
- return Instance.provide({
- directory,
- fn: async () => {
- const ctx = Layer.sync(InstanceContext, () =>
- InstanceContext.of({
- directory: Instance.directory,
- worktree: Instance.worktree,
- project: Instance.project,
- }),
- )
- let resolved: Layer.Layer<S> = layer.pipe(Layer.provide(ctx)) as any
- if (options?.provide) {
- for (const l of options.provide) {
- resolved = resolved.pipe(Layer.provide(l)) as any
- }
- }
- const rt = ManagedRuntime.make(resolved)
- try {
- await body(rt)
- } finally {
- await rt.dispose()
- }
- },
- })
-}
diff --git a/packages/opencode/test/format/format.test.ts b/packages/opencode/test/format/format.test.ts
index 68fe71e03..c718c13e8 100644
--- a/packages/opencode/test/format/format.test.ts
+++ b/packages/opencode/test/format/format.test.ts
@@ -1,172 +1,182 @@
-import { Effect } from "effect"
-import { afterEach, describe, expect, test } from "bun:test"
-import { tmpdir } from "../fixture/fixture"
-import { withServices } from "../fixture/instance"
-import { Bus } from "../../src/bus"
-import { File } from "../../src/file"
+import { NodeChildProcessSpawner, NodeFileSystem, NodePath } from "@effect/platform-node"
+import { describe, expect } from "bun:test"
+import { Effect, Layer } from "effect"
+import { provideTmpdirInstance } from "../fixture/fixture"
+import { testEffect } from "../lib/effect"
import { Format } from "../../src/format"
import * as Formatter from "../../src/format/formatter"
-import { Instance } from "../../src/project/instance"
+
+const node = NodeChildProcessSpawner.layer.pipe(
+ Layer.provideMerge(Layer.mergeAll(NodeFileSystem.layer, NodePath.layer)),
+)
+
+const it = testEffect(Layer.mergeAll(Format.layer, node))
describe("Format", () => {
- afterEach(async () => {
- await Instance.disposeAll()
- })
-
- test("status() returns built-in formatters when no config overrides", async () => {
- await using tmp = await tmpdir()
-
- await withServices(tmp.path, Format.layer, async (rt) => {
- const statuses = await rt.runPromise(Format.Service.use((s) => s.status()))
- expect(Array.isArray(statuses)).toBe(true)
- expect(statuses.length).toBeGreaterThan(0)
-
- for (const s of statuses) {
- expect(typeof s.name).toBe("string")
- expect(Array.isArray(s.extensions)).toBe(true)
- expect(typeof s.enabled).toBe("boolean")
- }
-
- const gofmt = statuses.find((s) => s.name === "gofmt")
- expect(gofmt).toBeDefined()
- expect(gofmt!.extensions).toContain(".go")
- })
- })
-
- test("status() returns empty list when formatter is disabled", async () => {
- await using tmp = await tmpdir({
- config: { formatter: false },
- })
-
- await withServices(tmp.path, Format.layer, async (rt) => {
- const statuses = await rt.runPromise(Format.Service.use((s) => s.status()))
- expect(statuses).toEqual([])
- })
- })
-
- test("status() excludes formatters marked as disabled in config", async () => {
- await using tmp = await tmpdir({
- config: {
- formatter: {
- gofmt: { disabled: true },
+ it.effect("status() returns built-in formatters when no config overrides", () =>
+ provideTmpdirInstance(() =>
+ Format.Service.use((fmt) =>
+ Effect.gen(function* () {
+ const statuses = yield* fmt.status()
+ expect(Array.isArray(statuses)).toBe(true)
+ expect(statuses.length).toBeGreaterThan(0)
+
+ for (const item of statuses) {
+ expect(typeof item.name).toBe("string")
+ expect(Array.isArray(item.extensions)).toBe(true)
+ expect(typeof item.enabled).toBe("boolean")
+ }
+
+ const gofmt = statuses.find((item) => item.name === "gofmt")
+ expect(gofmt).toBeDefined()
+ expect(gofmt!.extensions).toContain(".go")
+ }),
+ ),
+ ),
+ )
+
+ it.effect("status() returns empty list when formatter is disabled", () =>
+ provideTmpdirInstance(
+ () =>
+ Format.Service.use((fmt) =>
+ Effect.gen(function* () {
+ expect(yield* fmt.status()).toEqual([])
+ }),
+ ),
+ { config: { formatter: false } },
+ ),
+ )
+
+ it.effect("status() excludes formatters marked as disabled in config", () =>
+ provideTmpdirInstance(
+ () =>
+ Format.Service.use((fmt) =>
+ Effect.gen(function* () {
+ const statuses = yield* fmt.status()
+ const gofmt = statuses.find((item) => item.name === "gofmt")
+ expect(gofmt).toBeUndefined()
+ }),
+ ),
+ {
+ config: {
+ formatter: {
+ gofmt: { disabled: true },
+ },
},
},
- })
-
- await withServices(tmp.path, Format.layer, async (rt) => {
- const statuses = await rt.runPromise(Format.Service.use((s) => s.status()))
- const gofmt = statuses.find((s) => s.name === "gofmt")
- expect(gofmt).toBeUndefined()
- })
- })
-
- test("service initializes without error", async () => {
- await using tmp = await tmpdir()
-
- await withServices(tmp.path, Format.layer, async (rt) => {
- await rt.runPromise(Format.Service.use(() => Effect.void))
- })
- })
-
- test("status() initializes formatter state per directory", async () => {
- await using off = await tmpdir({
- config: { formatter: false },
- })
- await using on = await tmpdir()
-
- const a = await Instance.provide({
- directory: off.path,
- fn: () => Format.status(),
- })
- const b = await Instance.provide({
- directory: on.path,
- fn: () => Format.status(),
- })
-
- expect(a).toEqual([])
- expect(b.length).toBeGreaterThan(0)
- })
-
- test("runs enabled checks for matching formatters in parallel", async () => {
- await using tmp = await tmpdir()
-
- const file = `${tmp.path}/test.parallel`
- await Bun.write(file, "x")
-
- const one = {
- extensions: Formatter.gofmt.extensions,
- enabled: Formatter.gofmt.enabled,
- command: Formatter.gofmt.command,
- }
- const two = {
- extensions: Formatter.mix.extensions,
- enabled: Formatter.mix.enabled,
- command: Formatter.mix.command,
- }
-
- let active = 0
- let max = 0
-
- Formatter.gofmt.extensions = [".parallel"]
- Formatter.mix.extensions = [".parallel"]
- Formatter.gofmt.command = ["sh", "-c", "true"]
- Formatter.mix.command = ["sh", "-c", "true"]
- Formatter.gofmt.enabled = async () => {
- active++
- max = Math.max(max, active)
- await Bun.sleep(20)
- active--
- return true
- }
- Formatter.mix.enabled = async () => {
- active++
- max = Math.max(max, active)
- await Bun.sleep(20)
- active--
- return true
- }
-
- try {
- await withServices(tmp.path, Format.layer, async (rt) => {
- await rt.runPromise(Format.Service.use((s) => s.init()))
- await Bus.publish(File.Event.Edited, { file })
+ ),
+ )
+
+ it.effect("service initializes without error", () =>
+ provideTmpdirInstance(() => Format.Service.use(() => Effect.void)),
+ )
+
+ it.effect("status() initializes formatter state per directory", () =>
+ Effect.gen(function* () {
+ const a = yield* provideTmpdirInstance(() => Format.Service.use((fmt) => fmt.status()), {
+ config: { formatter: false },
})
- } finally {
- Formatter.gofmt.extensions = one.extensions
- Formatter.gofmt.enabled = one.enabled
- Formatter.gofmt.command = one.command
- Formatter.mix.extensions = two.extensions
- Formatter.mix.enabled = two.enabled
- Formatter.mix.command = two.command
- }
-
- expect(max).toBe(2)
- })
-
- test("runs matching formatters sequentially for the same file", async () => {
- await using tmp = await tmpdir({
- config: {
- formatter: {
- first: {
- command: ["sh", "-c", 'sleep 0.05; v=$(cat "$1"); printf \'%sA\' "$v" > "$1"', "sh", "$FILE"],
- extensions: [".seq"],
- },
- second: {
- command: ["sh", "-c", 'v=$(cat "$1"); printf \'%sB\' "$v" > "$1"', "sh", "$FILE"],
- extensions: [".seq"],
+ const b = yield* provideTmpdirInstance(() => Format.Service.use((fmt) => fmt.status()))
+
+ expect(a).toEqual([])
+ expect(b.length).toBeGreaterThan(0)
+ }),
+ )
+
+ it.effect("runs enabled checks for matching formatters in parallel", () =>
+ provideTmpdirInstance((path) =>
+ Effect.gen(function* () {
+ const file = `${path}/test.parallel`
+ yield* Effect.promise(() => Bun.write(file, "x"))
+
+ const one = {
+ extensions: Formatter.gofmt.extensions,
+ enabled: Formatter.gofmt.enabled,
+ command: Formatter.gofmt.command,
+ }
+ const two = {
+ extensions: Formatter.mix.extensions,
+ enabled: Formatter.mix.enabled,
+ command: Formatter.mix.command,
+ }
+
+ let active = 0
+ let max = 0
+
+ yield* Effect.acquireUseRelease(
+ Effect.sync(() => {
+ Formatter.gofmt.extensions = [".parallel"]
+ Formatter.mix.extensions = [".parallel"]
+ Formatter.gofmt.command = ["sh", "-c", "true"]
+ Formatter.mix.command = ["sh", "-c", "true"]
+ Formatter.gofmt.enabled = async () => {
+ active++
+ max = Math.max(max, active)
+ await Bun.sleep(20)
+ active--
+ return true
+ }
+ Formatter.mix.enabled = async () => {
+ active++
+ max = Math.max(max, active)
+ await Bun.sleep(20)
+ active--
+ return true
+ }
+ }),
+ () =>
+ Format.Service.use((fmt) =>
+ Effect.gen(function* () {
+ yield* fmt.init()
+ yield* fmt.file(file)
+ }),
+ ),
+ () =>
+ Effect.sync(() => {
+ Formatter.gofmt.extensions = one.extensions
+ Formatter.gofmt.enabled = one.enabled
+ Formatter.gofmt.command = one.command
+ Formatter.mix.extensions = two.extensions
+ Formatter.mix.enabled = two.enabled
+ Formatter.mix.command = two.command
+ }),
+ )
+
+ expect(max).toBe(2)
+ }),
+ ),
+ )
+
+ it.effect("runs matching formatters sequentially for the same file", () =>
+ provideTmpdirInstance(
+ (path) =>
+ Effect.gen(function* () {
+ const file = `${path}/test.seq`
+ yield* Effect.promise(() => Bun.write(file, "x"))
+
+ yield* Format.Service.use((fmt) =>
+ Effect.gen(function* () {
+ yield* fmt.init()
+ yield* fmt.file(file)
+ }),
+ )
+
+ expect(yield* Effect.promise(() => Bun.file(file).text())).toBe("xAB")
+ }),
+ {
+ config: {
+ formatter: {
+ first: {
+ command: ["sh", "-c", 'sleep 0.05; v=$(cat "$1"); printf \'%sA\' "$v" > "$1"', "sh", "$FILE"],
+ extensions: [".seq"],
+ },
+ second: {
+ command: ["sh", "-c", 'v=$(cat "$1"); printf \'%sB\' "$v" > "$1"', "sh", "$FILE"],
+ extensions: [".seq"],
+ },
},
},
},
- })
-
- const file = `${tmp.path}/test.seq`
- await Bun.write(file, "x")
-
- await withServices(tmp.path, Format.layer, async (rt) => {
- await rt.runPromise(Format.Service.use((s) => s.init()))
- await Bus.publish(File.Event.Edited, { file })
- })
-
- expect(await Bun.file(file).text()).toBe("xAB")
- })
+ ),
+ )
})
diff --git a/packages/opencode/test/project/vcs.test.ts b/packages/opencode/test/project/vcs.test.ts
index f55989caf..50282b5f6 100644
--- a/packages/opencode/test/project/vcs.test.ts
+++ b/packages/opencode/test/project/vcs.test.ts
@@ -2,9 +2,7 @@ import { $ } from "bun"
import { afterEach, describe, expect, test } from "bun:test"
import fs from "fs/promises"
import path from "path"
-import { Effect, Layer, ManagedRuntime } from "effect"
import { tmpdir } from "../fixture/fixture"
-import { watcherConfigLayer, withServices } from "../fixture/instance"
import { FileWatcher } from "../../src/file/watcher"
import { Instance } from "../../src/project/instance"
import { GlobalBus } from "../../src/bus/global"
@@ -17,28 +15,26 @@ const describeVcs = FileWatcher.hasNativeBinding() && !process.env.CI ? describe
// Helpers
// ---------------------------------------------------------------------------
-function withVcs(
- directory: string,
- body: (rt: ManagedRuntime.ManagedRuntime<FileWatcher.Service | Vcs.Service, never>) => Promise<void>,
-) {
- return withServices(
+async function withVcs(directory: string, body: () => Promise<void>) {
+ return Instance.provide({
directory,
- Layer.merge(FileWatcher.layer, Vcs.defaultLayer),
- async (rt) => {
- await rt.runPromise(FileWatcher.Service.use((s) => s.init()))
- await rt.runPromise(Vcs.Service.use((s) => s.init()))
+ fn: async () => {
+ FileWatcher.init()
+ Vcs.init()
await Bun.sleep(500)
- await body(rt)
+ await body()
},
- { provide: [watcherConfigLayer] },
- )
+ })
}
-function withVcsOnly(
- directory: string,
- body: (rt: ManagedRuntime.ManagedRuntime<Vcs.Service, never>) => Promise<void>,
-) {
- return withServices(directory, Vcs.defaultLayer, body)
+function withVcsOnly(directory: string, body: () => Promise<void>) {
+ return Instance.provide({
+ directory,
+ fn: async () => {
+ Vcs.init()
+ await body()
+ },
+ })
}
type BranchEvent = { directory?: string; payload: { type: string; properties: { branch?: string } } }
@@ -82,8 +78,8 @@ describeVcs("Vcs", () => {
test("branch() returns current branch name", async () => {
await using tmp = await tmpdir({ git: true })
- await withVcs(tmp.path, async (rt) => {
- const branch = await rt.runPromise(Vcs.Service.use((s) => s.branch()))
+ await withVcs(tmp.path, async () => {
+ const branch = await Vcs.branch()
expect(branch).toBeDefined()
expect(typeof branch).toBe("string")
})
@@ -92,8 +88,8 @@ describeVcs("Vcs", () => {
test("branch() returns undefined for non-git directories", async () => {
await using tmp = await tmpdir()
- await withVcs(tmp.path, async (rt) => {
- const branch = await rt.runPromise(Vcs.Service.use((s) => s.branch()))
+ await withVcs(tmp.path, async () => {
+ const branch = await Vcs.branch()
expect(branch).toBeUndefined()
})
})
@@ -119,14 +115,14 @@ describeVcs("Vcs", () => {
const branch = `test-${Math.random().toString(36).slice(2)}`
await $`git branch ${branch}`.cwd(tmp.path).quiet()
- await withVcs(tmp.path, async (rt) => {
+ await withVcs(tmp.path, async () => {
const pending = nextBranchUpdate(tmp.path)
const head = path.join(tmp.path, ".git", "HEAD")
await fs.writeFile(head, `ref: refs/heads/${branch}\n`)
await pending
- const current = await rt.runPromise(Vcs.Service.use((s) => s.branch()))
+ const current = await Vcs.branch()
expect(current).toBe(branch)
})
})
@@ -141,8 +137,8 @@ describe("Vcs diff", () => {
await using tmp = await tmpdir({ git: true })
await $`git branch -M main`.cwd(tmp.path).quiet()
- await withVcsOnly(tmp.path, async (rt) => {
- const branch = await rt.runPromise(Vcs.Service.use((s) => s.defaultBranch()))
+ await withVcsOnly(tmp.path, async () => {
+ const branch = await Vcs.defaultBranch()
expect(branch).toBe("main")
})
})
@@ -152,8 +148,8 @@ describe("Vcs diff", () => {
await $`git branch -M trunk`.cwd(tmp.path).quiet()
await $`git config init.defaultBranch trunk`.cwd(tmp.path).quiet()
- await withVcsOnly(tmp.path, async (rt) => {
- const branch = await rt.runPromise(Vcs.Service.use((s) => s.defaultBranch()))
+ await withVcsOnly(tmp.path, async () => {
+ const branch = await Vcs.defaultBranch()
expect(branch).toBe("trunk")
})
})
@@ -165,10 +161,10 @@ describe("Vcs diff", () => {
const dir = path.join(wt.path, "feature")
await $`git worktree add -b feature/test ${dir} HEAD`.cwd(tmp.path).quiet()
- await withVcsOnly(dir, async (rt) => {
+ await withVcsOnly(dir, async () => {
const [branch, base] = await Promise.all([
- rt.runPromise(Vcs.Service.use((s) => s.branch())),
- rt.runPromise(Vcs.Service.use((s) => s.defaultBranch())),
+ Vcs.branch(),
+ Vcs.defaultBranch(),
])
expect(branch).toBe("feature/test")
expect(base).toBe("main")
@@ -182,8 +178,8 @@ describe("Vcs diff", () => {
await $`git commit --no-gpg-sign -m "add file"`.cwd(tmp.path).quiet()
await fs.writeFile(path.join(tmp.path, "file.txt"), "changed\n", "utf-8")
- await withVcsOnly(tmp.path, async (rt) => {
- const diff = await rt.runPromise(Vcs.Service.use((s) => s.diff("git")))
+ await withVcsOnly(tmp.path, async () => {
+ const diff = await Vcs.diff("git")
expect(diff).toEqual(
expect.arrayContaining([
expect.objectContaining({
@@ -199,8 +195,8 @@ describe("Vcs diff", () => {
await using tmp = await tmpdir({ git: true })
await fs.writeFile(path.join(tmp.path, weird), "hello\n", "utf-8")
- await withVcsOnly(tmp.path, async (rt) => {
- const diff = await rt.runPromise(Vcs.Service.use((s) => s.diff("git")))
+ await withVcsOnly(tmp.path, async () => {
+ const diff = await Vcs.diff("git")
expect(diff).toEqual(
expect.arrayContaining([
expect.objectContaining({
@@ -220,8 +216,8 @@ describe("Vcs diff", () => {
await $`git add .`.cwd(tmp.path).quiet()
await $`git commit --no-gpg-sign -m "branch file"`.cwd(tmp.path).quiet()
- await withVcsOnly(tmp.path, async (rt) => {
- const diff = await rt.runPromise(Vcs.Service.use((s) => s.diff("branch")))
+ await withVcsOnly(tmp.path, async () => {
+ const diff = await Vcs.diff("branch")
expect(diff).toEqual(
expect.arrayContaining([
expect.objectContaining({
diff --git a/packages/opencode/test/sync/index.test.ts b/packages/opencode/test/sync/index.test.ts
index f96750d7d..5304f4ea8 100644
--- a/packages/opencode/test/sync/index.test.ts
+++ b/packages/opencode/test/sync/index.test.ts
@@ -110,10 +110,16 @@ describe("SyncEvent", () => {
type: string
properties: { id: string; name: string }
}> = []
- const unsub = Bus.subscribeAll((event) => events.push(event))
+ const received = new Promise<void>((resolve) => {
+ Bus.subscribeAll((event) => {
+ events.push(event)
+ resolve()
+ })
+ })
SyncEvent.run(Created, { id: "evt_1", name: "test" })
+ await received
expect(events).toHaveLength(1)
expect(events[0]).toEqual({
type: "item.created",
@@ -122,8 +128,6 @@ describe("SyncEvent", () => {
name: "test",
},
})
-
- unsub()
}),
)
})
diff --git a/packages/opencode/test/tool/edit.test.ts b/packages/opencode/test/tool/edit.test.ts
index f6b1ee5c9..96d41400e 100644
--- a/packages/opencode/test/tool/edit.test.ts
+++ b/packages/opencode/test/tool/edit.test.ts
@@ -89,7 +89,6 @@ describe("tool.edit", () => {
const { FileWatcher } = await import("../../src/file/watcher")
const events: string[] = []
- const unsubEdited = Bus.subscribe(File.Event.Edited, () => events.push("edited"))
const unsubUpdated = Bus.subscribe(FileWatcher.Event.Updated, () => events.push("updated"))
const edit = await EditTool.init()
@@ -102,9 +101,7 @@ describe("tool.edit", () => {
ctx,
)
- expect(events).toContain("edited")
expect(events).toContain("updated")
- unsubEdited()
unsubUpdated()
},
})
@@ -305,11 +302,9 @@ describe("tool.edit", () => {
await FileTime.read(ctx.sessionID, filepath)
const { Bus } = await import("../../src/bus")
- const { File } = await import("../../src/file")
const { FileWatcher } = await import("../../src/file/watcher")
const events: string[] = []
- const unsubEdited = Bus.subscribe(File.Event.Edited, () => events.push("edited"))
const unsubUpdated = Bus.subscribe(FileWatcher.Event.Updated, () => events.push("updated"))
const edit = await EditTool.init()
@@ -322,9 +317,7 @@ describe("tool.edit", () => {
ctx,
)
- expect(events).toContain("edited")
expect(events).toContain("updated")
- unsubEdited()
unsubUpdated()
},
})