summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorKit Langton <[email protected]>2026-03-16 13:18:40 -0400
committerGitHub <[email protected]>2026-03-16 13:18:40 -0400
commit9e740d9947e6a4c61680c8dd00cb1fd11adf12af (patch)
tree4cc2831664715b8737f14231094dc7a986f1372f
parentd4694d058cc590b0f05261a04460034d2fa8541d (diff)
downloadopencode-9e740d9947e6a4c61680c8dd00cb1fd11adf12af.tar.gz
opencode-9e740d9947e6a4c61680c8dd00cb1fd11adf12af.zip
stack: effectify-file-watcher-service (#17827)
-rw-r--r--packages/opencode/AGENTS.md35
-rw-r--r--packages/opencode/src/effect/instances.ts4
-rw-r--r--packages/opencode/src/file/watcher.ts190
-rw-r--r--packages/opencode/src/flag/flag.ts10
-rw-r--r--packages/opencode/src/project/bootstrap.ts5
-rw-r--r--packages/opencode/src/project/instance.ts9
-rw-r--r--packages/opencode/src/pty/index.ts62
-rw-r--r--packages/opencode/test/file/watcher.test.ts250
-rw-r--r--packages/opencode/test/permission/next.test.ts24
-rw-r--r--packages/opencode/test/pty/pty-session.test.ts10
10 files changed, 460 insertions, 139 deletions
diff --git a/packages/opencode/AGENTS.md b/packages/opencode/AGENTS.md
index 930297baa..f28150622 100644
--- a/packages/opencode/AGENTS.md
+++ b/packages/opencode/AGENTS.md
@@ -34,6 +34,7 @@ Instructions to follow when writing Effect.
- Use `Effect.gen(function* () { ... })` for composition.
- Use `Effect.fn("ServiceName.method")` for named/traced effects and `Effect.fnUntraced` for internal helpers.
- `Effect.fn` / `Effect.fnUntraced` accept pipeable operators as extra arguments, so avoid unnecessary `flow` or outer `.pipe()` wrappers.
+- **`Effect.callback`** (not `Effect.async`) for callback-based APIs. The classic `Effect.async` was renamed to `Effect.callback` in effect-smol/v4.
## Time
@@ -42,3 +43,37 @@ Instructions to follow when writing Effect.
## Errors
- In `Effect.gen/fn`, prefer `yield* new MyError(...)` over `yield* Effect.fail(new MyError(...))` for direct early-failure branches.
+
+## Instance-scoped Effect services
+
+Services that need per-directory lifecycle (created/destroyed per instance) go through the `Instances` LayerMap:
+
+1. Define a `ServiceMap.Service` with a `static readonly layer` (see `FileWatcherService`, `QuestionService`, `PermissionService`, `ProviderAuthService`).
+2. Add it to `InstanceServices` union and `Layer.mergeAll(...)` in `src/effect/instances.ts`.
+3. Use `InstanceContext` inside the layer to read `directory` and `project` instead of `Instance.*` globals.
+4. Call from legacy code via `runPromiseInstance(MyService.use((s) => s.method()))`.
+
+### Instance.bind — ALS context for native callbacks
+
+`Instance.bind(fn)` captures the current Instance AsyncLocalStorage context and returns a wrapper that restores it synchronously when called.
+
+**Use it** when passing callbacks to native C/C++ addons (`@parcel/watcher`, `node-pty`, native `fs.watch`, etc.) that need to call `Bus.publish`, `Instance.state()`, or anything that reads `Instance.directory`.
+
+**Don't need it** for `setTimeout`, `Promise.then`, `EventEmitter.on`, or Effect fibers — Node.js ALS propagates through those automatically.
+
+```typescript
+// Native addon callback — needs Instance.bind
+const cb = Instance.bind((err, evts) => {
+ Bus.publish(MyEvent, { ... })
+})
+nativeAddon.subscribe(dir, cb)
+```
+
+## Flag → Effect.Config migration
+
+Flags in `src/flag/flag.ts` are being migrated from static `truthy(...)` reads to `Config.boolean(...).pipe(Config.withDefault(false))` as their consumers get effectified.
+
+- Effectful flags return `Config<boolean>` and are read with `yield*` inside `Effect.gen`.
+- The default `ConfigProvider` reads from `process.env`, so env vars keep working.
+- Tests can override via `ConfigProvider.layer(ConfigProvider.fromUnknown({ ... }))`.
+- Keep all flags in `flag.ts` as the single registry — just change the implementation from `truthy()` to `Config.boolean()` when the consumer moves to Effect.
diff --git a/packages/opencode/src/effect/instances.ts b/packages/opencode/src/effect/instances.ts
index 02d4bf482..d60d79355 100644
--- a/packages/opencode/src/effect/instances.ts
+++ b/packages/opencode/src/effect/instances.ts
@@ -3,6 +3,7 @@ import { registerDisposer } from "./instance-registry"
import { ProviderAuthService } from "@/provider/auth-service"
import { QuestionService } from "@/question/service"
import { PermissionService } from "@/permission/service"
+import { FileWatcherService } from "@/file/watcher"
import { Instance } from "@/project/instance"
import type { Project } from "@/project/project"
@@ -17,7 +18,7 @@ export class InstanceContext extends ServiceMap.Service<InstanceContext, Instanc
"opencode/InstanceContext",
) {}
-export type InstanceServices = QuestionService | PermissionService | ProviderAuthService
+export type InstanceServices = QuestionService | PermissionService | ProviderAuthService | FileWatcherService
function lookup(directory: string) {
const project = Instance.project
@@ -26,6 +27,7 @@ function lookup(directory: string) {
Layer.fresh(QuestionService.layer),
Layer.fresh(PermissionService.layer),
Layer.fresh(ProviderAuthService.layer),
+ Layer.fresh(FileWatcherService.layer),
).pipe(Layer.provide(ctx))
}
diff --git a/packages/opencode/src/file/watcher.ts b/packages/opencode/src/file/watcher.ts
index 3797c1627..651f15f84 100644
--- a/packages/opencode/src/file/watcher.ts
+++ b/packages/opencode/src/file/watcher.ts
@@ -1,7 +1,8 @@
import { BusEvent } from "@/bus/bus-event"
import { Bus } from "@/bus"
+import { InstanceContext } from "@/effect/instances"
+import { Instance } from "@/project/instance"
import z from "zod"
-import { Instance } from "../project/instance"
import { Log } from "../util/log"
import { FileIgnore } from "./ignore"
import { Config } from "../config/config"
@@ -9,118 +10,139 @@ import path from "path"
// @ts-ignore
import { createWrapper } from "@parcel/watcher/wrapper"
import { lazy } from "@/util/lazy"
-import { withTimeout } from "@/util/timeout"
import type ParcelWatcher from "@parcel/watcher"
-import { Flag } from "@/flag/flag"
import { readdir } from "fs/promises"
import { git } from "@/util/git"
import { Protected } from "./protected"
+import { Flag } from "@/flag/flag"
+import { Cause, Effect, Layer, ServiceMap } from "effect"
const SUBSCRIBE_TIMEOUT_MS = 10_000
declare const OPENCODE_LIBC: string | undefined
+const log = Log.create({ service: "file.watcher" })
+
+const event = {
+ Updated: BusEvent.define(
+ "file.watcher.updated",
+ z.object({
+ file: z.string(),
+ event: z.union([z.literal("add"), z.literal("change"), z.literal("unlink")]),
+ }),
+ ),
+}
+
+const watcher = lazy((): typeof import("@parcel/watcher") | undefined => {
+ try {
+ const binding = require(
+ `@parcel/watcher-${process.platform}-${process.arch}${process.platform === "linux" ? `-${OPENCODE_LIBC || "glibc"}` : ""}`,
+ )
+ return createWrapper(binding) as typeof import("@parcel/watcher")
+ } catch (error) {
+ log.error("failed to load watcher binding", { error })
+ return
+ }
+})
+
+function getBackend() {
+ if (process.platform === "win32") return "windows"
+ if (process.platform === "darwin") return "fs-events"
+ if (process.platform === "linux") return "inotify"
+}
+
export namespace FileWatcher {
- const log = Log.create({ service: "file.watcher" })
-
- export const Event = {
- Updated: BusEvent.define(
- "file.watcher.updated",
- z.object({
- file: z.string(),
- event: z.union([z.literal("add"), z.literal("change"), z.literal("unlink")]),
- }),
- ),
+ export const Event = event
+ /** Whether the native @parcel/watcher binding is available on this platform. */
+ export const hasNativeBinding = () => !!watcher()
+}
+
+const init = Effect.fn("FileWatcherService.init")(function* () {})
+
+export namespace FileWatcherService {
+ export interface Service {
+ readonly init: () => Effect.Effect<void>
}
+}
+
+export class FileWatcherService extends ServiceMap.Service<FileWatcherService, FileWatcherService.Service>()(
+ "@opencode/FileWatcher",
+) {
+ static readonly layer = Layer.effect(
+ FileWatcherService,
+ Effect.gen(function* () {
+ const instance = yield* InstanceContext
+ if (yield* Flag.OPENCODE_EXPERIMENTAL_DISABLE_FILEWATCHER) return FileWatcherService.of({ init })
- const watcher = lazy((): typeof import("@parcel/watcher") | undefined => {
- try {
- const binding = require(
- `@parcel/watcher-${process.platform}-${process.arch}${process.platform === "linux" ? `-${OPENCODE_LIBC || "glibc"}` : ""}`,
- )
- return createWrapper(binding) as typeof import("@parcel/watcher")
- } catch (error) {
- log.error("failed to load watcher binding", { error })
- return
- }
- })
-
- const state = Instance.state(
- async () => {
- log.info("init")
- const cfg = await Config.get()
- const backend = (() => {
- if (process.platform === "win32") return "windows"
- if (process.platform === "darwin") return "fs-events"
- if (process.platform === "linux") return "inotify"
- })()
+ log.info("init", { directory: instance.directory })
+
+ const backend = getBackend()
if (!backend) {
- log.error("watcher backend not supported", { platform: process.platform })
- return {}
+ log.error("watcher backend not supported", { directory: instance.directory, platform: process.platform })
+ return FileWatcherService.of({ init })
}
- log.info("watcher backend", { platform: process.platform, backend })
const w = watcher()
- if (!w) return {}
+ if (!w) return FileWatcherService.of({ init })
+
+ log.info("watcher backend", { directory: instance.directory, platform: process.platform, backend })
- const subscribe: ParcelWatcher.SubscribeCallback = (err, evts) => {
+ const subs: ParcelWatcher.AsyncSubscription[] = []
+ yield* Effect.addFinalizer(() => Effect.promise(() => Promise.allSettled(subs.map((sub) => sub.unsubscribe()))))
+
+ const cb: ParcelWatcher.SubscribeCallback = Instance.bind((err, evts) => {
if (err) return
for (const evt of evts) {
- if (evt.type === "create") Bus.publish(Event.Updated, { file: evt.path, event: "add" })
- if (evt.type === "update") Bus.publish(Event.Updated, { file: evt.path, event: "change" })
- if (evt.type === "delete") Bus.publish(Event.Updated, { file: evt.path, event: "unlink" })
+ if (evt.type === "create") Bus.publish(event.Updated, { file: evt.path, event: "add" })
+ if (evt.type === "update") Bus.publish(event.Updated, { file: evt.path, event: "change" })
+ if (evt.type === "delete") Bus.publish(event.Updated, { file: evt.path, event: "unlink" })
}
+ })
+
+ const subscribe = (dir: string, ignore: string[]) => {
+ const pending = w.subscribe(dir, cb, { ignore, backend })
+ return Effect.gen(function* () {
+ const sub = yield* Effect.promise(() => pending)
+ subs.push(sub)
+ }).pipe(
+ Effect.timeout(SUBSCRIBE_TIMEOUT_MS),
+ Effect.catchCause((cause) => {
+ log.error("failed to subscribe", { dir, cause: Cause.pretty(cause) })
+ // Clean up a subscription that resolves after timeout
+ pending.then((s) => s.unsubscribe()).catch(() => {})
+ return Effect.void
+ }),
+ )
}
- const subs: ParcelWatcher.AsyncSubscription[] = []
+ const cfg = yield* Effect.promise(() => Config.get())
const cfgIgnores = cfg.watcher?.ignore ?? []
- if (Flag.OPENCODE_EXPERIMENTAL_FILEWATCHER) {
- const pending = w.subscribe(Instance.directory, subscribe, {
- ignore: [...FileIgnore.PATTERNS, ...cfgIgnores, ...Protected.paths()],
- backend,
- })
- const sub = await withTimeout(pending, SUBSCRIBE_TIMEOUT_MS).catch((err) => {
- log.error("failed to subscribe to Instance.directory", { error: err })
- pending.then((s) => s.unsubscribe()).catch(() => {})
- return undefined
- })
- if (sub) subs.push(sub)
+ if (yield* Flag.OPENCODE_EXPERIMENTAL_FILEWATCHER) {
+ yield* subscribe(instance.directory, [...FileIgnore.PATTERNS, ...cfgIgnores, ...Protected.paths()])
}
- if (Instance.project.vcs === "git") {
- const result = await git(["rev-parse", "--git-dir"], {
- cwd: Instance.worktree,
- })
- const vcsDir = result.exitCode === 0 ? path.resolve(Instance.worktree, result.text().trim()) : undefined
+ if (instance.project.vcs === "git") {
+ const result = yield* Effect.promise(() =>
+ git(["rev-parse", "--git-dir"], {
+ cwd: instance.project.worktree,
+ }),
+ )
+ const vcsDir = result.exitCode === 0 ? path.resolve(instance.project.worktree, result.text().trim()) : undefined
if (vcsDir && !cfgIgnores.includes(".git") && !cfgIgnores.includes(vcsDir)) {
- const gitDirContents = await readdir(vcsDir).catch(() => [])
- const ignoreList = gitDirContents.filter((entry) => entry !== "HEAD")
- const pending = w.subscribe(vcsDir, subscribe, {
- ignore: ignoreList,
- backend,
- })
- const sub = await withTimeout(pending, SUBSCRIBE_TIMEOUT_MS).catch((err) => {
- log.error("failed to subscribe to vcsDir", { error: err })
- pending.then((s) => s.unsubscribe()).catch(() => {})
- return undefined
- })
- if (sub) subs.push(sub)
+ const ignore = (yield* Effect.promise(() => readdir(vcsDir).catch(() => []))).filter(
+ (entry) => entry !== "HEAD",
+ )
+ yield* subscribe(vcsDir, ignore)
}
}
- return { subs }
- },
- async (state) => {
- if (!state.subs) return
- await Promise.all(state.subs.map((sub) => sub?.unsubscribe()))
- },
+ return FileWatcherService.of({ init })
+ }).pipe(
+ Effect.catchCause((cause) => {
+ log.error("failed to init watcher service", { cause: Cause.pretty(cause) })
+ return Effect.succeed(FileWatcherService.of({ init }))
+ }),
+ ),
)
-
- export function init() {
- if (Flag.OPENCODE_EXPERIMENTAL_DISABLE_FILEWATCHER) {
- return
- }
- state()
- }
}
diff --git a/packages/opencode/src/flag/flag.ts b/packages/opencode/src/flag/flag.ts
index f1688a1b4..a1cfd862b 100644
--- a/packages/opencode/src/flag/flag.ts
+++ b/packages/opencode/src/flag/flag.ts
@@ -1,3 +1,5 @@
+import { Config } from "effect"
+
function truthy(key: string) {
const value = process.env[key]?.toLowerCase()
return value === "true" || value === "1"
@@ -40,8 +42,12 @@ export namespace Flag {
// Experimental
export const OPENCODE_EXPERIMENTAL = truthy("OPENCODE_EXPERIMENTAL")
- export const OPENCODE_EXPERIMENTAL_FILEWATCHER = truthy("OPENCODE_EXPERIMENTAL_FILEWATCHER")
- export const OPENCODE_EXPERIMENTAL_DISABLE_FILEWATCHER = truthy("OPENCODE_EXPERIMENTAL_DISABLE_FILEWATCHER")
+ export const OPENCODE_EXPERIMENTAL_FILEWATCHER = Config.boolean("OPENCODE_EXPERIMENTAL_FILEWATCHER").pipe(
+ Config.withDefault(false),
+ )
+ export const OPENCODE_EXPERIMENTAL_DISABLE_FILEWATCHER = Config.boolean(
+ "OPENCODE_EXPERIMENTAL_DISABLE_FILEWATCHER",
+ ).pipe(Config.withDefault(false))
export const OPENCODE_EXPERIMENTAL_ICON_DISCOVERY =
OPENCODE_EXPERIMENTAL || truthy("OPENCODE_EXPERIMENTAL_ICON_DISCOVERY")
diff --git a/packages/opencode/src/project/bootstrap.ts b/packages/opencode/src/project/bootstrap.ts
index a2be3733f..bd819dc28 100644
--- a/packages/opencode/src/project/bootstrap.ts
+++ b/packages/opencode/src/project/bootstrap.ts
@@ -1,7 +1,7 @@
import { Plugin } from "../plugin"
import { Format } from "../format"
import { LSP } from "../lsp"
-import { FileWatcher } from "../file/watcher"
+import { FileWatcherService } from "../file/watcher"
import { File } from "../file"
import { Project } from "./project"
import { Bus } from "../bus"
@@ -12,6 +12,7 @@ import { Log } from "@/util/log"
import { ShareNext } from "@/share/share-next"
import { Snapshot } from "../snapshot"
import { Truncate } from "../tool/truncation"
+import { runPromiseInstance } from "@/effect/runtime"
export async function InstanceBootstrap() {
Log.Default.info("bootstrapping", { directory: Instance.directory })
@@ -19,7 +20,7 @@ export async function InstanceBootstrap() {
ShareNext.init()
Format.init()
await LSP.init()
- FileWatcher.init()
+ await runPromiseInstance(FileWatcherService.use((service) => service.init()))
File.init()
Vcs.init()
Snapshot.init()
diff --git a/packages/opencode/src/project/instance.ts b/packages/opencode/src/project/instance.ts
index fd3cc640a..c16801a7a 100644
--- a/packages/opencode/src/project/instance.ts
+++ b/packages/opencode/src/project/instance.ts
@@ -101,6 +101,15 @@ export const Instance = {
if (Instance.worktree === "/") return false
return Filesystem.contains(Instance.worktree, filepath)
},
+ /**
+ * Captures the current instance ALS context and returns a wrapper that
+ * restores it when called. Use this for callbacks that fire outside the
+ * instance async context (native addons, event emitters, timers, etc.).
+ */
+ bind<F extends (...args: any[]) => any>(fn: F): F {
+ const ctx = context.use()
+ return ((...args: any[]) => context.provide(ctx, () => fn(...args))) as F
+ },
state<S>(init: () => S, dispose?: (state: Awaited<S>) => Promise<void>): () => S {
return State.create(() => Instance.directory, init, dispose)
},
diff --git a/packages/opencode/src/pty/index.ts b/packages/opencode/src/pty/index.ts
index d6bc4973a..7436abec9 100644
--- a/packages/opencode/src/pty/index.ts
+++ b/packages/opencode/src/pty/index.ts
@@ -167,40 +167,44 @@ export namespace Pty {
subscribers: new Map(),
}
state().set(id, session)
- ptyProcess.onData((chunk) => {
- session.cursor += chunk.length
+ ptyProcess.onData(
+ Instance.bind((chunk) => {
+ session.cursor += chunk.length
- for (const [key, ws] of session.subscribers.entries()) {
- if (ws.readyState !== 1) {
- session.subscribers.delete(key)
- continue
- }
+ for (const [key, ws] of session.subscribers.entries()) {
+ if (ws.readyState !== 1) {
+ session.subscribers.delete(key)
+ continue
+ }
- if (ws.data !== key) {
- session.subscribers.delete(key)
- continue
- }
+ if (ws.data !== key) {
+ session.subscribers.delete(key)
+ continue
+ }
- try {
- ws.send(chunk)
- } catch {
- session.subscribers.delete(key)
+ try {
+ ws.send(chunk)
+ } catch {
+ session.subscribers.delete(key)
+ }
}
- }
- session.buffer += chunk
- if (session.buffer.length <= BUFFER_LIMIT) return
- const excess = session.buffer.length - BUFFER_LIMIT
- session.buffer = session.buffer.slice(excess)
- session.bufferCursor += excess
- })
- ptyProcess.onExit(({ exitCode }) => {
- if (session.info.status === "exited") return
- log.info("session exited", { id, exitCode })
- session.info.status = "exited"
- Bus.publish(Event.Exited, { id, exitCode })
- remove(id)
- })
+ session.buffer += chunk
+ if (session.buffer.length <= BUFFER_LIMIT) return
+ const excess = session.buffer.length - BUFFER_LIMIT
+ session.buffer = session.buffer.slice(excess)
+ session.bufferCursor += excess
+ }),
+ )
+ ptyProcess.onExit(
+ Instance.bind(({ exitCode }) => {
+ if (session.info.status === "exited") return
+ log.info("session exited", { id, exitCode })
+ session.info.status = "exited"
+ Bus.publish(Event.Exited, { id, exitCode })
+ remove(id)
+ }),
+ )
Bus.publish(Event.Created, { info })
return info
}
diff --git a/packages/opencode/test/file/watcher.test.ts b/packages/opencode/test/file/watcher.test.ts
new file mode 100644
index 000000000..7fe53612d
--- /dev/null
+++ b/packages/opencode/test/file/watcher.test.ts
@@ -0,0 +1,250 @@
+import { $ } from "bun"
+import { afterEach, describe, expect, test } from "bun:test"
+import fs from "fs/promises"
+import path from "path"
+import { ConfigProvider, Deferred, Effect, Fiber, Layer, ManagedRuntime, Option } from "effect"
+import { tmpdir } from "../fixture/fixture"
+import { FileWatcher, FileWatcherService } from "../../src/file/watcher"
+import { InstanceContext } from "../../src/effect/instances"
+import { Instance } from "../../src/project/instance"
+import { GlobalBus } from "../../src/bus/global"
+
+// Native @parcel/watcher bindings aren't reliably available in CI (missing on Linux, flaky on Windows)
+const describeWatcher = FileWatcher.hasNativeBinding() && !process.env.CI ? describe : describe.skip
+
+// ---------------------------------------------------------------------------
+// Helpers
+// ---------------------------------------------------------------------------
+
+const configLayer = ConfigProvider.layer(
+ ConfigProvider.fromUnknown({
+ OPENCODE_EXPERIMENTAL_FILEWATCHER: "true",
+ OPENCODE_EXPERIMENTAL_DISABLE_FILEWATCHER: "false",
+ }),
+)
+
+type BusUpdate = { directory?: string; payload: { type: string; properties: WatcherEvent } }
+type WatcherEvent = { file: string; event: "add" | "change" | "unlink" }
+
+/** Run `body` with a live FileWatcherService. Runtime is acquired/released via Effect.scoped. */
+function withWatcher<E>(directory: string, body: Effect.Effect<void, E>) {
+ return Instance.provide({
+ directory,
+ fn: () =>
+ Effect.gen(function* () {
+ const ctx = Layer.sync(InstanceContext, () =>
+ InstanceContext.of({ directory: Instance.directory, project: Instance.project }),
+ )
+ const layer = Layer.fresh(FileWatcherService.layer).pipe(Layer.provide(ctx), Layer.provide(configLayer))
+ const rt = yield* Effect.acquireRelease(
+ Effect.sync(() => ManagedRuntime.make(layer)),
+ (rt) => Effect.promise(() => rt.dispose()),
+ )
+ yield* Effect.promise(() => rt.runPromise(FileWatcherService.use((s) => s.init())))
+ yield* ready(directory)
+ yield* body
+ }).pipe(Effect.scoped, Effect.runPromise),
+ })
+}
+
+function listen(directory: string, check: (evt: WatcherEvent) => boolean, hit: (evt: WatcherEvent) => void) {
+ let done = false
+
+ function on(evt: BusUpdate) {
+ if (done) return
+ if (evt.directory !== directory) return
+ if (evt.payload.type !== FileWatcher.Event.Updated.type) return
+ if (!check(evt.payload.properties)) return
+ hit(evt.payload.properties)
+ }
+
+ function cleanup() {
+ if (done) return
+ done = true
+ GlobalBus.off("event", on)
+ }
+
+ GlobalBus.on("event", on)
+ return cleanup
+}
+
+function wait(directory: string, check: (evt: WatcherEvent) => boolean) {
+ return Effect.callback<WatcherEvent>((resume) => {
+ const cleanup = listen(directory, check, (evt) => {
+ cleanup()
+ resume(Effect.succeed(evt))
+ })
+ return Effect.sync(cleanup)
+ }).pipe(Effect.timeout("5 seconds"))
+}
+
+function nextUpdate<E>(directory: string, check: (evt: WatcherEvent) => boolean, trigger: Effect.Effect<void, E>) {
+ return Effect.acquireUseRelease(
+ wait(directory, check).pipe(Effect.forkChild({ startImmediately: true })),
+ (fiber) =>
+ Effect.gen(function* () {
+ yield* trigger
+ return yield* Fiber.join(fiber)
+ }),
+ Fiber.interrupt,
+ )
+}
+
+/** Effect that asserts no matching event arrives within `ms`. */
+function noUpdate<E>(
+ directory: string,
+ check: (evt: WatcherEvent) => boolean,
+ trigger: Effect.Effect<void, E>,
+ ms = 500,
+) {
+ return Effect.gen(function* () {
+ const deferred = yield* Deferred.make<WatcherEvent>()
+
+ yield* Effect.acquireUseRelease(
+ Effect.sync(() =>
+ listen(directory, check, (evt) => {
+ Effect.runSync(Deferred.succeed(deferred, evt))
+ }),
+ ),
+ () =>
+ Effect.gen(function* () {
+ yield* trigger
+ expect(yield* Deferred.await(deferred).pipe(Effect.timeoutOption(`${ms} millis`))).toEqual(Option.none())
+ }),
+ (cleanup) => Effect.sync(cleanup),
+ )
+ })
+}
+
+function ready(directory: string) {
+ const file = path.join(directory, `.watcher-${Math.random().toString(36).slice(2)}`)
+ const head = path.join(directory, ".git", "HEAD")
+
+ return Effect.gen(function* () {
+ yield* nextUpdate(
+ directory,
+ (evt) => evt.file === file && evt.event === "add",
+ Effect.promise(() => fs.writeFile(file, "ready")),
+ ).pipe(Effect.ensuring(Effect.promise(() => fs.rm(file, { force: true }).catch(() => undefined))), Effect.asVoid)
+
+ const git = yield* Effect.promise(() =>
+ fs
+ .stat(head)
+ .then(() => true)
+ .catch(() => false),
+ )
+ if (!git) return
+
+ const branch = `watch-${Math.random().toString(36).slice(2)}`
+ const hash = yield* Effect.promise(() => $`git rev-parse HEAD`.cwd(directory).quiet().text())
+ yield* nextUpdate(
+ directory,
+ (evt) => evt.file === head && evt.event !== "unlink",
+ Effect.promise(async () => {
+ await fs.writeFile(path.join(directory, ".git", "refs", "heads", branch), hash.trim() + "\n")
+ await fs.writeFile(head, `ref: refs/heads/${branch}\n`)
+ }),
+ ).pipe(Effect.asVoid)
+ })
+}
+
+// ---------------------------------------------------------------------------
+// Tests
+// ---------------------------------------------------------------------------
+
+describeWatcher("FileWatcherService", () => {
+ afterEach(() => Instance.disposeAll())
+
+ test("publishes root create, update, and delete events", async () => {
+ await using tmp = await tmpdir({ git: true })
+ const file = path.join(tmp.path, "watch.txt")
+ const dir = tmp.path
+ const cases = [
+ { event: "add" as const, trigger: Effect.promise(() => fs.writeFile(file, "a")) },
+ { event: "change" as const, trigger: Effect.promise(() => fs.writeFile(file, "b")) },
+ { event: "unlink" as const, trigger: Effect.promise(() => fs.unlink(file)) },
+ ]
+
+ await withWatcher(
+ dir,
+ Effect.forEach(cases, ({ event, trigger }) =>
+ nextUpdate(dir, (evt) => evt.file === file && evt.event === event, trigger).pipe(
+ Effect.tap((evt) => Effect.sync(() => expect(evt).toEqual({ file, event }))),
+ ),
+ ),
+ )
+ })
+
+ test("watches non-git roots", async () => {
+ await using tmp = await tmpdir()
+ const file = path.join(tmp.path, "plain.txt")
+ const dir = tmp.path
+
+ await withWatcher(
+ dir,
+ nextUpdate(
+ dir,
+ (e) => e.file === file && e.event === "add",
+ Effect.promise(() => fs.writeFile(file, "plain")),
+ ).pipe(Effect.tap((evt) => Effect.sync(() => expect(evt).toEqual({ file, event: "add" })))),
+ )
+ })
+
+ test("cleanup stops publishing events", async () => {
+ await using tmp = await tmpdir({ git: true })
+ const file = path.join(tmp.path, "after-dispose.txt")
+
+ // Start and immediately stop the watcher (withWatcher disposes on exit)
+ await withWatcher(tmp.path, Effect.void)
+
+ // Now write a file — no watcher should be listening
+ await Effect.runPromise(
+ noUpdate(
+ tmp.path,
+ (e) => e.file === file,
+ Effect.promise(() => fs.writeFile(file, "gone")),
+ ),
+ )
+ })
+
+ test("ignores .git/index changes", async () => {
+ await using tmp = await tmpdir({ git: true })
+ const gitIndex = path.join(tmp.path, ".git", "index")
+ const edit = path.join(tmp.path, "tracked.txt")
+
+ await withWatcher(
+ tmp.path,
+ noUpdate(
+ tmp.path,
+ (e) => e.file === gitIndex,
+ Effect.promise(async () => {
+ await fs.writeFile(edit, "a")
+ await $`git add .`.cwd(tmp.path).quiet().nothrow()
+ }),
+ ),
+ )
+ })
+
+ test("publishes .git/HEAD events", async () => {
+ await using tmp = await tmpdir({ git: true })
+ const head = path.join(tmp.path, ".git", "HEAD")
+ const branch = `watch-${Math.random().toString(36).slice(2)}`
+ await $`git branch ${branch}`.cwd(tmp.path).quiet()
+
+ await withWatcher(
+ tmp.path,
+ nextUpdate(
+ tmp.path,
+ (evt) => evt.file === head && evt.event !== "unlink",
+ Effect.promise(() => fs.writeFile(head, `ref: refs/heads/${branch}\n`)),
+ ).pipe(
+ Effect.tap((evt) =>
+ Effect.sync(() => {
+ expect(evt.file).toBe(head)
+ expect(["add", "change"]).toContain(evt.event)
+ }),
+ ),
+ ),
+ )
+ })
+})
diff --git a/packages/opencode/test/permission/next.test.ts b/packages/opencode/test/permission/next.test.ts
index 2e9195c28..7f7e5e1f1 100644
--- a/packages/opencode/test/permission/next.test.ts
+++ b/packages/opencode/test/permission/next.test.ts
@@ -977,7 +977,7 @@ test("ask - should deny even when an earlier pattern is ask", async () => {
await Instance.provide({
directory: tmp.path,
fn: async () => {
- const ask = PermissionNext.ask({
+ const err = await PermissionNext.ask({
sessionID: SessionID.make("session_test"),
permission: "bash",
patterns: ["echo hello", "rm -rf /"],
@@ -987,24 +987,12 @@ test("ask - should deny even when an earlier pattern is ask", async () => {
{ permission: "bash", pattern: "echo *", action: "ask" },
{ permission: "bash", pattern: "rm *", action: "deny" },
],
- })
-
- const out = await Promise.race([
- ask.then(
- () => ({ ok: true as const, err: undefined }),
- (err) => ({ ok: false as const, err }),
- ),
- Bun.sleep(100).then(() => "timeout" as const),
- ])
-
- if (out === "timeout") {
- await rejectAll()
- await ask.catch(() => {})
- throw new Error("ask timed out instead of denying immediately")
- }
+ }).then(
+ () => undefined,
+ (err) => err,
+ )
- expect(out.ok).toBe(false)
- expect(out.err).toBeInstanceOf(PermissionNext.DeniedError)
+ expect(err).toBeInstanceOf(PermissionNext.DeniedError)
expect(await PermissionNext.list()).toHaveLength(0)
},
})
diff --git a/packages/opencode/test/pty/pty-session.test.ts b/packages/opencode/test/pty/pty-session.test.ts
index 9063af872..f7a949c92 100644
--- a/packages/opencode/test/pty/pty-session.test.ts
+++ b/packages/opencode/test/pty/pty-session.test.ts
@@ -6,7 +6,7 @@ import type { PtyID } from "../../src/pty/schema"
import { tmpdir } from "../fixture/fixture"
import { setTimeout as sleep } from "node:timers/promises"
-const wait = async (fn: () => boolean, ms = 2000) => {
+const wait = async (fn: () => boolean, ms = 5000) => {
const end = Date.now() + ms
while (Date.now() < end) {
if (fn()) return
@@ -20,7 +20,7 @@ const pick = (log: Array<{ type: "created" | "exited" | "deleted"; id: PtyID }>,
}
describe("pty", () => {
- test("publishes created, exited, deleted in order for /bin/ls + remove", async () => {
+ test("publishes created, exited, deleted in order for a short-lived process", async () => {
if (process.platform === "win32") return
await using dir = await tmpdir({ git: true })
@@ -37,7 +37,11 @@ describe("pty", () => {
let id: PtyID | undefined
try {
- const info = await Pty.create({ command: "/bin/ls", title: "ls" })
+ const info = await Pty.create({
+ command: "/usr/bin/env",
+ args: ["sh", "-c", "sleep 0.1"],
+ title: "sleep",
+ })
id = info.id
await wait(() => pick(log, id!).includes("exited"))