summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorKit Langton <[email protected]>2026-03-31 19:14:32 -0400
committerGitHub <[email protected]>2026-03-31 19:14:32 -0400
commit0c03a3ee10f6462a25f67fc847188b74ae76b42b (patch)
treefeee9d908a199fdaaf8955423269ec58b89a99d4
parent53330a518f44ca372e9706e7261d967e18ef2cda (diff)
downloadopencode-0c03a3ee10f6462a25f67fc847188b74ae76b42b.tar.gz
opencode-0c03a3ee10f6462a25f67fc847188b74ae76b42b.zip
test: migrate prompt tests to HTTP mock LLM server (#20304)
-rw-r--r--packages/opencode/src/bus/index.ts3
-rw-r--r--packages/opencode/src/config/config.ts3
-rw-r--r--packages/opencode/src/effect/instance-ref.ts6
-rw-r--r--packages/opencode/src/effect/instance-state.ts45
-rw-r--r--packages/opencode/src/effect/run-service.ts24
-rw-r--r--packages/opencode/src/format/index.ts3
-rw-r--r--packages/opencode/src/installation/index.ts10
-rw-r--r--packages/opencode/src/installation/meta.ts7
-rw-r--r--packages/opencode/src/project/instance.ts8
-rw-r--r--packages/opencode/src/session/compaction.ts6
-rw-r--r--packages/opencode/src/session/index.ts14
-rw-r--r--packages/opencode/src/session/prompt.ts18
-rw-r--r--packages/opencode/src/storage/db.ts21
-rw-r--r--packages/opencode/src/worktree/index.ts17
-rw-r--r--packages/opencode/test/account/repo.test.ts24
-rw-r--r--packages/opencode/test/account/service.test.ts12
-rw-r--r--packages/opencode/test/bus/bus-effect.test.ts10
-rw-r--r--packages/opencode/test/effect/instance-state.test.ts100
-rw-r--r--packages/opencode/test/effect/runner.test.ts48
-rw-r--r--packages/opencode/test/fixture/fixture.ts45
-rw-r--r--packages/opencode/test/format/format.test.ts14
-rw-r--r--packages/opencode/test/lib/effect.ts34
-rw-r--r--packages/opencode/test/lib/llm-server.ts282
-rw-r--r--packages/opencode/test/session/processor-effect.test.ts20
-rw-r--r--packages/opencode/test/session/prompt-effect.test.ts1057
-rw-r--r--packages/opencode/test/tool/truncation.test.ts2
-rw-r--r--packages/sdk/js/src/v2/gen/types.gen.ts30
27 files changed, 1146 insertions, 717 deletions
diff --git a/packages/opencode/src/bus/index.ts b/packages/opencode/src/bus/index.ts
index db6327c82..2a841920d 100644
--- a/packages/opencode/src/bus/index.ts
+++ b/packages/opencode/src/bus/index.ts
@@ -90,8 +90,9 @@ export namespace Bus {
if (ps) yield* PubSub.publish(ps, payload)
yield* PubSub.publish(state.wildcard, payload)
+ const dir = yield* InstanceState.directory
GlobalBus.emit("event", {
- directory: Instance.directory,
+ directory: dir,
payload,
})
})
diff --git a/packages/opencode/src/config/config.ts b/packages/opencode/src/config/config.ts
index 9e56c980f..f86d8d32a 100644
--- a/packages/opencode/src/config/config.ts
+++ b/packages/opencode/src/config/config.ts
@@ -1486,7 +1486,8 @@ export namespace Config {
})
const update = Effect.fn("Config.update")(function* (config: Info) {
- const file = path.join(Instance.directory, "config.json")
+ const dir = yield* InstanceState.directory
+ const file = path.join(dir, "config.json")
const existing = yield* loadFile(file)
yield* fs.writeFileString(file, JSON.stringify(mergeDeep(existing, config), null, 2)).pipe(Effect.orDie)
yield* Effect.promise(() => Instance.dispose())
diff --git a/packages/opencode/src/effect/instance-ref.ts b/packages/opencode/src/effect/instance-ref.ts
new file mode 100644
index 000000000..d3939b264
--- /dev/null
+++ b/packages/opencode/src/effect/instance-ref.ts
@@ -0,0 +1,6 @@
+import { ServiceMap } from "effect"
+import type { InstanceContext } from "@/project/instance"
+
+export const InstanceRef = ServiceMap.Reference<InstanceContext | undefined>("~opencode/InstanceRef", {
+ defaultValue: () => undefined,
+})
diff --git a/packages/opencode/src/effect/instance-state.ts b/packages/opencode/src/effect/instance-state.ts
index 6873ec255..b073cf0a4 100644
--- a/packages/opencode/src/effect/instance-state.ts
+++ b/packages/opencode/src/effect/instance-state.ts
@@ -1,5 +1,7 @@
-import { Effect, ScopedCache, Scope } from "effect"
+import { Effect, Fiber, ScopedCache, Scope, ServiceMap } from "effect"
import { Instance, type InstanceContext } from "@/project/instance"
+import { Context } from "@/util/context"
+import { InstanceRef } from "./instance-ref"
import { registerDisposer } from "./instance-registry"
const TypeId = "~opencode/InstanceState"
@@ -10,13 +12,34 @@ export interface InstanceState<A, E = never, R = never> {
}
export namespace InstanceState {
+ export const bind = <F extends (...args: any[]) => any>(fn: F): F => {
+ try {
+ return Instance.bind(fn)
+ } catch (err) {
+ if (!(err instanceof Context.NotFound)) throw err
+ }
+ const fiber = Fiber.getCurrent()
+ const ctx = fiber ? ServiceMap.getReferenceUnsafe(fiber.services, InstanceRef) : undefined
+ if (!ctx) return fn
+ return ((...args: any[]) => Instance.restore(ctx, () => fn(...args))) as F
+ }
+
+ export const context = Effect.fnUntraced(function* () {
+ return (yield* InstanceRef) ?? Instance.current
+ })()
+
+ export const directory = Effect.map(context, (ctx) => ctx.directory)
+
export const make = <A, E = never, R = never>(
init: (ctx: InstanceContext) => Effect.Effect<A, E, R | Scope.Scope>,
): Effect.Effect<InstanceState<A, E, Exclude<R, Scope.Scope>>, never, R | Scope.Scope> =>
Effect.gen(function* () {
const cache = yield* ScopedCache.make<string, A, E, R>({
capacity: Number.POSITIVE_INFINITY,
- lookup: () => init(Instance.current),
+ lookup: () =>
+ Effect.fnUntraced(function* () {
+ return yield* init(yield* context)
+ })(),
})
const off = registerDisposer((directory) => Effect.runPromise(ScopedCache.invalidate(cache, directory)))
@@ -29,7 +52,9 @@ export namespace InstanceState {
})
export const get = <A, E, R>(self: InstanceState<A, E, R>) =>
- Effect.suspend(() => ScopedCache.get(self.cache, Instance.directory))
+ Effect.gen(function* () {
+ return yield* ScopedCache.get(self.cache, yield* directory)
+ })
export const use = <A, E, R, B>(self: InstanceState<A, E, R>, select: (value: A) => B) =>
Effect.map(get(self), select)
@@ -40,8 +65,18 @@ export namespace InstanceState {
) => Effect.flatMap(get(self), select)
export const has = <A, E, R>(self: InstanceState<A, E, R>) =>
- Effect.suspend(() => ScopedCache.has(self.cache, Instance.directory))
+ Effect.gen(function* () {
+ return yield* ScopedCache.has(self.cache, yield* directory)
+ })
export const invalidate = <A, E, R>(self: InstanceState<A, E, R>) =>
- Effect.suspend(() => ScopedCache.invalidate(self.cache, Instance.directory))
+ Effect.gen(function* () {
+ return yield* ScopedCache.invalidate(self.cache, yield* directory)
+ })
+
+ /**
+ * Effect finalizers run on the fiber scheduler after the original async
+ * boundary, so ALS reads like Instance.directory can be gone by then.
+ */
+ export const withALS = <T>(fn: () => T) => Effect.map(context, (ctx) => Instance.restore(ctx, fn))
}
diff --git a/packages/opencode/src/effect/run-service.ts b/packages/opencode/src/effect/run-service.ts
index 2daa29fde..619d5be6b 100644
--- a/packages/opencode/src/effect/run-service.ts
+++ b/packages/opencode/src/effect/run-service.ts
@@ -1,19 +1,33 @@
import { Effect, Layer, ManagedRuntime } from "effect"
import * as ServiceMap from "effect/ServiceMap"
+import { Instance } from "@/project/instance"
+import { Context } from "@/util/context"
+import { InstanceRef } from "./instance-ref"
export const memoMap = Layer.makeMemoMapUnsafe()
+function attach<A, E, R>(effect: Effect.Effect<A, E, R>): Effect.Effect<A, E, R> {
+ try {
+ const ctx = Instance.current
+ return Effect.provideService(effect, InstanceRef, ctx)
+ } catch (err) {
+ if (!(err instanceof Context.NotFound)) throw err
+ }
+ return effect
+}
+
export function makeRuntime<I, S, E>(service: ServiceMap.Service<I, S>, layer: Layer.Layer<I, E>) {
let rt: ManagedRuntime.ManagedRuntime<I, E> | undefined
const getRuntime = () => (rt ??= ManagedRuntime.make(layer, { memoMap }))
return {
- runSync: <A, Err>(fn: (svc: S) => Effect.Effect<A, Err, I>) => getRuntime().runSync(service.use(fn)),
+ runSync: <A, Err>(fn: (svc: S) => Effect.Effect<A, Err, I>) => getRuntime().runSync(attach(service.use(fn))),
runPromiseExit: <A, Err>(fn: (svc: S) => Effect.Effect<A, Err, I>, options?: Effect.RunOptions) =>
- getRuntime().runPromiseExit(service.use(fn), options),
+ getRuntime().runPromiseExit(attach(service.use(fn)), options),
runPromise: <A, Err>(fn: (svc: S) => Effect.Effect<A, Err, I>, options?: Effect.RunOptions) =>
- getRuntime().runPromise(service.use(fn), options),
- runFork: <A, Err>(fn: (svc: S) => Effect.Effect<A, Err, I>) => getRuntime().runFork(service.use(fn)),
- runCallback: <A, Err>(fn: (svc: S) => Effect.Effect<A, Err, I>) => getRuntime().runCallback(service.use(fn)),
+ getRuntime().runPromise(attach(service.use(fn)), options),
+ runFork: <A, Err>(fn: (svc: S) => Effect.Effect<A, Err, I>) => getRuntime().runFork(attach(service.use(fn))),
+ runCallback: <A, Err>(fn: (svc: S) => Effect.Effect<A, Err, I>) =>
+ getRuntime().runCallback(attach(service.use(fn))),
}
}
diff --git a/packages/opencode/src/format/index.ts b/packages/opencode/src/format/index.ts
index 8def24875..795364be1 100644
--- a/packages/opencode/src/format/index.ts
+++ b/packages/opencode/src/format/index.ts
@@ -108,10 +108,11 @@ export namespace Format {
for (const item of yield* Effect.promise(() => getFormatter(ext))) {
log.info("running", { command: item.command })
const cmd = item.command.map((x) => x.replace("$FILE", filepath))
+ const dir = yield* InstanceState.directory
const code = yield* spawner
.spawn(
ChildProcess.make(cmd[0]!, cmd.slice(1), {
- cwd: Instance.directory,
+ cwd: dir,
env: item.environment,
extendEnv: true,
}),
diff --git a/packages/opencode/src/installation/index.ts b/packages/opencode/src/installation/index.ts
index 52c149c4f..232fa14f5 100644
--- a/packages/opencode/src/installation/index.ts
+++ b/packages/opencode/src/installation/index.ts
@@ -9,11 +9,7 @@ import z from "zod"
import { BusEvent } from "@/bus/bus-event"
import { Flag } from "../flag/flag"
import { Log } from "../util/log"
-
-declare global {
- const OPENCODE_VERSION: string
- const OPENCODE_CHANNEL: string
-}
+import { CHANNEL as channel, VERSION as version } from "./meta"
import semver from "semver"
@@ -60,8 +56,8 @@ export namespace Installation {
})
export type Info = z.infer<typeof Info>
- export const VERSION = typeof OPENCODE_VERSION === "string" ? OPENCODE_VERSION : "local"
- export const CHANNEL = typeof OPENCODE_CHANNEL === "string" ? OPENCODE_CHANNEL : "local"
+ export const VERSION = version
+ export const CHANNEL = channel
export const USER_AGENT = `opencode/${CHANNEL}/${VERSION}/${Flag.OPENCODE_CLIENT}`
export function isPreview() {
diff --git a/packages/opencode/src/installation/meta.ts b/packages/opencode/src/installation/meta.ts
new file mode 100644
index 000000000..6a1315db2
--- /dev/null
+++ b/packages/opencode/src/installation/meta.ts
@@ -0,0 +1,7 @@
+declare global {
+ const OPENCODE_VERSION: string
+ const OPENCODE_CHANNEL: string
+}
+
+export const VERSION = typeof OPENCODE_VERSION === "string" ? OPENCODE_VERSION : "local"
+export const CHANNEL = typeof OPENCODE_CHANNEL === "string" ? OPENCODE_CHANNEL : "local"
diff --git a/packages/opencode/src/project/instance.ts b/packages/opencode/src/project/instance.ts
index 5dddfe627..a0d6f2414 100644
--- a/packages/opencode/src/project/instance.ts
+++ b/packages/opencode/src/project/instance.ts
@@ -114,6 +114,14 @@ export const Instance = {
const ctx = context.use()
return ((...args: any[]) => context.provide(ctx, () => fn(...args))) as F
},
+ /**
+ * Run a synchronous function within the given instance context ALS.
+ * Use this to bridge from Effect (where InstanceRef carries context)
+ * back to sync code that reads Instance.directory from ALS.
+ */
+ restore<R>(ctx: InstanceContext, fn: () => R): R {
+ return context.provide(ctx, fn)
+ },
state<S>(init: () => S, dispose?: (state: Awaited<S>) => Promise<void>): () => S {
return State.create(() => Instance.directory, init, dispose)
},
diff --git a/packages/opencode/src/session/compaction.ts b/packages/opencode/src/session/compaction.ts
index 229dff0c4..02a8d9484 100644
--- a/packages/opencode/src/session/compaction.ts
+++ b/packages/opencode/src/session/compaction.ts
@@ -17,6 +17,7 @@ import { NotFoundError } from "@/storage/db"
import { ModelID, ProviderID } from "@/provider/schema"
import { Effect, Layer, ServiceMap } from "effect"
import { makeRuntime } from "@/effect/run-service"
+import { InstanceState } from "@/effect/instance-state"
import { isOverflow as overflow } from "./overflow"
export namespace SessionCompaction {
@@ -213,6 +214,7 @@ When constructing the summary, try to stick to this template:
const msgs = structuredClone(messages)
yield* plugin.trigger("experimental.chat.messages.transform", {}, { messages: msgs })
const modelMessages = yield* Effect.promise(() => MessageV2.toModelMessages(msgs, model, { stripMedia: true }))
+ const ctx = yield* InstanceState.context
const msg: MessageV2.Assistant = {
id: MessageID.ascending(),
role: "assistant",
@@ -223,8 +225,8 @@ When constructing the summary, try to stick to this template:
variant: userMessage.variant,
summary: true,
path: {
- cwd: Instance.directory,
- root: Instance.worktree,
+ cwd: ctx.directory,
+ root: ctx.worktree,
},
cost: 0,
tokens: {
diff --git a/packages/opencode/src/session/index.ts b/packages/opencode/src/session/index.ts
index 94aee14c0..5ed5acafa 100644
--- a/packages/opencode/src/session/index.ts
+++ b/packages/opencode/src/session/index.ts
@@ -19,6 +19,7 @@ import { Log } from "../util/log"
import { updateSchema } from "../util/update-schema"
import { MessageV2 } from "./message-v2"
import { Instance } from "../project/instance"
+import { InstanceState } from "@/effect/instance-state"
import { SessionPrompt } from "./prompt"
import { fn } from "@/util/fn"
import { Command } from "../command"
@@ -382,11 +383,12 @@ export namespace Session {
directory: string
permission?: Permission.Ruleset
}) {
+ const ctx = yield* InstanceState.context
const result: Info = {
id: SessionID.descending(input.id),
slug: Slug.create(),
version: Installation.VERSION,
- projectID: Instance.project.id,
+ projectID: ctx.project.id,
directory: input.directory,
workspaceID: input.workspaceID,
parentID: input.parentID,
@@ -444,12 +446,12 @@ export namespace Session {
})
const children = Effect.fn("Session.children")(function* (parentID: SessionID) {
- const project = Instance.project
+ const ctx = yield* InstanceState.context
const rows = yield* db((d) =>
d
.select()
.from(SessionTable)
- .where(and(eq(SessionTable.project_id, project.id), eq(SessionTable.parent_id, parentID)))
+ .where(and(eq(SessionTable.project_id, ctx.project.id), eq(SessionTable.parent_id, parentID)))
.all(),
)
return rows.map(fromRow)
@@ -496,9 +498,10 @@ export namespace Session {
permission?: Permission.Ruleset
workspaceID?: WorkspaceID
}) {
+ const directory = yield* InstanceState.directory
return yield* createNext({
parentID: input?.parentID,
- directory: Instance.directory,
+ directory,
title: input?.title,
permission: input?.permission,
workspaceID: input?.workspaceID,
@@ -506,10 +509,11 @@ export namespace Session {
})
const fork = Effect.fn("Session.fork")(function* (input: { sessionID: SessionID; messageID?: MessageID }) {
+ const directory = yield* InstanceState.directory
const original = yield* get(input.sessionID)
const title = getForkedTitle(original.title)
const session = yield* createNext({
- directory: Instance.directory,
+ directory,
workspaceID: original.workspaceID,
title,
})
diff --git a/packages/opencode/src/session/prompt.ts b/packages/opencode/src/session/prompt.ts
index dbf815bd6..083c23cc6 100644
--- a/packages/opencode/src/session/prompt.ts
+++ b/packages/opencode/src/session/prompt.ts
@@ -148,6 +148,7 @@ export namespace SessionPrompt {
})
const resolvePromptParts = Effect.fn("SessionPrompt.resolvePromptParts")(function* (template: string) {
+ const ctx = yield* InstanceState.context
const parts: PromptInput["parts"] = [{ type: "text", text: template }]
const files = ConfigMarkdown.files(template)
const seen = new Set<string>()
@@ -159,7 +160,7 @@ export namespace SessionPrompt {
seen.add(name)
const filepath = name.startsWith("~/")
? path.join(os.homedir(), name.slice(2))
- : path.resolve(Instance.worktree, name)
+ : path.resolve(ctx.worktree, name)
const info = yield* fsys.stat(filepath).pipe(Effect.option)
if (Option.isNone(info)) {
@@ -553,6 +554,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the
msgs: MessageV2.WithParts[]
}) {
const { task, model, lastUser, sessionID, session, msgs } = input
+ const ctx = yield* InstanceState.context
const taskTool = yield* Effect.promise(() => TaskTool.init())
const taskModel = task.model ? yield* getModel(task.model.providerID, task.model.modelID, sessionID) : model
const assistantMessage: MessageV2.Assistant = yield* sessions.updateMessage({
@@ -563,7 +565,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the
mode: task.agent,
agent: task.agent,
variant: lastUser.variant,
- path: { cwd: Instance.directory, root: Instance.worktree },
+ path: { cwd: ctx.directory, root: ctx.worktree },
cost: 0,
tokens: { input: 0, output: 0, reasoning: 0, cache: { read: 0, write: 0 } },
modelID: taskModel.id,
@@ -734,6 +736,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the
})
const shellImpl = Effect.fn("SessionPrompt.shellImpl")(function* (input: ShellInput, signal: AbortSignal) {
+ const ctx = yield* InstanceState.context
const session = yield* sessions.get(input.sessionID)
if (session.revert) {
yield* Effect.promise(() => SessionRevert.cleanup(session))
@@ -773,7 +776,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the
mode: input.agent,
agent: input.agent,
cost: 0,
- path: { cwd: Instance.directory, root: Instance.worktree },
+ path: { cwd: ctx.directory, root: ctx.worktree },
time: { created: Date.now() },
role: "assistant",
tokens: { input: 0, output: 0, reasoning: 0, cache: { read: 0, write: 0 } },
@@ -832,7 +835,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the
}
const args = (invocations[shellName] ?? invocations[""]).args
- const cwd = Instance.directory
+ const cwd = ctx.directory
const shellEnv = yield* plugin.trigger(
"shell.env",
{ cwd, sessionID: input.sessionID, callID: part.callID },
@@ -976,7 +979,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the
variant,
}
- yield* Effect.addFinalizer(() => Effect.sync(() => InstructionPrompt.clear(info.id)))
+ yield* Effect.addFinalizer(() => InstanceState.withALS(() => InstructionPrompt.clear(info.id)))
type Draft<T> = T extends MessageV2.Part ? Omit<T, "id"> & { id?: string } : never
const assign = (part: Draft<MessageV2.Part>): MessageV2.Part => ({
@@ -1330,6 +1333,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the
const runLoop: (sessionID: SessionID) => Effect.Effect<MessageV2.WithParts> = Effect.fn("SessionPrompt.run")(
function* (sessionID: SessionID) {
+ const ctx = yield* InstanceState.context
let structured: unknown | undefined
let step = 0
const session = yield* sessions.get(sessionID)
@@ -1421,7 +1425,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the
mode: agent.name,
agent: agent.name,
variant: lastUser.variant,
- path: { cwd: Instance.directory, root: Instance.worktree },
+ path: { cwd: ctx.directory, root: ctx.worktree },
cost: 0,
tokens: { input: 0, output: 0, reasoning: 0, cache: { read: 0, write: 0 } },
modelID: model.id,
@@ -1538,7 +1542,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the
}),
Effect.fnUntraced(function* (exit) {
if (Exit.isFailure(exit) && Cause.hasInterruptsOnly(exit.cause)) yield* handle.abort()
- InstructionPrompt.clear(handle.message.id)
+ yield* InstanceState.withALS(() => InstructionPrompt.clear(handle.message.id))
}),
)
if (outcome === "break") break
diff --git a/packages/opencode/src/storage/db.ts b/packages/opencode/src/storage/db.ts
index f41a1ecd8..4cb0dbc3e 100644
--- a/packages/opencode/src/storage/db.ts
+++ b/packages/opencode/src/storage/db.ts
@@ -10,8 +10,9 @@ import { NamedError } from "@opencode-ai/util/error"
import z from "zod"
import path from "path"
import { readFileSync, readdirSync, existsSync } from "fs"
-import { Installation } from "../installation"
import { Flag } from "../flag/flag"
+import { CHANNEL } from "../installation/meta"
+import { InstanceState } from "@/effect/instance-state"
import { iife } from "@/util/iife"
import { init } from "#db"
@@ -28,10 +29,9 @@ const log = Log.create({ service: "db" })
export namespace Database {
export function getChannelPath() {
- const channel = Installation.CHANNEL
- if (["latest", "beta"].includes(channel) || Flag.OPENCODE_DISABLE_CHANNEL_DB)
+ if (["latest", "beta"].includes(CHANNEL) || Flag.OPENCODE_DISABLE_CHANNEL_DB)
return path.join(Global.Path.data, "opencode.db")
- const safe = channel.replace(/[^a-zA-Z0-9._-]/g, "-")
+ const safe = CHANNEL.replace(/[^a-zA-Z0-9._-]/g, "-")
return path.join(Global.Path.data, `opencode-${safe}.db`)
}
@@ -142,10 +142,11 @@ export namespace Database {
}
export function effect(fn: () => any | Promise<any>) {
+ const bound = InstanceState.bind(fn)
try {
- ctx.use().effects.push(fn)
+ ctx.use().effects.push(bound)
} catch {
- fn()
+ bound()
}
}
@@ -162,12 +163,8 @@ export namespace Database {
} catch (err) {
if (err instanceof Context.NotFound) {
const effects: (() => void | Promise<void>)[] = []
- const result = Client().transaction(
- (tx: TxOrDb) => {
- return ctx.provide({ tx, effects }, () => callback(tx))
- },
- { behavior: options?.behavior },
- )
+ const txCallback = InstanceState.bind((tx: TxOrDb) => ctx.provide({ tx, effects }, () => callback(tx)))
+ const result = Client().transaction(txCallback, { behavior: options?.behavior })
for (const effect of effects) effect()
return result as NotPromise<T>
}
diff --git a/packages/opencode/src/worktree/index.ts b/packages/opencode/src/worktree/index.ts
index 7087ac262..da20a5d6d 100644
--- a/packages/opencode/src/worktree/index.ts
+++ b/packages/opencode/src/worktree/index.ts
@@ -18,6 +18,7 @@ import { NodePath } from "@effect/platform-node"
import { AppFileSystem } from "@/filesystem"
import { makeRuntime } from "@/effect/run-service"
import * as CrossSpawnSpawner from "@/effect/cross-spawn-spawner"
+import { InstanceState } from "@/effect/instance-state"
export namespace Worktree {
const log = Log.create({ service: "worktree" })
@@ -199,6 +200,7 @@ export namespace Worktree {
const MAX_NAME_ATTEMPTS = 26
const candidate = Effect.fn("Worktree.candidate")(function* (root: string, base?: string) {
+ const ctx = yield* InstanceState.context
for (const attempt of Array.from({ length: MAX_NAME_ATTEMPTS }, (_, i) => i)) {
const name = base ? (attempt === 0 ? base : `${base}-${Slug.create()}`) : Slug.create()
const branch = `opencode/${name}`
@@ -207,7 +209,7 @@ export namespace Worktree {
if (yield* fs.exists(directory).pipe(Effect.orDie)) continue
const ref = `refs/heads/${branch}`
- const branchCheck = yield* git(["show-ref", "--verify", "--quiet", ref], { cwd: Instance.worktree })
+ const branchCheck = yield* git(["show-ref", "--verify", "--quiet", ref], { cwd: ctx.worktree })
if (branchCheck.code === 0) continue
return Info.parse({ name, branch, directory })
@@ -216,11 +218,12 @@ export namespace Worktree {
})
const makeWorktreeInfo = Effect.fn("Worktree.makeWorktreeInfo")(function* (name?: string) {
- if (Instance.project.vcs !== "git") {
+ const ctx = yield* InstanceState.context
+ if (ctx.project.vcs !== "git") {
throw new NotGitError({ message: "Worktrees are only supported for git projects" })
}
- const root = pathSvc.join(Global.Path.data, "worktree", Instance.project.id)
+ const root = pathSvc.join(Global.Path.data, "worktree", ctx.project.id)
yield* fs.makeDirectory(root, { recursive: true }).pipe(Effect.orDie)
const base = name ? slugify(name) : ""
@@ -228,18 +231,20 @@ export namespace Worktree {
})
const setup = Effect.fnUntraced(function* (info: Info) {
+ const ctx = yield* InstanceState.context
const created = yield* git(["worktree", "add", "--no-checkout", "-b", info.branch, info.directory], {
- cwd: Instance.worktree,
+ cwd: ctx.worktree,
})
if (created.code !== 0) {
throw new CreateFailedError({ message: created.stderr || created.text || "Failed to create git worktree" })
}
- yield* project.addSandbox(Instance.project.id, info.directory).pipe(Effect.catch(() => Effect.void))
+ yield* project.addSandbox(ctx.project.id, info.directory).pipe(Effect.catch(() => Effect.void))
})
const boot = Effect.fnUntraced(function* (info: Info, startCommand?: string) {
- const projectID = Instance.project.id
+ const ctx = yield* InstanceState.context
+ const projectID = ctx.project.id
const extra = startCommand?.trim()
const populated = yield* git(["reset", "--hard"], { cwd: info.directory })
diff --git a/packages/opencode/test/account/repo.test.ts b/packages/opencode/test/account/repo.test.ts
index fb12ddf70..460c47443 100644
--- a/packages/opencode/test/account/repo.test.ts
+++ b/packages/opencode/test/account/repo.test.ts
@@ -16,21 +16,21 @@ const truncate = Layer.effectDiscard(
const it = testEffect(Layer.merge(AccountRepo.layer, truncate))
-it.effect("list returns empty when no accounts exist", () =>
+it.live("list returns empty when no accounts exist", () =>
Effect.gen(function* () {
const accounts = yield* AccountRepo.use((r) => r.list())
expect(accounts).toEqual([])
}),
)
-it.effect("active returns none when no accounts exist", () =>
+it.live("active returns none when no accounts exist", () =>
Effect.gen(function* () {
const active = yield* AccountRepo.use((r) => r.active())
expect(Option.isNone(active)).toBe(true)
}),
)
-it.effect("persistAccount inserts and getRow retrieves", () =>
+it.live("persistAccount inserts and getRow retrieves", () =>
Effect.gen(function* () {
const id = AccountID.make("user-1")
yield* AccountRepo.use((r) =>
@@ -56,7 +56,7 @@ it.effect("persistAccount inserts and getRow retrieves", () =>
}),
)
-it.effect("persistAccount sets the active account and org", () =>
+it.live("persistAccount sets the active account and org", () =>
Effect.gen(function* () {
const id1 = AccountID.make("user-1")
const id2 = AccountID.make("user-2")
@@ -93,7 +93,7 @@ it.effect("persistAccount sets the active account and org", () =>
}),
)
-it.effect("list returns all accounts", () =>
+it.live("list returns all accounts", () =>
Effect.gen(function* () {
const id1 = AccountID.make("user-1")
const id2 = AccountID.make("user-2")
@@ -128,7 +128,7 @@ it.effect("list returns all accounts", () =>
}),
)
-it.effect("remove deletes an account", () =>
+it.live("remove deletes an account", () =>
Effect.gen(function* () {
const id = AccountID.make("user-1")
@@ -151,7 +151,7 @@ it.effect("remove deletes an account", () =>
}),
)
-it.effect("use stores the selected org and marks the account active", () =>
+it.live("use stores the selected org and marks the account active", () =>
Effect.gen(function* () {
const id1 = AccountID.make("user-1")
const id2 = AccountID.make("user-2")
@@ -191,7 +191,7 @@ it.effect("use stores the selected org and marks the account active", () =>
}),
)
-it.effect("persistToken updates token fields", () =>
+it.live("persistToken updates token fields", () =>
Effect.gen(function* () {
const id = AccountID.make("user-1")
@@ -225,7 +225,7 @@ it.effect("persistToken updates token fields", () =>
}),
)
-it.effect("persistToken with no expiry sets token_expiry to null", () =>
+it.live("persistToken with no expiry sets token_expiry to null", () =>
Effect.gen(function* () {
const id = AccountID.make("user-1")
@@ -255,7 +255,7 @@ it.effect("persistToken with no expiry sets token_expiry to null", () =>
}),
)
-it.effect("persistAccount upserts on conflict", () =>
+it.live("persistAccount upserts on conflict", () =>
Effect.gen(function* () {
const id = AccountID.make("user-1")
@@ -295,7 +295,7 @@ it.effect("persistAccount upserts on conflict", () =>
}),
)
-it.effect("remove clears active state when deleting the active account", () =>
+it.live("remove clears active state when deleting the active account", () =>
Effect.gen(function* () {
const id = AccountID.make("user-1")
@@ -318,7 +318,7 @@ it.effect("remove clears active state when deleting the active account", () =>
}),
)
-it.effect("getRow returns none for nonexistent account", () =>
+it.live("getRow returns none for nonexistent account", () =>
Effect.gen(function* () {
const row = yield* AccountRepo.use((r) => r.getRow(AccountID.make("nope")))
expect(Option.isNone(row)).toBe(true)
diff --git a/packages/opencode/test/account/service.test.ts b/packages/opencode/test/account/service.test.ts
index 9c67641d2..cfe55e23e 100644
--- a/packages/opencode/test/account/service.test.ts
+++ b/packages/opencode/test/account/service.test.ts
@@ -54,7 +54,7 @@ const deviceTokenClient = (body: unknown, status = 400) =>
const poll = (body: unknown, status = 400) =>
Account.Service.use((s) => s.poll(login())).pipe(Effect.provide(live(deviceTokenClient(body, status))))
-it.effect("orgsByAccount groups orgs per account", () =>
+it.live("orgsByAccount groups orgs per account", () =>
Effect.gen(function* () {
yield* AccountRepo.use((r) =>
r.persistAccount({
@@ -107,7 +107,7 @@ it.effect("orgsByAccount groups orgs per account", () =>
}),
)
-it.effect("token refresh persists the new token", () =>
+it.live("token refresh persists the new token", () =>
Effect.gen(function* () {
const id = AccountID.make("user-1")
@@ -148,7 +148,7 @@ it.effect("token refresh persists the new token", () =>
}),
)
-it.effect("config sends the selected org header", () =>
+it.live("config sends the selected org header", () =>
Effect.gen(function* () {
const id = AccountID.make("user-1")
@@ -188,7 +188,7 @@ it.effect("config sends the selected org header", () =>
}),
)
-it.effect("poll stores the account and first org on success", () =>
+it.live("poll stores the account and first org on success", () =>
Effect.gen(function* () {
const client = HttpClient.make((req) =>
Effect.succeed(
@@ -259,7 +259,7 @@ for (const [name, body, expectedTag] of [
"PollExpired",
],
] as const) {
- it.effect(`poll returns ${name} for ${body.error}`, () =>
+ it.live(`poll returns ${name} for ${body.error}`, () =>
Effect.gen(function* () {
const result = yield* poll(body)
expect(result._tag).toBe(expectedTag)
@@ -267,7 +267,7 @@ for (const [name, body, expectedTag] of [
)
}
-it.effect("poll returns poll error for other OAuth errors", () =>
+it.live("poll returns poll error for other OAuth errors", () =>
Effect.gen(function* () {
const result = yield* poll({
error: "server_error",
diff --git a/packages/opencode/test/bus/bus-effect.test.ts b/packages/opencode/test/bus/bus-effect.test.ts
index 642763e90..6f3bcbcfa 100644
--- a/packages/opencode/test/bus/bus-effect.test.ts
+++ b/packages/opencode/test/bus/bus-effect.test.ts
@@ -22,7 +22,7 @@ const live = Layer.mergeAll(Bus.layer, node)
const it = testEffect(live)
describe("Bus (Effect-native)", () => {
- it.effect("publish + subscribe stream delivers events", () =>
+ it.live("publish + subscribe stream delivers events", () =>
provideTmpdirInstance(() =>
Effect.gen(function* () {
const bus = yield* Bus.Service
@@ -46,7 +46,7 @@ describe("Bus (Effect-native)", () => {
),
)
- it.effect("subscribe filters by event type", () =>
+ it.live("subscribe filters by event type", () =>
provideTmpdirInstance(() =>
Effect.gen(function* () {
const bus = yield* Bus.Service
@@ -70,7 +70,7 @@ describe("Bus (Effect-native)", () => {
),
)
- it.effect("subscribeAll receives all types", () =>
+ it.live("subscribeAll receives all types", () =>
provideTmpdirInstance(() =>
Effect.gen(function* () {
const bus = yield* Bus.Service
@@ -95,7 +95,7 @@ describe("Bus (Effect-native)", () => {
),
)
- it.effect("multiple subscribers each receive the event", () =>
+ it.live("multiple subscribers each receive the event", () =>
provideTmpdirInstance(() =>
Effect.gen(function* () {
const bus = yield* Bus.Service
@@ -129,7 +129,7 @@ describe("Bus (Effect-native)", () => {
),
)
- it.effect("subscribeAll stream sees InstanceDisposed on disposal", () =>
+ it.live("subscribeAll stream sees InstanceDisposed on disposal", () =>
Effect.gen(function* () {
const dir = yield* tmpdirScoped()
const types: string[] = []
diff --git a/packages/opencode/test/effect/instance-state.test.ts b/packages/opencode/test/effect/instance-state.test.ts
index 2d527482b..914753312 100644
--- a/packages/opencode/test/effect/instance-state.test.ts
+++ b/packages/opencode/test/effect/instance-state.test.ts
@@ -1,6 +1,7 @@
import { afterEach, expect, test } from "bun:test"
-import { Duration, Effect, Layer, ManagedRuntime, ServiceMap } from "effect"
+import { Cause, Deferred, Duration, Effect, Exit, Fiber, Layer, ManagedRuntime, ServiceMap } from "effect"
import { InstanceState } from "../../src/effect/instance-state"
+import { InstanceRef } from "../../src/effect/instance-ref"
import { Instance } from "../../src/project/instance"
import { tmpdir } from "../fixture/fixture"
@@ -382,3 +383,100 @@ test("InstanceState dedupes concurrent lookups", async () => {
),
)
})
+
+test("InstanceState survives deferred resume from the same instance context", async () => {
+ await using tmp = await tmpdir({ git: true })
+
+ interface Api {
+ readonly get: (gate: Deferred.Deferred<void>) => Effect.Effect<string>
+ }
+
+ class Test extends ServiceMap.Service<Test, Api>()("@test/DeferredResume") {
+ static readonly layer = Layer.effect(
+ Test,
+ Effect.gen(function* () {
+ const state = yield* InstanceState.make((ctx) => Effect.sync(() => ctx.directory))
+
+ return Test.of({
+ get: Effect.fn("Test.get")(function* (gate: Deferred.Deferred<void>) {
+ yield* Deferred.await(gate)
+ return yield* InstanceState.get(state)
+ }),
+ })
+ }),
+ )
+ }
+
+ const rt = ManagedRuntime.make(Test.layer)
+
+ try {
+ const gate = await Effect.runPromise(Deferred.make<void>())
+ const fiber = await Instance.provide({
+ directory: tmp.path,
+ fn: () => Promise.resolve(rt.runFork(Test.use((svc) => svc.get(gate)))),
+ })
+
+ await Instance.provide({
+ directory: tmp.path,
+ fn: () => Effect.runPromise(Deferred.succeed(gate, void 0)),
+ })
+ const exit = await Effect.runPromise(Fiber.await(fiber))
+
+ expect(Exit.isSuccess(exit)).toBe(true)
+ if (Exit.isSuccess(exit)) {
+ expect(exit.value).toBe(tmp.path)
+ }
+ } finally {
+ await rt.dispose()
+ }
+})
+
+test("InstanceState survives deferred resume outside ALS when InstanceRef is set", async () => {
+ await using tmp = await tmpdir({ git: true })
+
+ interface Api {
+ readonly get: (gate: Deferred.Deferred<void>) => Effect.Effect<string>
+ }
+
+ class Test extends ServiceMap.Service<Test, Api>()("@test/DeferredResumeOutside") {
+ static readonly layer = Layer.effect(
+ Test,
+ Effect.gen(function* () {
+ const state = yield* InstanceState.make((ctx) => Effect.sync(() => ctx.directory))
+
+ return Test.of({
+ get: Effect.fn("Test.get")(function* (gate: Deferred.Deferred<void>) {
+ yield* Deferred.await(gate)
+ return yield* InstanceState.get(state)
+ }),
+ })
+ }),
+ )
+ }
+
+ const rt = ManagedRuntime.make(Test.layer)
+
+ try {
+ const gate = await Effect.runPromise(Deferred.make<void>())
+ // Provide InstanceRef so the fiber carries the context even when
+ // the deferred is resolved from outside Instance.provide ALS.
+ const fiber = await Instance.provide({
+ directory: tmp.path,
+ fn: () =>
+ Promise.resolve(
+ rt.runFork(Test.use((svc) => svc.get(gate)).pipe(Effect.provideService(InstanceRef, Instance.current))),
+ ),
+ })
+
+ // Resume from outside any Instance.provide — ALS is NOT set here
+ await Effect.runPromise(Deferred.succeed(gate, void 0))
+ const exit = await Effect.runPromise(Fiber.await(fiber))
+
+ expect(Exit.isSuccess(exit)).toBe(true)
+ if (Exit.isSuccess(exit)) {
+ expect(exit.value).toBe(tmp.path)
+ }
+ } finally {
+ await rt.dispose()
+ }
+})
diff --git a/packages/opencode/test/effect/runner.test.ts b/packages/opencode/test/effect/runner.test.ts
index 5d3488849..9dc395876 100644
--- a/packages/opencode/test/effect/runner.test.ts
+++ b/packages/opencode/test/effect/runner.test.ts
@@ -6,7 +6,7 @@ import { it } from "../lib/effect"
describe("Runner", () => {
// --- ensureRunning semantics ---
- it.effect(
+ it.live(
"ensureRunning starts work and returns result",
Effect.gen(function* () {
const s = yield* Scope.Scope
@@ -18,7 +18,7 @@ describe("Runner", () => {
}),
)
- it.effect(
+ it.live(
"ensureRunning propagates work failures",
Effect.gen(function* () {
const s = yield* Scope.Scope
@@ -29,7 +29,7 @@ describe("Runner", () => {
}),
)
- it.effect(
+ it.live(
"concurrent callers share the same run",
Effect.gen(function* () {
const s = yield* Scope.Scope
@@ -51,7 +51,7 @@ describe("Runner", () => {
}),
)
- it.effect(
+ it.live(
"concurrent callers all receive same error",
Effect.gen(function* () {
const s = yield* Scope.Scope
@@ -71,7 +71,7 @@ describe("Runner", () => {
}),
)
- it.effect(
+ it.live(
"ensureRunning can be called again after previous run completes",
Effect.gen(function* () {
const s = yield* Scope.Scope
@@ -81,7 +81,7 @@ describe("Runner", () => {
}),
)
- it.effect(
+ it.live(
"second ensureRunning ignores new work if already running",
Effect.gen(function* () {
const s = yield* Scope.Scope
@@ -110,7 +110,7 @@ describe("Runner", () => {
// --- cancel semantics ---
- it.effect(
+ it.live(
"cancel interrupts running work",
Effect.gen(function* () {
const s = yield* Scope.Scope
@@ -128,7 +128,7 @@ describe("Runner", () => {
}),
)
- it.effect(
+ it.live(
"cancel on idle is a no-op",
Effect.gen(function* () {
const s = yield* Scope.Scope
@@ -138,7 +138,7 @@ describe("Runner", () => {
}),
)
- it.effect(
+ it.live(
"cancel with onInterrupt resolves callers gracefully",
Effect.gen(function* () {
const s = yield* Scope.Scope
@@ -154,7 +154,7 @@ describe("Runner", () => {
}),
)
- it.effect(
+ it.live(
"cancel with queued callers resolves all",
Effect.gen(function* () {
const s = yield* Scope.Scope
@@ -175,7 +175,7 @@ describe("Runner", () => {
}),
)
- it.effect(
+ it.live(
"work can be started after cancel",
Effect.gen(function* () {
const s = yield* Scope.Scope
@@ -245,7 +245,7 @@ describe("Runner", () => {
// --- shell semantics ---
- it.effect(
+ it.live(
"shell runs exclusively",
Effect.gen(function* () {
const s = yield* Scope.Scope
@@ -256,7 +256,7 @@ describe("Runner", () => {
}),
)
- it.effect(
+ it.live(
"shell rejects when run is active",
Effect.gen(function* () {
const s = yield* Scope.Scope
@@ -272,7 +272,7 @@ describe("Runner", () => {
}),
)
- it.effect(
+ it.live(
"shell rejects when another shell is running",
Effect.gen(function* () {
const s = yield* Scope.Scope
@@ -292,7 +292,7 @@ describe("Runner", () => {
}),
)
- it.effect(
+ it.live(
"shell rejects via busy callback and cancel still stops the first shell",
Effect.gen(function* () {
const s = yield* Scope.Scope
@@ -323,7 +323,7 @@ describe("Runner", () => {
}),
)
- it.effect(
+ it.live(
"cancel interrupts shell that ignores abort signal",
Effect.gen(function* () {
const s = yield* Scope.Scope
@@ -349,7 +349,7 @@ describe("Runner", () => {
// --- shell→run handoff ---
- it.effect(
+ it.live(
"ensureRunning queues behind shell then runs after",
Effect.gen(function* () {
const s = yield* Scope.Scope
@@ -376,7 +376,7 @@ describe("Runner", () => {
}),
)
- it.effect(
+ it.live(
"multiple ensureRunning callers share the queued run behind shell",
Effect.gen(function* () {
const s = yield* Scope.Scope
@@ -407,7 +407,7 @@ describe("Runner", () => {
}),
)
- it.effect(
+ it.live(
"cancel during shell_then_run cancels both",
Effect.gen(function* () {
const s = yield* Scope.Scope
@@ -441,7 +441,7 @@ describe("Runner", () => {
// --- lifecycle callbacks ---
- it.effect(
+ it.live(
"onIdle fires when returning to idle from running",
Effect.gen(function* () {
const s = yield* Scope.Scope
@@ -454,7 +454,7 @@ describe("Runner", () => {
}),
)
- it.effect(
+ it.live(
"onIdle fires on cancel",
Effect.gen(function* () {
const s = yield* Scope.Scope
@@ -470,7 +470,7 @@ describe("Runner", () => {
}),
)
- it.effect(
+ it.live(
"onBusy fires when shell starts",
Effect.gen(function* () {
const s = yield* Scope.Scope
@@ -485,7 +485,7 @@ describe("Runner", () => {
// --- busy flag ---
- it.effect(
+ it.live(
"busy is true during run",
Effect.gen(function* () {
const s = yield* Scope.Scope
@@ -502,7 +502,7 @@ describe("Runner", () => {
}),
)
- it.effect(
+ it.live(
"busy is true during shell",
Effect.gen(function* () {
const s = yield* Scope.Scope
diff --git a/packages/opencode/test/fixture/fixture.ts b/packages/opencode/test/fixture/fixture.ts
index a36a3f9d8..a50e0c4f6 100644
--- a/packages/opencode/test/fixture/fixture.ts
+++ b/packages/opencode/test/fixture/fixture.ts
@@ -2,10 +2,14 @@ import { $ } from "bun"
import * as fs from "fs/promises"
import os from "os"
import path from "path"
-import { Effect, FileSystem, ServiceMap } from "effect"
+import { Effect, ServiceMap } from "effect"
+import type * as PlatformError from "effect/PlatformError"
+import type * as Scope from "effect/Scope"
import { ChildProcess, ChildProcessSpawner } from "effect/unstable/process"
import type { Config } from "../../src/config/config"
+import { InstanceRef } from "../../src/effect/instance-ref"
import { Instance } from "../../src/project/instance"
+import { TestLLMServer } from "../lib/llm-server"
// Strip null bytes from paths (defensive fix for CI environment issues)
function sanitizePath(p: string): string {
@@ -78,9 +82,17 @@ export async function tmpdir<T>(options?: TmpDirOptions<T>) {
/** Effectful scoped tmpdir. Cleaned up when the scope closes. Make sure these stay in sync */
export function tmpdirScoped(options?: { git?: boolean; config?: Partial<Config.Info> }) {
return Effect.gen(function* () {
- const fs = yield* FileSystem.FileSystem
const spawner = yield* ChildProcessSpawner.ChildProcessSpawner
- const dir = yield* fs.makeTempDirectoryScoped({ prefix: "opencode-test-" })
+ const dirpath = sanitizePath(path.join(os.tmpdir(), "opencode-test-" + Math.random().toString(36).slice(2)))
+ yield* Effect.promise(() => fs.mkdir(dirpath, { recursive: true }))
+ const dir = sanitizePath(yield* Effect.promise(() => fs.realpath(dirpath)))
+
+ yield* Effect.addFinalizer(() =>
+ Effect.promise(async () => {
+ if (options?.git) await stop(dir).catch(() => undefined)
+ await clean(dir).catch(() => undefined)
+ }),
+ )
const git = (...args: string[]) =>
spawner.spawn(ChildProcess.make("git", args, { cwd: dir })).pipe(Effect.flatMap((handle) => handle.exitCode))
@@ -94,9 +106,11 @@ export function tmpdirScoped(options?: { git?: boolean; config?: Partial<Config.
}
if (options?.config) {
- yield* fs.writeFileString(
- path.join(dir, "opencode.json"),
- JSON.stringify({ $schema: "https://opencode.ai/config.json", ...options.config }),
+ yield* Effect.promise(() =>
+ fs.writeFile(
+ path.join(dir, "opencode.json"),
+ JSON.stringify({ $schema: "https://opencode.ai/config.json", ...options.config }),
+ ),
)
}
@@ -111,7 +125,7 @@ export const provideInstance =
Effect.promise<A>(async () =>
Instance.provide({
directory,
- fn: () => Effect.runPromiseWith(services)(self),
+ fn: () => Effect.runPromiseWith(services)(self.pipe(Effect.provideService(InstanceRef, Instance.current))),
}),
),
)
@@ -139,3 +153,20 @@ export function provideTmpdirInstance<A, E, R>(
return yield* self(path).pipe(provideInstance(path))
})
}
+
+export function provideTmpdirServer<A, E, R>(
+ self: (input: { dir: string; llm: TestLLMServer["Service"] }) => Effect.Effect<A, E, R>,
+ options?: { git?: boolean; config?: (url: string) => Partial<Config.Info> },
+): Effect.Effect<
+ A,
+ E | PlatformError.PlatformError,
+ R | TestLLMServer | ChildProcessSpawner.ChildProcessSpawner | Scope.Scope
+> {
+ return Effect.gen(function* () {
+ const llm = yield* TestLLMServer
+ return yield* provideTmpdirInstance((dir) => self({ dir, llm }), {
+ git: options?.git,
+ config: options?.config?.(llm.url),
+ })
+ })
+}
diff --git a/packages/opencode/test/format/format.test.ts b/packages/opencode/test/format/format.test.ts
index 74336e02a..95fe763d4 100644
--- a/packages/opencode/test/format/format.test.ts
+++ b/packages/opencode/test/format/format.test.ts
@@ -10,7 +10,7 @@ import * as Formatter from "../../src/format/formatter"
const it = testEffect(Layer.mergeAll(Format.defaultLayer, CrossSpawnSpawner.defaultLayer, NodeFileSystem.layer))
describe("Format", () => {
- it.effect("status() returns built-in formatters when no config overrides", () =>
+ it.live("status() returns built-in formatters when no config overrides", () =>
provideTmpdirInstance(() =>
Format.Service.use((fmt) =>
Effect.gen(function* () {
@@ -32,7 +32,7 @@ describe("Format", () => {
),
)
- it.effect("status() returns empty list when formatter is disabled", () =>
+ it.live("status() returns empty list when formatter is disabled", () =>
provideTmpdirInstance(
() =>
Format.Service.use((fmt) =>
@@ -44,7 +44,7 @@ describe("Format", () => {
),
)
- it.effect("status() excludes formatters marked as disabled in config", () =>
+ it.live("status() excludes formatters marked as disabled in config", () =>
provideTmpdirInstance(
() =>
Format.Service.use((fmt) =>
@@ -64,11 +64,11 @@ describe("Format", () => {
),
)
- it.effect("service initializes without error", () =>
+ it.live("service initializes without error", () =>
provideTmpdirInstance(() => Format.Service.use(() => Effect.void)),
)
- it.effect("status() initializes formatter state per directory", () =>
+ it.live("status() initializes formatter state per directory", () =>
Effect.gen(function* () {
const a = yield* provideTmpdirInstance(() => Format.Service.use((fmt) => fmt.status()), {
config: { formatter: false },
@@ -80,7 +80,7 @@ describe("Format", () => {
}),
)
- it.effect("runs enabled checks for matching formatters in parallel", () =>
+ it.live("runs enabled checks for matching formatters in parallel", () =>
provideTmpdirInstance((path) =>
Effect.gen(function* () {
const file = `${path}/test.parallel`
@@ -144,7 +144,7 @@ describe("Format", () => {
),
)
- it.effect("runs matching formatters sequentially for the same file", () =>
+ it.live("runs matching formatters sequentially for the same file", () =>
provideTmpdirInstance(
(path) =>
Effect.gen(function* () {
diff --git a/packages/opencode/test/lib/effect.ts b/packages/opencode/test/lib/effect.ts
index 4162ba092..131ec5cc6 100644
--- a/packages/opencode/test/lib/effect.ts
+++ b/packages/opencode/test/lib/effect.ts
@@ -1,14 +1,14 @@
import { test, type TestOptions } from "bun:test"
import { Cause, Effect, Exit, Layer } from "effect"
import type * as Scope from "effect/Scope"
+import * as TestClock from "effect/testing/TestClock"
import * as TestConsole from "effect/testing/TestConsole"
type Body<A, E, R> = Effect.Effect<A, E, R> | (() => Effect.Effect<A, E, R>)
-const env = TestConsole.layer
const body = <A, E, R>(value: Body<A, E, R>) => Effect.suspend(() => (typeof value === "function" ? value() : value))
-const run = <A, E, R, E2>(value: Body<A, E, R | Scope.Scope>, layer: Layer.Layer<R, E2, never>) =>
+const run = <A, E, R, E2>(value: Body<A, E, R | Scope.Scope>, layer: Layer.Layer<R, E2>) =>
Effect.gen(function* () {
const exit = yield* body(value).pipe(Effect.scoped, Effect.provide(layer), Effect.exit)
if (Exit.isFailure(exit)) {
@@ -19,19 +19,35 @@ const run = <A, E, R, E2>(value: Body<A, E, R | Scope.Scope>, layer: Layer.Layer
return yield* exit
}).pipe(Effect.runPromise)
-const make = <R, E>(layer: Layer.Layer<R, E, never>) => {
+const make = <R, E>(testLayer: Layer.Layer<R, E>, liveLayer: Layer.Layer<R, E>) => {
const effect = <A, E2>(name: string, value: Body<A, E2, R | Scope.Scope>, opts?: number | TestOptions) =>
- test(name, () => run(value, layer), opts)
+ test(name, () => run(value, testLayer), opts)
effect.only = <A, E2>(name: string, value: Body<A, E2, R | Scope.Scope>, opts?: number | TestOptions) =>
- test.only(name, () => run(value, layer), opts)
+ test.only(name, () => run(value, testLayer), opts)
effect.skip = <A, E2>(name: string, value: Body<A, E2, R | Scope.Scope>, opts?: number | TestOptions) =>
- test.skip(name, () => run(value, layer), opts)
+ test.skip(name, () => run(value, testLayer), opts)
- return { effect }
+ const live = <A, E2>(name: string, value: Body<A, E2, R | Scope.Scope>, opts?: number | TestOptions) =>
+ test(name, () => run(value, liveLayer), opts)
+
+ live.only = <A, E2>(name: string, value: Body<A, E2, R | Scope.Scope>, opts?: number | TestOptions) =>
+ test.only(name, () => run(value, liveLayer), opts)
+
+ live.skip = <A, E2>(name: string, value: Body<A, E2, R | Scope.Scope>, opts?: number | TestOptions) =>
+ test.skip(name, () => run(value, liveLayer), opts)
+
+ return { effect, live }
}
-export const it = make(env)
+// Test environment with TestClock and TestConsole
+const testEnv = Layer.mergeAll(TestConsole.layer, TestClock.layer())
+
+// Live environment - uses real clock, but keeps TestConsole for output capture
+const liveEnv = TestConsole.layer
+
+export const it = make(testEnv, liveEnv)
-export const testEffect = <R, E>(layer: Layer.Layer<R, E, never>) => make(Layer.provideMerge(layer, env))
+export const testEffect = <R, E>(layer: Layer.Layer<R, E>) =>
+ make(Layer.provideMerge(layer, testEnv), Layer.provideMerge(layer, liveEnv))
diff --git a/packages/opencode/test/lib/llm-server.ts b/packages/opencode/test/lib/llm-server.ts
new file mode 100644
index 000000000..b0a54424e
--- /dev/null
+++ b/packages/opencode/test/lib/llm-server.ts
@@ -0,0 +1,282 @@
+import { NodeHttpServer } from "@effect/platform-node"
+import * as Http from "node:http"
+import { Deferred, Effect, Layer, ServiceMap, Stream } from "effect"
+import * as HttpServer from "effect/unstable/http/HttpServer"
+import { HttpRouter, HttpServerRequest, HttpServerResponse } from "effect/unstable/http"
+
+type Step =
+ | {
+ type: "text"
+ text: string
+ }
+ | {
+ type: "tool"
+ tool: string
+ input: unknown
+ }
+ | {
+ type: "fail"
+ message: string
+ }
+ | {
+ type: "hang"
+ }
+ | {
+ type: "hold"
+ text: string
+ wait: PromiseLike<unknown>
+ }
+
+type Hit = {
+ url: URL
+ body: Record<string, unknown>
+}
+
+type Wait = {
+ count: number
+ ready: Deferred.Deferred<void>
+}
+
+function sse(lines: unknown[]) {
+ return HttpServerResponse.stream(
+ Stream.fromIterable([
+ [...lines.map((line) => `data: ${JSON.stringify(line)}`), "data: [DONE]"].join("\n\n") + "\n\n",
+ ]).pipe(Stream.encodeText),
+ { contentType: "text/event-stream" },
+ )
+}
+
+function text(step: Extract<Step, { type: "text" }>) {
+ return sse([
+ {
+ id: "chatcmpl-test",
+ object: "chat.completion.chunk",
+ choices: [{ delta: { role: "assistant" } }],
+ },
+ {
+ id: "chatcmpl-test",
+ object: "chat.completion.chunk",
+ choices: [{ delta: { content: step.text } }],
+ },
+ {
+ id: "chatcmpl-test",
+ object: "chat.completion.chunk",
+ choices: [{ delta: {}, finish_reason: "stop" }],
+ },
+ ])
+}
+
+function tool(step: Extract<Step, { type: "tool" }>, seq: number) {
+ const id = `call_${seq}`
+ const args = JSON.stringify(step.input)
+ return sse([
+ {
+ id: "chatcmpl-test",
+ object: "chat.completion.chunk",
+ choices: [{ delta: { role: "assistant" } }],
+ },
+ {
+ id: "chatcmpl-test",
+ object: "chat.completion.chunk",
+ choices: [
+ {
+ delta: {
+ tool_calls: [
+ {
+ index: 0,
+ id,
+ type: "function",
+ function: {
+ name: step.tool,
+ arguments: "",
+ },
+ },
+ ],
+ },
+ },
+ ],
+ },
+ {
+ id: "chatcmpl-test",
+ object: "chat.completion.chunk",
+ choices: [
+ {
+ delta: {
+ tool_calls: [
+ {
+ index: 0,
+ function: {
+ arguments: args,
+ },
+ },
+ ],
+ },
+ },
+ ],
+ },
+ {
+ id: "chatcmpl-test",
+ object: "chat.completion.chunk",
+ choices: [{ delta: {}, finish_reason: "tool_calls" }],
+ },
+ ])
+}
+
+function fail(step: Extract<Step, { type: "fail" }>) {
+ return HttpServerResponse.stream(
+ Stream.fromIterable([
+ 'data: {"id":"chatcmpl-test","object":"chat.completion.chunk","choices":[{"delta":{"role":"assistant"}}]}\n\n',
+ ]).pipe(Stream.encodeText, Stream.concat(Stream.fail(new Error(step.message)))),
+ { contentType: "text/event-stream" },
+ )
+}
+
+function hang() {
+ return HttpServerResponse.stream(
+ Stream.fromIterable([
+ 'data: {"id":"chatcmpl-test","object":"chat.completion.chunk","choices":[{"delta":{"role":"assistant"}}]}\n\n',
+ ]).pipe(Stream.encodeText, Stream.concat(Stream.never)),
+ { contentType: "text/event-stream" },
+ )
+}
+
+function hold(step: Extract<Step, { type: "hold" }>) {
+ return HttpServerResponse.stream(
+ Stream.fromIterable([
+ 'data: {"id":"chatcmpl-test","object":"chat.completion.chunk","choices":[{"delta":{"role":"assistant"}}]}\n\n',
+ ]).pipe(
+ Stream.encodeText,
+ Stream.concat(
+ Stream.fromEffect(Effect.promise(() => step.wait)).pipe(
+ Stream.flatMap(() =>
+ Stream.fromIterable([
+ `data: ${JSON.stringify({
+ id: "chatcmpl-test",
+ object: "chat.completion.chunk",
+ choices: [{ delta: { content: step.text } }],
+ })}\n\n`,
+ `data: ${JSON.stringify({
+ id: "chatcmpl-test",
+ object: "chat.completion.chunk",
+ choices: [{ delta: {}, finish_reason: "stop" }],
+ })}\n\n`,
+ "data: [DONE]\n\n",
+ ]).pipe(Stream.encodeText),
+ ),
+ ),
+ ),
+ ),
+ { contentType: "text/event-stream" },
+ )
+}
+
+namespace TestLLMServer {
+ export interface Service {
+ readonly url: string
+ readonly text: (value: string) => Effect.Effect<void>
+ readonly tool: (tool: string, input: unknown) => Effect.Effect<void>
+ readonly fail: (message?: string) => Effect.Effect<void>
+ readonly hang: Effect.Effect<void>
+ readonly hold: (text: string, wait: PromiseLike<unknown>) => Effect.Effect<void>
+ readonly hits: Effect.Effect<Hit[]>
+ readonly calls: Effect.Effect<number>
+ readonly wait: (count: number) => Effect.Effect<void>
+ readonly inputs: Effect.Effect<Record<string, unknown>[]>
+ readonly pending: Effect.Effect<number>
+ }
+}
+
+export class TestLLMServer extends ServiceMap.Service<TestLLMServer, TestLLMServer.Service>()("@test/LLMServer") {
+ static readonly layer = Layer.effect(
+ TestLLMServer,
+ Effect.gen(function* () {
+ const server = yield* HttpServer.HttpServer
+ const router = yield* HttpRouter.HttpRouter
+
+ let hits: Hit[] = []
+ let list: Step[] = []
+ let seq = 0
+ let waits: Wait[] = []
+
+ const push = (step: Step) => {
+ list = [...list, step]
+ }
+
+ const notify = Effect.fnUntraced(function* () {
+ const ready = waits.filter((item) => hits.length >= item.count)
+ if (!ready.length) return
+ waits = waits.filter((item) => hits.length < item.count)
+ yield* Effect.forEach(ready, (item) => Deferred.succeed(item.ready, void 0))
+ })
+
+ const pull = () => {
+ const step = list[0]
+ if (!step) return { step: undefined, seq }
+ seq += 1
+ list = list.slice(1)
+ return { step, seq }
+ }
+
+ yield* router.add(
+ "POST",
+ "/v1/chat/completions",
+ Effect.gen(function* () {
+ const req = yield* HttpServerRequest.HttpServerRequest
+ const next = pull()
+ if (!next.step) return HttpServerResponse.text("unexpected request", { status: 500 })
+ const json = yield* req.json.pipe(Effect.orElseSucceed(() => ({})))
+ hits = [
+ ...hits,
+ {
+ url: new URL(req.originalUrl, "http://localhost"),
+ body: json && typeof json === "object" ? (json as Record<string, unknown>) : {},
+ },
+ ]
+ yield* notify()
+ if (next.step.type === "text") return text(next.step)
+ if (next.step.type === "tool") return tool(next.step, next.seq)
+ if (next.step.type === "fail") return fail(next.step)
+ if (next.step.type === "hang") return hang()
+ return hold(next.step)
+ }),
+ )
+
+ yield* server.serve(router.asHttpEffect())
+
+ return TestLLMServer.of({
+ url:
+ server.address._tag === "TcpAddress"
+ ? `http://127.0.0.1:${server.address.port}/v1`
+ : `unix://${server.address.path}/v1`,
+ text: Effect.fn("TestLLMServer.text")(function* (value: string) {
+ push({ type: "text", text: value })
+ }),
+ tool: Effect.fn("TestLLMServer.tool")(function* (tool: string, input: unknown) {
+ push({ type: "tool", tool, input })
+ }),
+ fail: Effect.fn("TestLLMServer.fail")(function* (message = "boom") {
+ push({ type: "fail", message })
+ }),
+ hang: Effect.gen(function* () {
+ push({ type: "hang" })
+ }).pipe(Effect.withSpan("TestLLMServer.hang")),
+ hold: Effect.fn("TestLLMServer.hold")(function* (text: string, wait: PromiseLike<unknown>) {
+ push({ type: "hold", text, wait })
+ }),
+ hits: Effect.sync(() => [...hits]),
+ calls: Effect.sync(() => hits.length),
+ wait: Effect.fn("TestLLMServer.wait")(function* (count: number) {
+ if (hits.length >= count) return
+ const ready = yield* Deferred.make<void>()
+ waits = [...waits, { count, ready }]
+ yield* Deferred.await(ready)
+ }),
+ inputs: Effect.sync(() => hits.map((hit) => hit.body)),
+ pending: Effect.sync(() => list.length),
+ })
+ }),
+ ).pipe(
+ Layer.provide(HttpRouter.layer), //
+ Layer.provide(NodeHttpServer.layer(() => Http.createServer(), { port: 0 })),
+ )
+}
diff --git a/packages/opencode/test/session/processor-effect.test.ts b/packages/opencode/test/session/processor-effect.test.ts
index 0dfdef26f..23c6911a2 100644
--- a/packages/opencode/test/session/processor-effect.test.ts
+++ b/packages/opencode/test/session/processor-effect.test.ts
@@ -264,7 +264,7 @@ const env = SessionProcessor.layer.pipe(Layer.provideMerge(deps))
const it = testEffect(env)
-it.effect("session.processor effect tests capture llm input cleanly", () => {
+it.live("session.processor effect tests capture llm input cleanly", () => {
return provideTmpdirInstance(
(dir) =>
Effect.gen(function* () {
@@ -316,7 +316,7 @@ it.effect("session.processor effect tests capture llm input cleanly", () => {
)
})
-it.effect("session.processor effect tests stop after token overflow requests compaction", () => {
+it.live("session.processor effect tests stop after token overflow requests compaction", () => {
return provideTmpdirInstance(
(dir) =>
Effect.gen(function* () {
@@ -376,7 +376,7 @@ it.effect("session.processor effect tests stop after token overflow requests com
)
})
-it.effect("session.processor effect tests reset reasoning state across retries", () => {
+it.live("session.processor effect tests reset reasoning state across retries", () => {
return provideTmpdirInstance(
(dir) =>
Effect.gen(function* () {
@@ -449,7 +449,7 @@ it.effect("session.processor effect tests reset reasoning state across retries",
)
})
-it.effect("session.processor effect tests do not retry unknown json errors", () => {
+it.live("session.processor effect tests do not retry unknown json errors", () => {
return provideTmpdirInstance(
(dir) =>
Effect.gen(function* () {
@@ -495,7 +495,7 @@ it.effect("session.processor effect tests do not retry unknown json errors", ()
)
})
-it.effect("session.processor effect tests retry recognized structured json errors", () => {
+it.live("session.processor effect tests retry recognized structured json errors", () => {
return provideTmpdirInstance(
(dir) =>
Effect.gen(function* () {
@@ -544,7 +544,7 @@ it.effect("session.processor effect tests retry recognized structured json error
)
})
-it.effect("session.processor effect tests publish retry status updates", () => {
+it.live("session.processor effect tests publish retry status updates", () => {
return provideTmpdirInstance(
(dir) =>
Effect.gen(function* () {
@@ -611,7 +611,7 @@ it.effect("session.processor effect tests publish retry status updates", () => {
)
})
-it.effect("session.processor effect tests compact on structured context overflow", () => {
+it.live("session.processor effect tests compact on structured context overflow", () => {
return provideTmpdirInstance(
(dir) =>
Effect.gen(function* () {
@@ -656,7 +656,7 @@ it.effect("session.processor effect tests compact on structured context overflow
)
})
-it.effect("session.processor effect tests mark pending tools as aborted on cleanup", () => {
+it.live("session.processor effect tests mark pending tools as aborted on cleanup", () => {
return provideTmpdirInstance(
(dir) =>
Effect.gen(function* () {
@@ -725,7 +725,7 @@ it.effect("session.processor effect tests mark pending tools as aborted on clean
)
})
-it.effect("session.processor effect tests record aborted errors and idle state", () => {
+it.live("session.processor effect tests record aborted errors and idle state", () => {
return provideTmpdirInstance(
(dir) =>
Effect.gen(function* () {
@@ -807,7 +807,7 @@ it.effect("session.processor effect tests record aborted errors and idle state",
)
})
-it.effect("session.processor effect tests mark interruptions aborted without manual abort", () => {
+it.live("session.processor effect tests mark interruptions aborted without manual abort", () => {
return provideTmpdirInstance(
(dir) =>
Effect.gen(function* () {
diff --git a/packages/opencode/test/session/prompt-effect.test.ts b/packages/opencode/test/session/prompt-effect.test.ts
index ef664113f..98111bb3a 100644
--- a/packages/opencode/test/session/prompt-effect.test.ts
+++ b/packages/opencode/test/session/prompt-effect.test.ts
@@ -1,7 +1,6 @@
import { NodeFileSystem } from "@effect/platform-node"
import { expect, spyOn } from "bun:test"
-import { Cause, Effect, Exit, Fiber, Layer, ServiceMap } from "effect"
-import * as Stream from "effect/Stream"
+import { Cause, Effect, Exit, Fiber, Layer } from "effect"
import z from "zod"
import type { Agent } from "../../src/agent/agent"
import { Agent as AgentSvc } from "../../src/agent/agent"
@@ -31,8 +30,9 @@ import { ToolRegistry } from "../../src/tool/registry"
import { Truncate } from "../../src/tool/truncate"
import { Log } from "../../src/util/log"
import * as CrossSpawnSpawner from "../../src/effect/cross-spawn-spawner"
-import { provideTmpdirInstance } from "../fixture/fixture"
+import { provideTmpdirInstance, provideTmpdirServer } from "../fixture/fixture"
import { testEffect } from "../lib/effect"
+import { TestLLMServer } from "../lib/llm-server"
Log.init({ print: false })
@@ -41,105 +41,6 @@ const ref = {
modelID: ModelID.make("test-model"),
}
-type Script = Stream.Stream<LLM.Event, unknown> | ((input: LLM.StreamInput) => Stream.Stream<LLM.Event, unknown>)
-
-class TestLLM extends ServiceMap.Service<
- TestLLM,
- {
- readonly push: (stream: Script) => Effect.Effect<void>
- readonly reply: (...items: LLM.Event[]) => Effect.Effect<void>
- readonly calls: Effect.Effect<number>
- readonly inputs: Effect.Effect<LLM.StreamInput[]>
- }
->()("@test/PromptLLM") {}
-
-function stream(...items: LLM.Event[]) {
- return Stream.make(...items)
-}
-
-function usage(input = 1, output = 1, total = input + output) {
- return {
- inputTokens: input,
- outputTokens: output,
- totalTokens: total,
- inputTokenDetails: {
- noCacheTokens: undefined,
- cacheReadTokens: undefined,
- cacheWriteTokens: undefined,
- },
- outputTokenDetails: {
- textTokens: undefined,
- reasoningTokens: undefined,
- },
- }
-}
-
-function start(): LLM.Event {
- return { type: "start" }
-}
-
-function textStart(id = "t"): LLM.Event {
- return { type: "text-start", id }
-}
-
-function textDelta(id: string, text: string): LLM.Event {
- return { type: "text-delta", id, text }
-}
-
-function textEnd(id = "t"): LLM.Event {
- return { type: "text-end", id }
-}
-
-function finishStep(): LLM.Event {
- return {
- type: "finish-step",
- finishReason: "stop",
- rawFinishReason: "stop",
- response: { id: "res", modelId: "test-model", timestamp: new Date() },
- providerMetadata: undefined,
- usage: usage(),
- }
-}
-
-function finish(): LLM.Event {
- return { type: "finish", finishReason: "stop", rawFinishReason: "stop", totalUsage: usage() }
-}
-
-function finishToolCallsStep(): LLM.Event {
- return {
- type: "finish-step",
- finishReason: "tool-calls",
- rawFinishReason: "tool_calls",
- response: { id: "res", modelId: "test-model", timestamp: new Date() },
- providerMetadata: undefined,
- usage: usage(),
- }
-}
-
-function finishToolCalls(): LLM.Event {
- return { type: "finish", finishReason: "tool-calls", rawFinishReason: "tool_calls", totalUsage: usage() }
-}
-
-function replyStop(text: string, id = "t") {
- return [start(), textStart(id), textDelta(id, text), textEnd(id), finishStep(), finish()] as const
-}
-
-function replyToolCalls(text: string, id = "t") {
- return [start(), textStart(id), textDelta(id, text), textEnd(id), finishToolCallsStep(), finishToolCalls()] as const
-}
-
-function toolInputStart(id: string, toolName: string): LLM.Event {
- return { type: "tool-input-start", id, toolName }
-}
-
-function toolCall(toolCallId: string, toolName: string, input: unknown): LLM.Event {
- return { type: "tool-call", toolCallId, toolName, input }
-}
-
-function hang(_input: LLM.StreamInput, ...items: LLM.Event[]) {
- return stream(...items).pipe(Stream.concat(Stream.fromEffect(Effect.never)))
-}
-
function defer<T>() {
let resolve!: (value: T | PromiseLike<T>) => void
const promise = new Promise<T>((done) => {
@@ -148,10 +49,6 @@ function defer<T>() {
return { promise, resolve }
}
-function waitMs(ms: number) {
- return Effect.promise(() => new Promise<void>((done) => setTimeout(done, ms)))
-}
-
function withSh<A, E, R>(fx: () => Effect.Effect<A, E, R>) {
return Effect.acquireUseRelease(
Effect.sync(() => {
@@ -189,43 +86,6 @@ function errorTool(parts: MessageV2.Part[]) {
return part?.state.status === "error" ? (part as ErrorToolPart) : undefined
}
-const llm = Layer.unwrap(
- Effect.gen(function* () {
- const queue: Script[] = []
- const inputs: LLM.StreamInput[] = []
- let calls = 0
-
- const push = Effect.fn("TestLLM.push")((item: Script) => {
- queue.push(item)
- return Effect.void
- })
-
- const reply = Effect.fn("TestLLM.reply")((...items: LLM.Event[]) => push(stream(...items)))
- return Layer.mergeAll(
- Layer.succeed(
- LLM.Service,
- LLM.Service.of({
- stream: (input) => {
- calls += 1
- inputs.push(input)
- const item = queue.shift() ?? Stream.empty
- return typeof item === "function" ? item(input) : item
- },
- }),
- ),
- Layer.succeed(
- TestLLM,
- TestLLM.of({
- push,
- reply,
- calls: Effect.sync(() => calls),
- inputs: Effect.sync(() => [...inputs]),
- }),
- ),
- )
- }),
-)
-
const mcp = Layer.succeed(
MCP.Service,
MCP.Service.of({
@@ -281,35 +141,40 @@ const filetime = Layer.succeed(
const status = SessionStatus.layer.pipe(Layer.provideMerge(Bus.layer))
const infra = Layer.mergeAll(NodeFileSystem.layer, CrossSpawnSpawner.defaultLayer)
-const deps = Layer.mergeAll(
- Session.defaultLayer,
- Snapshot.defaultLayer,
- AgentSvc.defaultLayer,
- Command.defaultLayer,
- Permission.layer,
- Plugin.defaultLayer,
- Config.defaultLayer,
- filetime,
- lsp,
- mcp,
- AppFileSystem.defaultLayer,
- status,
- llm,
-).pipe(Layer.provideMerge(infra))
-const registry = ToolRegistry.layer.pipe(Layer.provideMerge(deps))
-const trunc = Truncate.layer.pipe(Layer.provideMerge(deps))
-const proc = SessionProcessor.layer.pipe(Layer.provideMerge(deps))
-const compact = SessionCompaction.layer.pipe(Layer.provideMerge(proc), Layer.provideMerge(deps))
-const env = SessionPrompt.layer.pipe(
- Layer.provideMerge(compact),
- Layer.provideMerge(proc),
- Layer.provideMerge(registry),
- Layer.provideMerge(trunc),
- Layer.provideMerge(deps),
-)
+function makeHttp() {
+ const deps = Layer.mergeAll(
+ Session.defaultLayer,
+ Snapshot.defaultLayer,
+ LLM.defaultLayer,
+ AgentSvc.defaultLayer,
+ Command.defaultLayer,
+ Permission.layer,
+ Plugin.defaultLayer,
+ Config.defaultLayer,
+ filetime,
+ lsp,
+ mcp,
+ AppFileSystem.defaultLayer,
+ status,
+ ).pipe(Layer.provideMerge(infra))
+ const registry = ToolRegistry.layer.pipe(Layer.provideMerge(deps))
+ const trunc = Truncate.layer.pipe(Layer.provideMerge(deps))
+ const proc = SessionProcessor.layer.pipe(Layer.provideMerge(deps))
+ const compact = SessionCompaction.layer.pipe(Layer.provideMerge(proc), Layer.provideMerge(deps))
+ return Layer.mergeAll(
+ TestLLMServer.layer,
+ SessionPrompt.layer.pipe(
+ Layer.provideMerge(compact),
+ Layer.provideMerge(proc),
+ Layer.provideMerge(registry),
+ Layer.provideMerge(trunc),
+ Layer.provideMerge(deps),
+ ),
+ )
+}
-const it = testEffect(env)
-const unix = process.platform !== "win32" ? it.effect : it.effect.skip
+const it = testEffect(makeHttp())
+const unix = process.platform !== "win32" ? it.live : it.live.skip
// Config that registers a custom "test" provider with a "test-model" model
// so Provider.getModel("test", "test-model") succeeds inside the loop.
@@ -342,6 +207,22 @@ const cfg = {
},
}
+function providerCfg(url: string) {
+ return {
+ ...cfg,
+ provider: {
+ ...cfg.provider,
+ test: {
+ ...cfg.provider.test,
+ options: {
+ ...cfg.provider.test.options,
+ baseURL: url,
+ },
+ },
+ },
+ }
+}
+
const user = Effect.fn("test.user")(function* (sessionID: SessionID, text: string) {
const session = yield* Session.Service
const msg = yield* session.updateMessage({
@@ -407,232 +288,300 @@ const addSubtask = (sessionID: SessionID, messageID: MessageID, model = ref) =>
})
const boot = Effect.fn("test.boot")(function* (input?: { title?: string }) {
- const test = yield* TestLLM
const prompt = yield* SessionPrompt.Service
const sessions = yield* Session.Service
const chat = yield* sessions.create(input ?? { title: "Pinned" })
- return { test, prompt, sessions, chat }
+ return { prompt, sessions, chat }
})
// Loop semantics
-it.effect("loop exits immediately when last assistant has stop finish", () =>
- provideTmpdirInstance(
- (dir) =>
- Effect.gen(function* () {
- const { test, prompt, chat } = yield* boot()
- yield* seed(chat.id, { finish: "stop" })
-
- const result = yield* prompt.loop({ sessionID: chat.id })
- expect(result.info.role).toBe("assistant")
- if (result.info.role === "assistant") expect(result.info.finish).toBe("stop")
- expect(yield* test.calls).toBe(0)
- }),
- { git: true },
+it.live("loop exits immediately when last assistant has stop finish", () =>
+ provideTmpdirServer(
+ Effect.fnUntraced(function* ({ llm }) {
+ const prompt = yield* SessionPrompt.Service
+ const sessions = yield* Session.Service
+ const chat = yield* sessions.create({ title: "Pinned" })
+ yield* seed(chat.id, { finish: "stop" })
+
+ const result = yield* prompt.loop({ sessionID: chat.id })
+ expect(result.info.role).toBe("assistant")
+ if (result.info.role === "assistant") expect(result.info.finish).toBe("stop")
+ expect(yield* llm.calls).toBe(0)
+ }),
+ { git: true, config: providerCfg },
),
)
-it.effect("loop calls LLM and returns assistant message", () =>
- provideTmpdirInstance(
- (dir) =>
- Effect.gen(function* () {
- const { test, prompt, chat } = yield* boot()
- yield* test.reply(...replyStop("world"))
- yield* user(chat.id, "hello")
-
- const result = yield* prompt.loop({ sessionID: chat.id })
- expect(result.info.role).toBe("assistant")
- const parts = result.parts.filter((p) => p.type === "text")
- expect(parts.some((p) => p.type === "text" && p.text === "world")).toBe(true)
- expect(yield* test.calls).toBe(1)
- }),
- { git: true, config: cfg },
+it.live("loop calls LLM and returns assistant message", () =>
+ provideTmpdirServer(
+ Effect.fnUntraced(function* ({ llm }) {
+ const prompt = yield* SessionPrompt.Service
+ const sessions = yield* Session.Service
+ const chat = yield* sessions.create({
+ title: "Pinned",
+ permission: [{ permission: "*", pattern: "*", action: "allow" }],
+ })
+ yield* prompt.prompt({
+ sessionID: chat.id,
+ agent: "build",
+ noReply: true,
+ parts: [{ type: "text", text: "hello" }],
+ })
+ yield* llm.text("world")
+
+ const result = yield* prompt.loop({ sessionID: chat.id })
+ expect(result.info.role).toBe("assistant")
+ const parts = result.parts.filter((p) => p.type === "text")
+ expect(parts.some((p) => p.type === "text" && p.text === "world")).toBe(true)
+ expect(yield* llm.hits).toHaveLength(1)
+ }),
+ { git: true, config: providerCfg },
),
)
-it.effect("loop continues when finish is tool-calls", () =>
- provideTmpdirInstance(
- (dir) =>
- Effect.gen(function* () {
- const { test, prompt, chat } = yield* boot()
- yield* test.reply(...replyToolCalls("first"))
- yield* test.reply(...replyStop("second"))
- yield* user(chat.id, "hello")
+it.live("static loop returns assistant text through local provider", () =>
+ provideTmpdirServer(
+ Effect.fnUntraced(function* ({ llm }) {
+ const session = yield* Effect.promise(() =>
+ Session.create({
+ title: "Prompt provider",
+ permission: [{ permission: "*", pattern: "*", action: "allow" }],
+ }),
+ )
- const result = yield* prompt.loop({ sessionID: chat.id })
- expect(yield* test.calls).toBe(2)
- expect(result.info.role).toBe("assistant")
- if (result.info.role === "assistant") {
- expect(result.parts.some((part) => part.type === "text" && part.text === "second")).toBe(true)
- expect(result.info.finish).toBe("stop")
- }
- }),
- { git: true, config: cfg },
+ yield* Effect.promise(() =>
+ SessionPrompt.prompt({
+ sessionID: session.id,
+ agent: "build",
+ noReply: true,
+ parts: [{ type: "text", text: "hello" }],
+ }),
+ )
+
+ yield* llm.text("world")
+
+ const result = yield* Effect.promise(() => SessionPrompt.loop({ sessionID: session.id }))
+ expect(result.info.role).toBe("assistant")
+ expect(result.parts.some((part) => part.type === "text" && part.text === "world")).toBe(true)
+ expect(yield* llm.hits).toHaveLength(1)
+ expect(yield* llm.pending).toBe(0)
+ }),
+ { git: true, config: providerCfg },
),
)
-it.effect("failed subtask preserves metadata on error tool state", () =>
- provideTmpdirInstance(
- (dir) =>
- Effect.gen(function* () {
- const { test, prompt, chat } = yield* boot({ title: "Pinned" })
- yield* test.reply(
- start(),
- toolInputStart("task-1", "task"),
- toolCall("task-1", "task", {
- description: "inspect bug",
- prompt: "look into the cache key path",
- subagent_type: "general",
- }),
- {
- type: "finish-step",
- finishReason: "tool-calls",
- rawFinishReason: "tool_calls",
- response: { id: "res", modelId: "test-model", timestamp: new Date() },
- providerMetadata: undefined,
- usage: usage(),
- },
- { type: "finish", finishReason: "tool-calls", rawFinishReason: "tool_calls", totalUsage: usage() },
- )
- yield* test.reply(...replyStop("done"))
- const msg = yield* user(chat.id, "hello")
- yield* addSubtask(chat.id, msg.id)
+it.live("static loop consumes queued replies across turns", () =>
+ provideTmpdirServer(
+ Effect.fnUntraced(function* ({ llm }) {
+ const session = yield* Effect.promise(() =>
+ Session.create({
+ title: "Prompt provider turns",
+ permission: [{ permission: "*", pattern: "*", action: "allow" }],
+ }),
+ )
- const result = yield* prompt.loop({ sessionID: chat.id })
- expect(result.info.role).toBe("assistant")
- expect(yield* test.calls).toBe(2)
+ yield* Effect.promise(() =>
+ SessionPrompt.prompt({
+ sessionID: session.id,
+ agent: "build",
+ noReply: true,
+ parts: [{ type: "text", text: "hello one" }],
+ }),
+ )
- const msgs = yield* Effect.promise(() => MessageV2.filterCompacted(MessageV2.stream(chat.id)))
- const taskMsg = msgs.find((item) => item.info.role === "assistant" && item.info.agent === "general")
- expect(taskMsg?.info.role).toBe("assistant")
- if (!taskMsg || taskMsg.info.role !== "assistant") return
+ yield* llm.text("world one")
- const tool = errorTool(taskMsg.parts)
- if (!tool) return
+ const first = yield* Effect.promise(() => SessionPrompt.loop({ sessionID: session.id }))
+ expect(first.info.role).toBe("assistant")
+ expect(first.parts.some((part) => part.type === "text" && part.text === "world one")).toBe(true)
- expect(tool.state.error).toContain("Tool execution failed")
- expect(tool.state.metadata).toBeDefined()
- expect(tool.state.metadata?.sessionId).toBeDefined()
- expect(tool.state.metadata?.model).toEqual({
- providerID: ProviderID.make("test"),
- modelID: ModelID.make("missing-model"),
- })
- }),
+ yield* Effect.promise(() =>
+ SessionPrompt.prompt({
+ sessionID: session.id,
+ agent: "build",
+ noReply: true,
+ parts: [{ type: "text", text: "hello two" }],
+ }),
+ )
+
+ yield* llm.text("world two")
+
+ const second = yield* Effect.promise(() => SessionPrompt.loop({ sessionID: session.id }))
+ expect(second.info.role).toBe("assistant")
+ expect(second.parts.some((part) => part.type === "text" && part.text === "world two")).toBe(true)
+
+ expect(yield* llm.hits).toHaveLength(2)
+ expect(yield* llm.pending).toBe(0)
+ }),
+ { git: true, config: providerCfg },
+ ),
+)
+
+it.live("loop continues when finish is tool-calls", () =>
+ provideTmpdirServer(
+ Effect.fnUntraced(function* ({ llm }) {
+ const prompt = yield* SessionPrompt.Service
+ const sessions = yield* Session.Service
+ const session = yield* sessions.create({
+ title: "Pinned",
+ permission: [{ permission: "*", pattern: "*", action: "allow" }],
+ })
+ yield* prompt.prompt({
+ sessionID: session.id,
+ agent: "build",
+ noReply: true,
+ parts: [{ type: "text", text: "hello" }],
+ })
+ yield* llm.tool("first", { value: "first" })
+ yield* llm.text("second")
+
+ const result = yield* prompt.loop({ sessionID: session.id })
+ expect(yield* llm.calls).toBe(2)
+ expect(result.info.role).toBe("assistant")
+ if (result.info.role === "assistant") {
+ expect(result.parts.some((part) => part.type === "text" && part.text === "second")).toBe(true)
+ expect(result.info.finish).toBe("stop")
+ }
+ }),
+ { git: true, config: providerCfg },
+ ),
+)
+
+it.live("failed subtask preserves metadata on error tool state", () =>
+ provideTmpdirServer(
+ Effect.fnUntraced(function* ({ llm }) {
+ const prompt = yield* SessionPrompt.Service
+ const sessions = yield* Session.Service
+ const chat = yield* sessions.create({ title: "Pinned" })
+ yield* llm.tool("task", {
+ description: "inspect bug",
+ prompt: "look into the cache key path",
+ subagent_type: "general",
+ })
+ yield* llm.text("done")
+ const msg = yield* user(chat.id, "hello")
+ yield* addSubtask(chat.id, msg.id)
+
+ const result = yield* prompt.loop({ sessionID: chat.id })
+ expect(result.info.role).toBe("assistant")
+ expect(yield* llm.calls).toBe(2)
+
+ const msgs = yield* Effect.promise(() => MessageV2.filterCompacted(MessageV2.stream(chat.id)))
+ const taskMsg = msgs.find((item) => item.info.role === "assistant" && item.info.agent === "general")
+ expect(taskMsg?.info.role).toBe("assistant")
+ if (!taskMsg || taskMsg.info.role !== "assistant") return
+
+ const tool = errorTool(taskMsg.parts)
+ if (!tool) return
+
+ expect(tool.state.error).toContain("Tool execution failed")
+ expect(tool.state.metadata).toBeDefined()
+ expect(tool.state.metadata?.sessionId).toBeDefined()
+ expect(tool.state.metadata?.model).toEqual({
+ providerID: ProviderID.make("test"),
+ modelID: ModelID.make("missing-model"),
+ })
+ }),
{
git: true,
- config: {
- ...cfg,
+ config: (url) => ({
+ ...providerCfg(url),
agent: {
general: {
model: "test/missing-model",
},
},
- },
+ }),
},
),
)
-it.effect("loop sets status to busy then idle", () =>
- provideTmpdirInstance(
- (dir) =>
- Effect.gen(function* () {
- const test = yield* TestLLM
+it.live(
+ "loop sets status to busy then idle",
+ () =>
+ provideTmpdirServer(
+ Effect.fnUntraced(function* ({ llm }) {
const prompt = yield* SessionPrompt.Service
const sessions = yield* Session.Service
- const bus = yield* Bus.Service
+ const status = yield* SessionStatus.Service
- yield* test.reply(start(), textStart(), textDelta("t", "ok"), textEnd(), finishStep(), finish())
+ yield* llm.hang
const chat = yield* sessions.create({})
yield* user(chat.id, "hi")
- const types: string[] = []
- const idle = defer<void>()
- const off = yield* bus.subscribeCallback(SessionStatus.Event.Status, (evt) => {
- if (evt.properties.sessionID !== chat.id) return
- types.push(evt.properties.status.type)
- if (evt.properties.status.type === "idle") idle.resolve()
- })
-
- yield* prompt.loop({ sessionID: chat.id })
- yield* Effect.promise(() => idle.promise)
- off()
-
- expect(types).toContain("busy")
- expect(types[types.length - 1]).toBe("idle")
+ const fiber = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild)
+ yield* llm.wait(1)
+ expect((yield* status.get(chat.id)).type).toBe("busy")
+ yield* prompt.cancel(chat.id)
+ yield* Fiber.await(fiber)
+ expect((yield* status.get(chat.id)).type).toBe("idle")
}),
- { git: true, config: cfg },
- ),
+ { git: true, config: providerCfg },
+ ),
+ 3_000,
)
// Cancel semantics
-it.effect(
+it.live(
"cancel interrupts loop and resolves with an assistant message",
() =>
- provideTmpdirInstance(
- (dir) =>
- Effect.gen(function* () {
- const { test, prompt, chat } = yield* boot()
- yield* seed(chat.id)
-
- // Make LLM hang so the loop blocks
- yield* test.push((input) => hang(input, start()))
+ provideTmpdirServer(
+ Effect.fnUntraced(function* ({ llm }) {
+ const prompt = yield* SessionPrompt.Service
+ const sessions = yield* Session.Service
+ const chat = yield* sessions.create({ title: "Pinned" })
+ yield* seed(chat.id)
- // Seed a new user message so the loop enters the LLM path
- yield* user(chat.id, "more")
+ yield* llm.hang
- const fiber = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild)
- // Give the loop time to start
- yield* waitMs(200)
- yield* prompt.cancel(chat.id)
+ yield* user(chat.id, "more")
- const exit = yield* Fiber.await(fiber)
- expect(Exit.isSuccess(exit)).toBe(true)
- if (Exit.isSuccess(exit)) {
- expect(exit.value.info.role).toBe("assistant")
- }
- }),
- { git: true, config: cfg },
+ const fiber = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild)
+ yield* llm.wait(1)
+ yield* prompt.cancel(chat.id)
+ const exit = yield* Fiber.await(fiber)
+ expect(Exit.isSuccess(exit)).toBe(true)
+ if (Exit.isSuccess(exit)) {
+ expect(exit.value.info.role).toBe("assistant")
+ }
+ }),
+ { git: true, config: providerCfg },
),
- 30_000,
+ 3_000,
)
-it.effect(
+it.live(
"cancel records MessageAbortedError on interrupted process",
() =>
- provideTmpdirInstance(
- (dir) =>
- Effect.gen(function* () {
- const ready = defer<void>()
- const { test, prompt, chat } = yield* boot()
-
- yield* test.push((input) =>
- hang(input, start()).pipe(
- Stream.tap((event) => (event.type === "start" ? Effect.sync(() => ready.resolve()) : Effect.void)),
- ),
- )
- yield* user(chat.id, "hello")
-
- const fiber = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild)
- yield* Effect.promise(() => ready.promise)
- yield* prompt.cancel(chat.id)
+ provideTmpdirServer(
+ Effect.fnUntraced(function* ({ llm }) {
+ const prompt = yield* SessionPrompt.Service
+ const sessions = yield* Session.Service
+ const chat = yield* sessions.create({ title: "Pinned" })
+ yield* llm.hang
+ yield* user(chat.id, "hello")
- const exit = yield* Fiber.await(fiber)
- expect(Exit.isSuccess(exit)).toBe(true)
- if (Exit.isSuccess(exit)) {
- const info = exit.value.info
- if (info.role === "assistant") {
- expect(info.error?.name).toBe("MessageAbortedError")
- }
+ const fiber = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild)
+ yield* llm.wait(1)
+ yield* prompt.cancel(chat.id)
+ const exit = yield* Fiber.await(fiber)
+ expect(Exit.isSuccess(exit)).toBe(true)
+ if (Exit.isSuccess(exit)) {
+ const info = exit.value.info
+ if (info.role === "assistant") {
+ expect(info.error?.name).toBe("MessageAbortedError")
}
- }),
- { git: true, config: cfg },
+ }
+ }),
+ { git: true, config: providerCfg },
),
- 30_000,
+ 3_000,
)
-it.effect(
+it.live(
"cancel finalizes subtask tool state",
() =>
provideTmpdirInstance(
@@ -695,45 +644,38 @@ it.effect(
30_000,
)
-it.effect(
+it.live(
"cancel with queued callers resolves all cleanly",
() =>
- provideTmpdirInstance(
- (dir) =>
- Effect.gen(function* () {
- const ready = defer<void>()
- const { test, prompt, chat } = yield* boot()
-
- yield* test.push((input) =>
- hang(input, start()).pipe(
- Stream.tap((event) => (event.type === "start" ? Effect.sync(() => ready.resolve()) : Effect.void)),
- ),
- )
- yield* user(chat.id, "hello")
-
- const a = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild)
- yield* Effect.promise(() => ready.promise)
- // Queue a second caller
- const b = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild)
- yield* waitMs(50)
-
- yield* prompt.cancel(chat.id)
+ provideTmpdirServer(
+ Effect.fnUntraced(function* ({ llm }) {
+ const prompt = yield* SessionPrompt.Service
+ const sessions = yield* Session.Service
+ const chat = yield* sessions.create({ title: "Pinned" })
+ yield* llm.hang
+ yield* user(chat.id, "hello")
- const [exitA, exitB] = yield* Effect.all([Fiber.await(a), Fiber.await(b)])
- expect(Exit.isSuccess(exitA)).toBe(true)
- expect(Exit.isSuccess(exitB)).toBe(true)
- if (Exit.isSuccess(exitA) && Exit.isSuccess(exitB)) {
- expect(exitA.value.info.id).toBe(exitB.value.info.id)
- }
- }),
- { git: true, config: cfg },
+ const a = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild)
+ yield* llm.wait(1)
+ const b = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild)
+ yield* Effect.sleep(50)
+
+ yield* prompt.cancel(chat.id)
+ const [exitA, exitB] = yield* Effect.all([Fiber.await(a), Fiber.await(b)])
+ expect(Exit.isSuccess(exitA)).toBe(true)
+ expect(Exit.isSuccess(exitB)).toBe(true)
+ if (Exit.isSuccess(exitA) && Exit.isSuccess(exitB)) {
+ expect(exitA.value.info.id).toBe(exitB.value.info.id)
+ }
+ }),
+ { git: true, config: providerCfg },
),
- 30_000,
+ 3_000,
)
// Queue semantics
-it.effect("concurrent loop callers get same result", () =>
+it.live("concurrent loop callers get same result", () =>
provideTmpdirInstance(
(dir) =>
Effect.gen(function* () {
@@ -752,152 +694,128 @@ it.effect("concurrent loop callers get same result", () =>
),
)
-it.effect("concurrent loop callers all receive same error result", () =>
- provideTmpdirInstance(
- (dir) =>
- Effect.gen(function* () {
- const { test, prompt, chat } = yield* boot()
+it.live(
+ "concurrent loop callers all receive same error result",
+ () =>
+ provideTmpdirServer(
+ Effect.fnUntraced(function* ({ llm }) {
+ const prompt = yield* SessionPrompt.Service
+ const sessions = yield* Session.Service
+ const chat = yield* sessions.create({ title: "Pinned" })
- // Push a stream that fails — the loop records the error on the assistant message
- yield* test.push(Stream.fail(new Error("boom")))
+ yield* llm.fail("boom")
yield* user(chat.id, "hello")
const [a, b] = yield* Effect.all([prompt.loop({ sessionID: chat.id }), prompt.loop({ sessionID: chat.id })], {
concurrency: "unbounded",
})
-
- // Both callers get the same assistant with an error recorded
expect(a.info.id).toBe(b.info.id)
expect(a.info.role).toBe("assistant")
- if (a.info.role === "assistant") {
- expect(a.info.error).toBeDefined()
- }
- if (b.info.role === "assistant") {
- expect(b.info.error).toBeDefined()
- }
}),
- { git: true, config: cfg },
- ),
+ { git: true, config: providerCfg },
+ ),
+ 3_000,
)
-it.effect(
+it.live(
"prompt submitted during an active run is included in the next LLM input",
() =>
- provideTmpdirInstance(
- (dir) =>
- Effect.gen(function* () {
- const ready = defer<void>()
- const gate = defer<void>()
- const { test, prompt, sessions, chat } = yield* boot()
-
- yield* test.push((_input) =>
- stream(start()).pipe(
- Stream.tap((event) => (event.type === "start" ? Effect.sync(() => ready.resolve()) : Effect.void)),
- Stream.concat(
- Stream.fromEffect(Effect.promise(() => gate.promise)).pipe(
- Stream.flatMap(() =>
- stream(textStart("a"), textDelta("a", "first"), textEnd("a"), finishStep(), finish()),
- ),
- ),
- ),
- ),
- )
-
- const a = yield* prompt
- .prompt({
- sessionID: chat.id,
- agent: "build",
- model: ref,
- parts: [{ type: "text", text: "first" }],
- })
- .pipe(Effect.forkChild)
-
- yield* Effect.promise(() => ready.promise)
+ provideTmpdirServer(
+ Effect.fnUntraced(function* ({ llm }) {
+ const gate = defer<void>()
+ const prompt = yield* SessionPrompt.Service
+ const sessions = yield* Session.Service
+ const chat = yield* sessions.create({ title: "Pinned" })
- const id = MessageID.ascending()
- const b = yield* prompt
- .prompt({
- sessionID: chat.id,
- messageID: id,
- agent: "build",
- model: ref,
- parts: [{ type: "text", text: "second" }],
- })
- .pipe(Effect.forkChild)
+ yield* llm.hold("first", gate.promise)
+ yield* llm.text("second")
- yield* Effect.promise(async () => {
- const end = Date.now() + 5000
- while (Date.now() < end) {
- const msgs = await Effect.runPromise(sessions.messages({ sessionID: chat.id }))
- if (msgs.some((msg) => msg.info.role === "user" && msg.info.id === id)) return
- await new Promise((done) => setTimeout(done, 20))
- }
- throw new Error("timed out waiting for second prompt to save")
+ const a = yield* prompt
+ .prompt({
+ sessionID: chat.id,
+ agent: "build",
+ model: ref,
+ parts: [{ type: "text", text: "first" }],
})
+ .pipe(Effect.forkChild)
+
+ yield* llm.wait(1)
+
+ const id = MessageID.ascending()
+ const b = yield* prompt
+ .prompt({
+ sessionID: chat.id,
+ messageID: id,
+ agent: "build",
+ model: ref,
+ parts: [{ type: "text", text: "second" }],
+ })
+ .pipe(Effect.forkChild)
+
+ yield* Effect.promise(async () => {
+ const end = Date.now() + 5000
+ while (Date.now() < end) {
+ const msgs = await Effect.runPromise(sessions.messages({ sessionID: chat.id }))
+ if (msgs.some((msg) => msg.info.role === "user" && msg.info.id === id)) return
+ await new Promise((done) => setTimeout(done, 20))
+ }
+ throw new Error("timed out waiting for second prompt to save")
+ })
- yield* test.reply(...replyStop("second"))
- gate.resolve()
-
- const [ea, eb] = yield* Effect.all([Fiber.await(a), Fiber.await(b)])
- expect(Exit.isSuccess(ea)).toBe(true)
- expect(Exit.isSuccess(eb)).toBe(true)
- expect(yield* test.calls).toBe(2)
-
- const msgs = yield* sessions.messages({ sessionID: chat.id })
- const assistants = msgs.filter((msg) => msg.info.role === "assistant")
- expect(assistants).toHaveLength(2)
- const last = assistants.at(-1)
- if (!last || last.info.role !== "assistant") throw new Error("expected second assistant")
- expect(last.info.parentID).toBe(id)
- expect(last.parts.some((part) => part.type === "text" && part.text === "second")).toBe(true)
-
- const inputs = yield* test.inputs
- expect(inputs).toHaveLength(2)
- expect(JSON.stringify(inputs.at(-1)?.messages)).toContain("second")
- }),
- { git: true, config: cfg },
+ gate.resolve()
+
+ const [ea, eb] = yield* Effect.all([Fiber.await(a), Fiber.await(b)])
+ expect(Exit.isSuccess(ea)).toBe(true)
+ expect(Exit.isSuccess(eb)).toBe(true)
+ expect(yield* llm.calls).toBe(2)
+
+ const msgs = yield* sessions.messages({ sessionID: chat.id })
+ const assistants = msgs.filter((msg) => msg.info.role === "assistant")
+ expect(assistants).toHaveLength(2)
+ const last = assistants.at(-1)
+ if (!last || last.info.role !== "assistant") throw new Error("expected second assistant")
+ expect(last.info.parentID).toBe(id)
+ expect(last.parts.some((part) => part.type === "text" && part.text === "second")).toBe(true)
+
+ const inputs = yield* llm.inputs
+ expect(inputs).toHaveLength(2)
+ expect(JSON.stringify(inputs.at(-1)?.messages)).toContain("second")
+ }),
+ { git: true, config: providerCfg },
),
- 30_000,
+ 3_000,
)
-it.effect(
+it.live(
"assertNotBusy throws BusyError when loop running",
() =>
- provideTmpdirInstance(
- (dir) =>
- Effect.gen(function* () {
- const ready = defer<void>()
- const test = yield* TestLLM
- const prompt = yield* SessionPrompt.Service
- const sessions = yield* Session.Service
-
- yield* test.push((input) =>
- hang(input, start()).pipe(
- Stream.tap((event) => (event.type === "start" ? Effect.sync(() => ready.resolve()) : Effect.void)),
- ),
- )
+ provideTmpdirServer(
+ Effect.fnUntraced(function* ({ llm }) {
+ const prompt = yield* SessionPrompt.Service
+ const sessions = yield* Session.Service
+ yield* llm.hang
- const chat = yield* sessions.create({})
- yield* user(chat.id, "hi")
+ const chat = yield* sessions.create({})
+ yield* user(chat.id, "hi")
- const fiber = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild)
- yield* Effect.promise(() => ready.promise)
+ const fiber = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild)
+ yield* llm.wait(1)
- const exit = yield* prompt.assertNotBusy(chat.id).pipe(Effect.exit)
- expect(Exit.isFailure(exit)).toBe(true)
- if (Exit.isFailure(exit)) {
- expect(Cause.squash(exit.cause)).toBeInstanceOf(Session.BusyError)
- }
+ const exit = yield* prompt.assertNotBusy(chat.id).pipe(Effect.exit)
+ expect(Exit.isFailure(exit)).toBe(true)
+ if (Exit.isFailure(exit)) {
+ expect(Cause.squash(exit.cause)).toBeInstanceOf(Session.BusyError)
+ }
- yield* prompt.cancel(chat.id)
- yield* Fiber.await(fiber)
- }),
- { git: true, config: cfg },
+ yield* prompt.cancel(chat.id)
+ yield* Fiber.await(fiber)
+ }),
+ { git: true, config: providerCfg },
),
- 30_000,
+ 3_000,
)
-it.effect("assertNotBusy succeeds when idle", () =>
+it.live("assertNotBusy succeeds when idle", () =>
provideTmpdirInstance(
(dir) =>
Effect.gen(function* () {
@@ -914,37 +832,32 @@ it.effect("assertNotBusy succeeds when idle", () =>
// Shell semantics
-it.effect(
+it.live(
"shell rejects with BusyError when loop running",
() =>
- provideTmpdirInstance(
- (dir) =>
- Effect.gen(function* () {
- const ready = defer<void>()
- const { test, prompt, chat } = yield* boot()
-
- yield* test.push((input) =>
- hang(input, start()).pipe(
- Stream.tap((event) => (event.type === "start" ? Effect.sync(() => ready.resolve()) : Effect.void)),
- ),
- )
- yield* user(chat.id, "hi")
+ provideTmpdirServer(
+ Effect.fnUntraced(function* ({ llm }) {
+ const prompt = yield* SessionPrompt.Service
+ const sessions = yield* Session.Service
+ const chat = yield* sessions.create({ title: "Pinned" })
+ yield* llm.hang
+ yield* user(chat.id, "hi")
- const fiber = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild)
- yield* Effect.promise(() => ready.promise)
+ const fiber = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild)
+ yield* llm.wait(1)
- const exit = yield* prompt.shell({ sessionID: chat.id, agent: "build", command: "echo hi" }).pipe(Effect.exit)
- expect(Exit.isFailure(exit)).toBe(true)
- if (Exit.isFailure(exit)) {
- expect(Cause.squash(exit.cause)).toBeInstanceOf(Session.BusyError)
- }
+ const exit = yield* prompt.shell({ sessionID: chat.id, agent: "build", command: "echo hi" }).pipe(Effect.exit)
+ expect(Exit.isFailure(exit)).toBe(true)
+ if (Exit.isFailure(exit)) {
+ expect(Cause.squash(exit.cause)).toBeInstanceOf(Session.BusyError)
+ }
- yield* prompt.cancel(chat.id)
- yield* Fiber.await(fiber)
- }),
- { git: true, config: cfg },
+ yield* prompt.cancel(chat.id)
+ yield* Fiber.await(fiber)
+ }),
+ { git: true, config: providerCfg },
),
- 30_000,
+ 3_000,
)
unix("shell captures stdout and stderr in completed tool output", () =>
@@ -1006,74 +919,82 @@ unix(
30_000,
)
-unix(
+it.live(
"loop waits while shell runs and starts after shell exits",
() =>
- provideTmpdirInstance(
- (dir) =>
- Effect.gen(function* () {
- const { test, prompt, chat } = yield* boot()
- yield* test.reply(...replyStop("after-shell"))
+ provideTmpdirServer(
+ Effect.fnUntraced(function* ({ llm }) {
+ const prompt = yield* SessionPrompt.Service
+ const sessions = yield* Session.Service
+ const chat = yield* sessions.create({
+ title: "Pinned",
+ permission: [{ permission: "*", pattern: "*", action: "allow" }],
+ })
+ yield* llm.text("after-shell")
- const sh = yield* prompt
- .shell({ sessionID: chat.id, agent: "build", command: "sleep 0.2" })
- .pipe(Effect.forkChild)
- yield* waitMs(50)
+ const sh = yield* prompt
+ .shell({ sessionID: chat.id, agent: "build", command: "sleep 0.2" })
+ .pipe(Effect.forkChild)
+ yield* Effect.sleep(50)
- const run = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild)
- yield* waitMs(50)
+ const loop = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild)
+ yield* Effect.sleep(50)
- expect(yield* test.calls).toBe(0)
+ expect(yield* llm.calls).toBe(0)
- yield* Fiber.await(sh)
- const exit = yield* Fiber.await(run)
+ yield* Fiber.await(sh)
+ const exit = yield* Fiber.await(loop)
- expect(Exit.isSuccess(exit)).toBe(true)
- if (Exit.isSuccess(exit)) {
- expect(exit.value.info.role).toBe("assistant")
- expect(exit.value.parts.some((part) => part.type === "text" && part.text === "after-shell")).toBe(true)
- }
- expect(yield* test.calls).toBe(1)
- }),
- { git: true, config: cfg },
+ expect(Exit.isSuccess(exit)).toBe(true)
+ if (Exit.isSuccess(exit)) {
+ expect(exit.value.info.role).toBe("assistant")
+ expect(exit.value.parts.some((part) => part.type === "text" && part.text === "after-shell")).toBe(true)
+ }
+ expect(yield* llm.calls).toBe(1)
+ }),
+ { git: true, config: providerCfg },
),
- 30_000,
+ 3_000,
)
-unix(
+it.live(
"shell completion resumes queued loop callers",
() =>
- provideTmpdirInstance(
- (dir) =>
- Effect.gen(function* () {
- const { test, prompt, chat } = yield* boot()
- yield* test.reply(...replyStop("done"))
+ provideTmpdirServer(
+ Effect.fnUntraced(function* ({ llm }) {
+ const prompt = yield* SessionPrompt.Service
+ const sessions = yield* Session.Service
+ const chat = yield* sessions.create({
+ title: "Pinned",
+ permission: [{ permission: "*", pattern: "*", action: "allow" }],
+ })
+ yield* llm.text("done")
- const sh = yield* prompt
- .shell({ sessionID: chat.id, agent: "build", command: "sleep 0.2" })
- .pipe(Effect.forkChild)
- yield* waitMs(50)
+ const sh = yield* prompt
+ .shell({ sessionID: chat.id, agent: "build", command: "sleep 0.2" })
+ .pipe(Effect.forkChild)
+ yield* Effect.sleep(50)
- const a = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild)
- const b = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild)
- yield* waitMs(50)
+ const a = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild)
+ const b = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild)
+ yield* Effect.sleep(50)
- expect(yield* test.calls).toBe(0)
+ expect(yield* llm.calls).toBe(0)
- yield* Fiber.await(sh)
- const [ea, eb] = yield* Effect.all([Fiber.await(a), Fiber.await(b)])
+ yield* Fiber.await(sh)
+ const [ea, eb] = yield* Effect.all([Fiber.await(a), Fiber.await(b)])
- expect(Exit.isSuccess(ea)).toBe(true)
- expect(Exit.isSuccess(eb)).toBe(true)
- if (Exit.isSuccess(ea) && Exit.isSuccess(eb)) {
- expect(ea.value.info.id).toBe(eb.value.info.id)
- expect(ea.value.info.role).toBe("assistant")
- }
- expect(yield* test.calls).toBe(1)
- }),
- { git: true, config: cfg },
+ expect(Exit.isSuccess(ea)).toBe(true)
+ expect(Exit.isSuccess(eb)).toBe(true)
+ if (Exit.isSuccess(ea) && Exit.isSuccess(eb)) {
+ expect(ea.value.info.id).toBe(eb.value.info.id)
+ expect(ea.value.info.role).toBe("assistant")
+ }
+ expect(yield* llm.calls).toBe(1)
+ }),
+ { git: true, config: providerCfg },
),
- 30_000,
+ 3_000,
)
unix(
@@ -1088,7 +1009,7 @@ unix(
const sh = yield* prompt
.shell({ sessionID: chat.id, agent: "build", command: "sleep 30" })
.pipe(Effect.forkChild)
- yield* waitMs(50)
+ yield* Effect.sleep(50)
yield* prompt.cancel(chat.id)
@@ -1125,7 +1046,7 @@ unix(
const sh = yield* prompt
.shell({ sessionID: chat.id, agent: "build", command: "trap '' TERM; sleep 30" })
.pipe(Effect.forkChild)
- yield* waitMs(50)
+ yield* Effect.sleep(50)
yield* prompt.cancel(chat.id)
@@ -1156,14 +1077,14 @@ unix(
const sh = yield* prompt
.shell({ sessionID: chat.id, agent: "build", command: "sleep 30" })
.pipe(Effect.forkChild)
- yield* waitMs(50)
+ yield* Effect.sleep(50)
- const run = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild)
- yield* waitMs(50)
+ const loop = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild)
+ yield* Effect.sleep(50)
yield* prompt.cancel(chat.id)
- const exit = yield* Fiber.await(run)
+ const exit = yield* Fiber.await(loop)
expect(Exit.isSuccess(exit)).toBe(true)
yield* Fiber.await(sh)
@@ -1185,7 +1106,7 @@ unix(
const a = yield* prompt
.shell({ sessionID: chat.id, agent: "build", command: "sleep 30" })
.pipe(Effect.forkChild)
- yield* waitMs(50)
+ yield* Effect.sleep(50)
const exit = yield* prompt
.shell({ sessionID: chat.id, agent: "build", command: "echo hi" })
diff --git a/packages/opencode/test/tool/truncation.test.ts b/packages/opencode/test/tool/truncation.test.ts
index dba083c51..9ec5b7840 100644
--- a/packages/opencode/test/tool/truncation.test.ts
+++ b/packages/opencode/test/tool/truncation.test.ts
@@ -140,7 +140,7 @@ describe("Truncate", () => {
const DAY_MS = 24 * 60 * 60 * 1000
const it = testEffect(Layer.mergeAll(TruncateSvc.defaultLayer, NodeFileSystem.layer))
- it.effect("deletes files older than 7 days and preserves recent files", () =>
+ it.live("deletes files older than 7 days and preserves recent files", () =>
Effect.gen(function* () {
const fs = yield* FileSystem.FileSystem
diff --git a/packages/sdk/js/src/v2/gen/types.gen.ts b/packages/sdk/js/src/v2/gen/types.gen.ts
index 318b8907a..290c6fd5e 100644
--- a/packages/sdk/js/src/v2/gen/types.gen.ts
+++ b/packages/sdk/js/src/v2/gen/types.gen.ts
@@ -4,20 +4,6 @@ export type ClientOptions = {
baseUrl: `${string}://${string}` | (string & {})
}
-export type EventInstallationUpdated = {
- type: "installation.updated"
- properties: {
- version: string
- }
-}
-
-export type EventInstallationUpdateAvailable = {
- type: "installation.update-available"
- properties: {
- version: string
- }
-}
-
export type Project = {
id: string
worktree: string
@@ -47,6 +33,20 @@ export type EventProjectUpdated = {
properties: Project
}
+export type EventInstallationUpdated = {
+ type: "installation.updated"
+ properties: {
+ version: string
+ }
+}
+
+export type EventInstallationUpdateAvailable = {
+ type: "installation.update-available"
+ properties: {
+ version: string
+ }
+}
+
export type EventServerInstanceDisposed = {
type: "server.instance.disposed"
properties: {
@@ -964,9 +964,9 @@ export type EventSessionDeleted = {
}
export type Event =
+ | EventProjectUpdated
| EventInstallationUpdated
| EventInstallationUpdateAvailable
- | EventProjectUpdated
| EventServerInstanceDisposed
| EventServerConnected
| EventGlobalDisposed