summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorKit Langton <[email protected]>2026-03-30 16:06:51 -0400
committerGitHub <[email protected]>2026-03-30 20:06:51 +0000
commitc5442d418dd94fdd0d719962e4e3cd9e029e79f5 (patch)
treeb58bf22ac43c31aca2e16afffe43dc0811de60a4
parentfa95a61c4e15d6b55ac2e3a1da0176ceca76d8c2 (diff)
downloadopencode-c5442d418dd94fdd0d719962e4e3cd9e029e79f5.tar.gz
opencode-c5442d418dd94fdd0d719962e4e3cd9e029e79f5.zip
refactor(session): effectify SessionPrompt service (#19483)
-rw-r--r--packages/opencode/src/effect/runner.ts216
-rw-r--r--packages/opencode/src/server/routes/session.ts4
-rw-r--r--packages/opencode/src/session/compaction.ts64
-rw-r--r--packages/opencode/src/session/index.ts49
-rw-r--r--packages/opencode/src/session/llm.ts50
-rw-r--r--packages/opencode/src/session/processor.ts155
-rw-r--r--packages/opencode/src/session/prompt.ts3290
-rw-r--r--packages/opencode/src/session/revert.ts4
-rw-r--r--packages/opencode/src/session/summary.ts5
-rw-r--r--packages/opencode/src/tool/registry.ts13
-rw-r--r--packages/opencode/src/tool/tool.ts32
-rw-r--r--packages/opencode/test/effect/runner.test.ts523
-rw-r--r--packages/opencode/test/server/session-list.test.ts30
-rw-r--r--packages/opencode/test/server/session-messages.test.ts23
-rw-r--r--packages/opencode/test/server/session-select.test.ts18
-rw-r--r--packages/opencode/test/session/compaction.test.ts94
-rw-r--r--packages/opencode/test/session/llm.test.ts220
-rw-r--r--packages/opencode/test/session/processor-effect.test.ts152
-rw-r--r--packages/opencode/test/session/prompt-concurrency.test.ts247
-rw-r--r--packages/opencode/test/session/prompt-effect.test.ts1140
20 files changed, 4313 insertions, 2016 deletions
diff --git a/packages/opencode/src/effect/runner.ts b/packages/opencode/src/effect/runner.ts
new file mode 100644
index 000000000..fc1314071
--- /dev/null
+++ b/packages/opencode/src/effect/runner.ts
@@ -0,0 +1,216 @@
+import { Cause, Deferred, Effect, Exit, Fiber, Option, Schema, Scope, SynchronizedRef } from "effect"
+
+export interface Runner<A, E = never> {
+ readonly state: Runner.State<A, E>
+ readonly busy: boolean
+ readonly ensureRunning: (work: Effect.Effect<A, E>) => Effect.Effect<A, E>
+ readonly startShell: (work: (signal: AbortSignal) => Effect.Effect<A, E>) => Effect.Effect<A, E>
+ readonly cancel: Effect.Effect<void>
+}
+
+export namespace Runner {
+ export class Cancelled extends Schema.TaggedErrorClass<Cancelled>()("RunnerCancelled", {}) {}
+
+ interface RunHandle<A, E> {
+ id: number
+ done: Deferred.Deferred<A, E | Cancelled>
+ fiber: Fiber.Fiber<A, E>
+ }
+
+ interface ShellHandle<A, E> {
+ id: number
+ fiber: Fiber.Fiber<A, E>
+ abort: AbortController
+ }
+
+ interface PendingHandle<A, E> {
+ id: number
+ done: Deferred.Deferred<A, E | Cancelled>
+ work: Effect.Effect<A, E>
+ }
+
+ export type State<A, E> =
+ | { readonly _tag: "Idle" }
+ | { readonly _tag: "Running"; readonly run: RunHandle<A, E> }
+ | { readonly _tag: "Shell"; readonly shell: ShellHandle<A, E> }
+ | { readonly _tag: "ShellThenRun"; readonly shell: ShellHandle<A, E>; readonly run: PendingHandle<A, E> }
+
+ export const make = <A, E = never>(
+ scope: Scope.Scope,
+ opts?: {
+ onIdle?: Effect.Effect<void>
+ onBusy?: Effect.Effect<void>
+ onInterrupt?: Effect.Effect<A, E>
+ busy?: () => never
+ },
+ ): Runner<A, E> => {
+ const ref = SynchronizedRef.makeUnsafe<State<A, E>>({ _tag: "Idle" })
+ const idle = opts?.onIdle ?? Effect.void
+ const busy = opts?.onBusy ?? Effect.void
+ const onInterrupt = opts?.onInterrupt
+ let ids = 0
+
+ const state = () => SynchronizedRef.getUnsafe(ref)
+ const next = () => {
+ ids += 1
+ return ids
+ }
+
+ const complete = (done: Deferred.Deferred<A, E | Cancelled>, exit: Exit.Exit<A, E>) =>
+ Exit.isFailure(exit) && Cause.hasInterruptsOnly(exit.cause)
+ ? Deferred.fail(done, new Cancelled()).pipe(Effect.asVoid)
+ : Deferred.done(done, exit).pipe(Effect.asVoid)
+
+ const idleIfCurrent = () =>
+ SynchronizedRef.modify(ref, (st) => [st._tag === "Idle" ? idle : Effect.void, st] as const).pipe(Effect.flatten)
+
+ const finishRun = (id: number, done: Deferred.Deferred<A, E | Cancelled>, exit: Exit.Exit<A, E>) =>
+ SynchronizedRef.modify(
+ ref,
+ (st) =>
+ [
+ Effect.gen(function* () {
+ if (st._tag === "Running" && st.run.id === id) yield* idle
+ yield* complete(done, exit)
+ }),
+ st._tag === "Running" && st.run.id === id ? ({ _tag: "Idle" } as const) : st,
+ ] as const,
+ ).pipe(Effect.flatten)
+
+ const startRun = (work: Effect.Effect<A, E>, done: Deferred.Deferred<A, E | Cancelled>) =>
+ Effect.gen(function* () {
+ const id = next()
+ const fiber = yield* work.pipe(
+ Effect.onExit((exit) => finishRun(id, done, exit)),
+ Effect.forkIn(scope),
+ )
+ return { id, done, fiber } satisfies RunHandle<A, E>
+ })
+
+ const finishShell = (id: number) =>
+ SynchronizedRef.modifyEffect(
+ ref,
+ Effect.fnUntraced(function* (st) {
+ if (st._tag === "Shell" && st.shell.id === id) return [idle, { _tag: "Idle" }] as const
+ if (st._tag === "ShellThenRun" && st.shell.id === id) {
+ const run = yield* startRun(st.run.work, st.run.done)
+ return [Effect.void, { _tag: "Running", run }] as const
+ }
+ return [Effect.void, st] as const
+ }),
+ ).pipe(Effect.flatten)
+
+ const stopShell = (shell: ShellHandle<A, E>) =>
+ Effect.gen(function* () {
+ shell.abort.abort()
+ const exit = yield* Fiber.await(shell.fiber).pipe(Effect.timeoutOption("100 millis"))
+ if (Option.isNone(exit)) yield* Fiber.interrupt(shell.fiber)
+ yield* Fiber.await(shell.fiber).pipe(Effect.exit, Effect.asVoid)
+ })
+
+ const ensureRunning = (work: Effect.Effect<A, E>) =>
+ SynchronizedRef.modifyEffect(
+ ref,
+ Effect.fnUntraced(function* (st) {
+ switch (st._tag) {
+ case "Running":
+ case "ShellThenRun":
+ return [Deferred.await(st.run.done), st] as const
+ case "Shell": {
+ const run = {
+ id: next(),
+ done: yield* Deferred.make<A, E | Cancelled>(),
+ work,
+ } satisfies PendingHandle<A, E>
+ return [Deferred.await(run.done), { _tag: "ShellThenRun", shell: st.shell, run }] as const
+ }
+ case "Idle": {
+ const done = yield* Deferred.make<A, E | Cancelled>()
+ const run = yield* startRun(work, done)
+ return [Deferred.await(done), { _tag: "Running", run }] as const
+ }
+ }
+ }),
+ ).pipe(
+ Effect.flatten,
+ Effect.catch((e): Effect.Effect<A, E> =>
+ e instanceof Cancelled ? (onInterrupt ?? Effect.die(e)) : Effect.fail(e as E),
+ ),
+ )
+
+ const startShell = (work: (signal: AbortSignal) => Effect.Effect<A, E>) =>
+ SynchronizedRef.modifyEffect(
+ ref,
+ Effect.fnUntraced(function* (st) {
+ if (st._tag !== "Idle") {
+ return [
+ Effect.sync(() => {
+ if (opts?.busy) opts.busy()
+ throw new Error("Runner is busy")
+ }),
+ st,
+ ] as const
+ }
+ yield* busy
+ const id = next()
+ const abort = new AbortController()
+ const fiber = yield* work(abort.signal).pipe(Effect.ensuring(finishShell(id)), Effect.forkChild)
+ const shell = { id, fiber, abort } satisfies ShellHandle<A, E>
+ return [
+ Effect.gen(function* () {
+ const exit = yield* Fiber.await(fiber)
+ if (Exit.isSuccess(exit)) return exit.value
+ if (Cause.hasInterruptsOnly(exit.cause) && onInterrupt) return yield* onInterrupt
+ return yield* Effect.failCause(exit.cause)
+ }),
+ { _tag: "Shell", shell },
+ ] as const
+ }),
+ ).pipe(Effect.flatten)
+
+ const cancel = SynchronizedRef.modify(ref, (st) => {
+ switch (st._tag) {
+ case "Idle":
+ return [Effect.void, st] as const
+ case "Running":
+ return [
+ Effect.gen(function* () {
+ yield* Fiber.interrupt(st.run.fiber)
+ yield* Deferred.await(st.run.done).pipe(Effect.exit, Effect.asVoid)
+ yield* idleIfCurrent()
+ }),
+ { _tag: "Idle" } as const,
+ ] as const
+ case "Shell":
+ return [
+ Effect.gen(function* () {
+ yield* stopShell(st.shell)
+ yield* idleIfCurrent()
+ }),
+ { _tag: "Idle" } as const,
+ ] as const
+ case "ShellThenRun":
+ return [
+ Effect.gen(function* () {
+ yield* Deferred.fail(st.run.done, new Cancelled()).pipe(Effect.asVoid)
+ yield* stopShell(st.shell)
+ yield* idleIfCurrent()
+ }),
+ { _tag: "Idle" } as const,
+ ] as const
+ }
+ }).pipe(Effect.flatten)
+
+ return {
+ get state() {
+ return state()
+ },
+ get busy() {
+ return state()._tag !== "Idle"
+ },
+ ensureRunning,
+ startShell,
+ cancel,
+ }
+ }
+}
diff --git a/packages/opencode/src/server/routes/session.ts b/packages/opencode/src/server/routes/session.ts
index d499e5a1e..23615d39a 100644
--- a/packages/opencode/src/server/routes/session.ts
+++ b/packages/opencode/src/server/routes/session.ts
@@ -381,7 +381,7 @@ export const SessionRoutes = lazy(() =>
}),
),
async (c) => {
- SessionPrompt.cancel(c.req.valid("param").sessionID)
+ await SessionPrompt.cancel(c.req.valid("param").sessionID)
return c.json(true)
},
)
@@ -699,7 +699,7 @@ export const SessionRoutes = lazy(() =>
),
async (c) => {
const params = c.req.valid("param")
- SessionPrompt.assertNotBusy(params.sessionID)
+ await SessionPrompt.assertNotBusy(params.sessionID)
await Session.removeMessage({
sessionID: params.sessionID,
messageID: params.messageID,
diff --git a/packages/opencode/src/session/compaction.ts b/packages/opencode/src/session/compaction.ts
index 69759c0d9..229dff0c4 100644
--- a/packages/opencode/src/session/compaction.ts
+++ b/packages/opencode/src/session/compaction.ts
@@ -15,7 +15,7 @@ import { Plugin } from "@/plugin"
import { Config } from "@/config/config"
import { NotFoundError } from "@/storage/db"
import { ModelID, ProviderID } from "@/provider/schema"
-import { Cause, Effect, Exit, Layer, ServiceMap } from "effect"
+import { Effect, Layer, ServiceMap } from "effect"
import { makeRuntime } from "@/effect/run-service"
import { isOverflow as overflow } from "./overflow"
@@ -45,7 +45,6 @@ export namespace SessionCompaction {
parentID: MessageID
messages: MessageV2.WithParts[]
sessionID: SessionID
- abort: AbortSignal
auto: boolean
overflow?: boolean
}) => Effect.Effect<"continue" | "stop">
@@ -135,20 +134,28 @@ export namespace SessionCompaction {
parentID: MessageID
messages: MessageV2.WithParts[]
sessionID: SessionID
- abort: AbortSignal
auto: boolean
overflow?: boolean
}) {
- const userMessage = input.messages.findLast((m) => m.info.id === input.parentID)!.info as MessageV2.User
+ const parent = input.messages.findLast((m) => m.info.id === input.parentID)
+ if (!parent || parent.info.role !== "user") {
+ throw new Error(`Compaction parent must be a user message: ${input.parentID}`)
+ }
+ const userMessage = parent.info
let messages = input.messages
- let replay: MessageV2.WithParts | undefined
+ let replay:
+ | {
+ info: MessageV2.User
+ parts: MessageV2.Part[]
+ }
+ | undefined
if (input.overflow) {
const idx = input.messages.findIndex((m) => m.info.id === input.parentID)
for (let i = idx - 1; i >= 0; i--) {
const msg = input.messages[i]
if (msg.info.role === "user" && !msg.parts.some((p) => p.type === "compaction")) {
- replay = msg
+ replay = { info: msg.info, parts: msg.parts }
messages = input.messages.slice(0, i)
break
}
@@ -206,7 +213,7 @@ When constructing the summary, try to stick to this template:
const msgs = structuredClone(messages)
yield* plugin.trigger("experimental.chat.messages.transform", {}, { messages: msgs })
const modelMessages = yield* Effect.promise(() => MessageV2.toModelMessages(msgs, model, { stripMedia: true }))
- const msg = (yield* session.updateMessage({
+ const msg: MessageV2.Assistant = {
id: MessageID.ascending(),
role: "assistant",
parentID: input.parentID,
@@ -231,25 +238,17 @@ When constructing the summary, try to stick to this template:
time: {
created: Date.now(),
},
- })) as MessageV2.Assistant
+ }
+ yield* session.updateMessage(msg)
const processor = yield* processors.create({
assistantMessage: msg,
sessionID: input.sessionID,
model,
- abort: input.abort,
- })
- const cancel = Effect.fn("SessionCompaction.cancel")(function* () {
- if (!input.abort.aborted || msg.time.completed) return
- msg.error = msg.error ?? new MessageV2.AbortedError({ message: "Aborted" }).toObject()
- msg.finish = msg.finish ?? "error"
- msg.time.completed = Date.now()
- yield* session.updateMessage(msg)
})
const result = yield* processor
.process({
user: userMessage,
agent,
- abort: input.abort,
sessionID: input.sessionID,
tools: {},
system: [],
@@ -262,7 +261,7 @@ When constructing the summary, try to stick to this template:
],
model,
})
- .pipe(Effect.ensuring(cancel()))
+ .pipe(Effect.onInterrupt(() => processor.abort()))
if (result === "compact") {
processor.message.error = new MessageV2.ContextOverflowError({
@@ -277,7 +276,7 @@ When constructing the summary, try to stick to this template:
if (result === "continue" && input.auto) {
if (replay) {
- const original = replay.info as MessageV2.User
+ const original = replay.info
const replayMsg = yield* session.updateMessage({
id: MessageID.ascending(),
role: "user",
@@ -386,7 +385,7 @@ When constructing the summary, try to stick to this template:
),
)
- const { runPromise, runPromiseExit } = makeRuntime(Service, defaultLayer)
+ const { runPromise } = makeRuntime(Service, defaultLayer)
export async function isOverflow(input: { tokens: MessageV2.Assistant["tokens"]; model: Provider.Model }) {
return runPromise((svc) => svc.isOverflow(input))
@@ -396,21 +395,16 @@ When constructing the summary, try to stick to this template:
return runPromise((svc) => svc.prune(input))
}
- export async function process(input: {
- parentID: MessageID
- messages: MessageV2.WithParts[]
- sessionID: SessionID
- abort: AbortSignal
- auto: boolean
- overflow?: boolean
- }) {
- const exit = await runPromiseExit((svc) => svc.process(input), { signal: input.abort })
- if (Exit.isFailure(exit)) {
- if (Cause.hasInterrupts(exit.cause) && input.abort.aborted) return "stop"
- throw Cause.squash(exit.cause)
- }
- return exit.value
- }
+ export const process = fn(
+ z.object({
+ parentID: MessageID.zod,
+ messages: z.custom<MessageV2.WithParts[]>(),
+ sessionID: SessionID.zod,
+ auto: z.boolean(),
+ overflow: z.boolean().optional(),
+ }),
+ (input) => runPromise((svc) => svc.process(input)),
+ )
export const create = fn(
z.object({
diff --git a/packages/opencode/src/session/index.ts b/packages/opencode/src/session/index.ts
index 371091722..c98604052 100644
--- a/packages/opencode/src/session/index.ts
+++ b/packages/opencode/src/session/index.ts
@@ -334,14 +334,14 @@ export namespace Session {
readonly messages: (input: { sessionID: SessionID; limit?: number }) => Effect.Effect<MessageV2.WithParts[]>
readonly children: (parentID: SessionID) => Effect.Effect<Info[]>
readonly remove: (sessionID: SessionID) => Effect.Effect<void>
- readonly updateMessage: (msg: MessageV2.Info) => Effect.Effect<MessageV2.Info>
+ readonly updateMessage: <T extends MessageV2.Info>(msg: T) => Effect.Effect<T>
readonly removeMessage: (input: { sessionID: SessionID; messageID: MessageID }) => Effect.Effect<MessageID>
readonly removePart: (input: {
sessionID: SessionID
messageID: MessageID
partID: PartID
}) => Effect.Effect<PartID>
- readonly updatePart: (part: MessageV2.Part) => Effect.Effect<MessageV2.Part>
+ readonly updatePart: <T extends MessageV2.Part>(part: T) => Effect.Effect<T>
readonly updatePartDelta: (input: {
sessionID: SessionID
messageID: MessageID
@@ -469,26 +469,23 @@ export namespace Session {
}
})
- const updateMessage = Effect.fn("Session.updateMessage")(function* (msg: MessageV2.Info) {
- yield* Effect.sync(() =>
- SyncEvent.run(MessageV2.Event.Updated, {
- sessionID: msg.sessionID,
- info: msg,
- }),
- )
- return msg
- })
-
- const updatePart = Effect.fn("Session.updatePart")(function* (part: MessageV2.Part) {
- yield* Effect.sync(() =>
- SyncEvent.run(MessageV2.Event.PartUpdated, {
- sessionID: part.sessionID,
- part: structuredClone(part),
- time: Date.now(),
- }),
- )
- return part
- })
+ const updateMessage = <T extends MessageV2.Info>(msg: T): Effect.Effect<T> =>
+ Effect.gen(function* () {
+ yield* Effect.sync(() => SyncEvent.run(MessageV2.Event.Updated, { sessionID: msg.sessionID, info: msg }))
+ return msg
+ }).pipe(Effect.withSpan("Session.updateMessage"))
+
+ const updatePart = <T extends MessageV2.Part>(part: T): Effect.Effect<T> =>
+ Effect.gen(function* () {
+ yield* Effect.sync(() =>
+ SyncEvent.run(MessageV2.Event.PartUpdated, {
+ sessionID: part.sessionID,
+ part: structuredClone(part),
+ time: Date.now(),
+ }),
+ )
+ return part
+ }).pipe(Effect.withSpan("Session.updatePart"))
const create = Effect.fn("Session.create")(function* (input?: {
parentID?: SessionID
@@ -851,7 +848,9 @@ export namespace Session {
export const children = fn(SessionID.zod, (id) => runPromise((svc) => svc.children(id)))
export const remove = fn(SessionID.zod, (id) => runPromise((svc) => svc.remove(id)))
- export const updateMessage = fn(MessageV2.Info, (msg) => runPromise((svc) => svc.updateMessage(msg)))
+ export async function updateMessage<T extends MessageV2.Info>(msg: T): Promise<T> {
+ return runPromise((svc) => svc.updateMessage(MessageV2.Info.parse(msg) as T))
+ }
export const removeMessage = fn(z.object({ sessionID: SessionID.zod, messageID: MessageID.zod }), (input) =>
runPromise((svc) => svc.removeMessage(input)),
@@ -862,7 +861,9 @@ export namespace Session {
(input) => runPromise((svc) => svc.removePart(input)),
)
- export const updatePart = fn(MessageV2.Part, (part) => runPromise((svc) => svc.updatePart(part)))
+ export async function updatePart<T extends MessageV2.Part>(part: T): Promise<T> {
+ return runPromise((svc) => svc.updatePart(MessageV2.Part.parse(part) as T))
+ }
export const updatePartDelta = fn(
z.object({
diff --git a/packages/opencode/src/session/llm.ts b/packages/opencode/src/session/llm.ts
index 5cb609b27..f5717da55 100644
--- a/packages/opencode/src/session/llm.ts
+++ b/packages/opencode/src/session/llm.ts
@@ -1,6 +1,7 @@
import { Provider } from "@/provider/provider"
import { Log } from "@/util/log"
-import { Effect, Layer, ServiceMap } from "effect"
+import { Cause, Effect, Layer, Record, ServiceMap } from "effect"
+import * as Queue from "effect/Queue"
import * as Stream from "effect/Stream"
import { streamText, wrapLanguageModel, type ModelMessage, type Tool, tool, jsonSchema } from "ai"
import { mergeDeep, pipe } from "remeda"
@@ -28,7 +29,6 @@ export namespace LLM {
agent: Agent.Info
permission?: Permission.Ruleset
system: string[]
- abort: AbortSignal
messages: ModelMessage[]
small?: boolean
tools: Record<string, Tool>
@@ -36,6 +36,10 @@ export namespace LLM {
toolChoice?: "auto" | "required" | "none"
}
+ export type StreamRequest = StreamInput & {
+ abort: AbortSignal
+ }
+
export type Event = Awaited<ReturnType<typeof stream>>["fullStream"] extends AsyncIterable<infer T> ? T : never
export interface Interface {
@@ -49,15 +53,32 @@ export namespace LLM {
Effect.gen(function* () {
return Service.of({
stream(input) {
- return Stream.unwrap(
- Effect.promise(() => LLM.stream(input)).pipe(
- Effect.map((result) =>
- Stream.fromAsyncIterable(result.fullStream, (err) => err).pipe(
- Stream.mapEffect((event) => Effect.succeed(event)),
- ),
- ),
+ const stream: Stream.Stream<Event, unknown> = Stream.scoped(
+ Stream.unwrap(
+ Effect.gen(function* () {
+ const ctrl = yield* Effect.acquireRelease(
+ Effect.sync(() => new AbortController()),
+ (ctrl) => Effect.sync(() => ctrl.abort()),
+ )
+ const queue = yield* Queue.unbounded<Event, unknown | Cause.Done>()
+
+ yield* Effect.promise(async () => {
+ const result = await LLM.stream({ ...input, abort: ctrl.signal })
+ for await (const event of result.fullStream) {
+ if (!Queue.offerUnsafe(queue, event)) break
+ }
+ Queue.endUnsafe(queue)
+ }).pipe(
+ Effect.catchCause((cause) => Effect.sync(() => void Queue.failCauseUnsafe(queue, cause))),
+ Effect.onInterrupt(() => Effect.sync(() => ctrl.abort())),
+ Effect.forkScoped,
+ )
+
+ return Stream.fromQueue(queue)
+ }),
),
)
+ return stream
},
})
}),
@@ -65,7 +86,7 @@ export namespace LLM {
export const defaultLayer = layer
- export async function stream(input: StreamInput) {
+ export async function stream(input: StreamRequest) {
const l = log
.clone()
.tag("providerID", input.model.providerID)
@@ -322,17 +343,12 @@ export namespace LLM {
})
}
- async function resolveTools(input: Pick<StreamInput, "tools" | "agent" | "permission" | "user">) {
+ function resolveTools(input: Pick<StreamInput, "tools" | "agent" | "permission" | "user">) {
const disabled = Permission.disabled(
Object.keys(input.tools),
Permission.merge(input.agent.permission, input.permission ?? []),
)
- for (const tool of Object.keys(input.tools)) {
- if (input.user.tools?.[tool] === false || disabled.has(tool)) {
- delete input.tools[tool]
- }
- }
- return input.tools
+ return Record.filter(input.tools, (_, k) => input.user.tools?.[k] !== false && !disabled.has(k))
}
// Check if messages contain any tool-call content
diff --git a/packages/opencode/src/session/processor.ts b/packages/opencode/src/session/processor.ts
index d2459cd8b..b632a61a1 100644
--- a/packages/opencode/src/session/processor.ts
+++ b/packages/opencode/src/session/processor.ts
@@ -1,8 +1,7 @@
-import { Cause, Effect, Exit, Layer, ServiceMap } from "effect"
+import { Cause, Effect, Layer, ServiceMap } from "effect"
import * as Stream from "effect/Stream"
import { Agent } from "@/agent/agent"
import { Bus } from "@/bus"
-import { makeRuntime } from "@/effect/run-service"
import { Config } from "@/config/config"
import { Permission } from "@/permission"
import { Plugin } from "@/plugin"
@@ -35,17 +34,10 @@ export namespace SessionProcessor {
readonly process: (streamInput: LLM.StreamInput) => Effect.Effect<Result>
}
- export interface Info {
- readonly message: MessageV2.Assistant
- readonly partFromToolCall: (toolCallID: string) => MessageV2.ToolPart | undefined
- readonly process: (streamInput: LLM.StreamInput) => Promise<Result>
- }
-
type Input = {
assistantMessage: MessageV2.Assistant
sessionID: SessionID
model: Provider.Model
- abort: AbortSignal
}
export interface Interface {
@@ -96,7 +88,6 @@ export namespace SessionProcessor {
assistantMessage: input.assistantMessage,
sessionID: input.sessionID,
model: input.model,
- abort: input.abort,
toolcalls: {},
shouldBreak: false,
snapshot: undefined,
@@ -105,11 +96,12 @@ export namespace SessionProcessor {
currentText: undefined,
reasoningMap: {},
}
+ let aborted = false
const parse = (e: unknown) =>
MessageV2.fromError(e, {
providerID: input.model.providerID,
- aborted: input.abort.aborted,
+ aborted,
})
const handleEvent = Effect.fn("SessionProcessor.handleEvent")(function* (value: StreamEvent) {
@@ -155,7 +147,10 @@ export namespace SessionProcessor {
return
case "tool-input-start":
- ctx.toolcalls[value.id] = (yield* session.updatePart({
+ if (ctx.assistantMessage.summary) {
+ throw new Error(`Tool call not allowed while generating summary: ${value.toolName}`)
+ }
+ ctx.toolcalls[value.id] = yield* session.updatePart({
id: ctx.toolcalls[value.id]?.id ?? PartID.ascending(),
messageID: ctx.assistantMessage.id,
sessionID: ctx.assistantMessage.sessionID,
@@ -163,7 +158,7 @@ export namespace SessionProcessor {
tool: value.toolName,
callID: value.id,
state: { status: "pending", input: {}, raw: "" },
- })) as MessageV2.ToolPart
+ } satisfies MessageV2.ToolPart)
return
case "tool-input-delta":
@@ -173,14 +168,17 @@ export namespace SessionProcessor {
return
case "tool-call": {
+ if (ctx.assistantMessage.summary) {
+ throw new Error(`Tool call not allowed while generating summary: ${value.toolName}`)
+ }
const match = ctx.toolcalls[value.toolCallId]
if (!match) return
- ctx.toolcalls[value.toolCallId] = (yield* session.updatePart({
+ ctx.toolcalls[value.toolCallId] = yield* session.updatePart({
...match,
tool: value.toolName,
state: { status: "running", input: value.input, time: { start: Date.now() } },
metadata: value.providerMetadata,
- })) as MessageV2.ToolPart
+ } satisfies MessageV2.ToolPart)
const parts = yield* Effect.promise(() => MessageV2.parts(ctx.assistantMessage.id))
const recentParts = parts.slice(-DOOM_LOOP_THRESHOLD)
@@ -414,7 +412,7 @@ export namespace SessionProcessor {
})
const halt = Effect.fn("SessionProcessor.halt")(function* (e: unknown) {
- log.error("process", { error: e, stack: JSON.stringify((e as any)?.stack) })
+ log.error("process", { error: e, stack: e instanceof Error ? e.stack : undefined })
const error = parse(e)
if (MessageV2.ContextOverflowError.isInstance(error)) {
ctx.needsCompaction = true
@@ -429,59 +427,6 @@ export namespace SessionProcessor {
yield* status.set(ctx.sessionID, { type: "idle" })
})
- const process = Effect.fn("SessionProcessor.process")(function* (streamInput: LLM.StreamInput) {
- log.info("process")
- ctx.needsCompaction = false
- ctx.shouldBreak = (yield* config.get()).experimental?.continue_loop_on_deny !== true
-
- yield* Effect.gen(function* () {
- ctx.currentText = undefined
- ctx.reasoningMap = {}
- const stream = llm.stream(streamInput)
-
- yield* stream.pipe(
- Stream.tap((event) =>
- Effect.gen(function* () {
- input.abort.throwIfAborted()
- yield* handleEvent(event)
- }),
- ),
- Stream.takeUntil(() => ctx.needsCompaction),
- Stream.runDrain,
- )
- }).pipe(
- Effect.catchCauseIf(
- (cause) => !Cause.hasInterruptsOnly(cause),
- (cause) => Effect.fail(Cause.squash(cause)),
- ),
- Effect.retry(
- SessionRetry.policy({
- parse,
- set: (info) =>
- status.set(ctx.sessionID, {
- type: "retry",
- attempt: info.attempt,
- message: info.message,
- next: info.next,
- }),
- }),
- ),
- Effect.catchCause((cause) =>
- Cause.hasInterruptsOnly(cause)
- ? halt(new DOMException("Aborted", "AbortError"))
- : halt(Cause.squash(cause)),
- ),
- Effect.ensuring(cleanup()),
- )
-
- if (input.abort.aborted && !ctx.assistantMessage.error) {
- yield* abort()
- }
- if (ctx.needsCompaction) return "compact"
- if (ctx.blocked || ctx.assistantMessage.error || input.abort.aborted) return "stop"
- return "continue"
- })
-
const abort = Effect.fn("SessionProcessor.abort")(() =>
Effect.gen(function* () {
if (!ctx.assistantMessage.error) {
@@ -495,6 +440,53 @@ export namespace SessionProcessor {
}),
)
+ const process = Effect.fn("SessionProcessor.process")(function* (streamInput: LLM.StreamInput) {
+ log.info("process")
+ ctx.needsCompaction = false
+ ctx.shouldBreak = (yield* config.get()).experimental?.continue_loop_on_deny !== true
+
+ return yield* Effect.gen(function* () {
+ yield* Effect.gen(function* () {
+ ctx.currentText = undefined
+ ctx.reasoningMap = {}
+ const stream = llm.stream(streamInput)
+
+ yield* stream.pipe(
+ Stream.tap((event) => handleEvent(event)),
+ Stream.takeUntil(() => ctx.needsCompaction),
+ Stream.runDrain,
+ )
+ }).pipe(
+ Effect.onInterrupt(() => Effect.sync(() => void (aborted = true))),
+ Effect.catchCauseIf(
+ (cause) => !Cause.hasInterruptsOnly(cause),
+ (cause) => Effect.fail(Cause.squash(cause)),
+ ),
+ Effect.retry(
+ SessionRetry.policy({
+ parse,
+ set: (info) =>
+ status.set(ctx.sessionID, {
+ type: "retry",
+ attempt: info.attempt,
+ message: info.message,
+ next: info.next,
+ }),
+ }),
+ ),
+ Effect.catch(halt),
+ Effect.ensuring(cleanup()),
+ )
+
+ if (aborted && !ctx.assistantMessage.error) {
+ yield* abort()
+ }
+ if (ctx.needsCompaction) return "compact"
+ if (ctx.blocked || ctx.assistantMessage.error || aborted) return "stop"
+ return "continue"
+ }).pipe(Effect.onInterrupt(() => abort().pipe(Effect.asVoid)))
+ })
+
return {
get message() {
return ctx.assistantMessage
@@ -526,29 +518,4 @@ export namespace SessionProcessor {
),
),
)
-
- const { runPromise } = makeRuntime(Service, defaultLayer)
-
- export async function create(input: Input): Promise<Info> {
- const hit = await runPromise((svc) => svc.create(input))
- return {
- get message() {
- return hit.message
- },
- partFromToolCall(toolCallID: string) {
- return hit.partFromToolCall(toolCallID)
- },
- async process(streamInput: LLM.StreamInput) {
- const exit = await Effect.runPromiseExit(hit.process(streamInput), { signal: input.abort })
- if (Exit.isFailure(exit)) {
- if (Cause.hasInterrupts(exit.cause) && input.abort.aborted) {
- await Effect.runPromise(hit.abort())
- return "stop"
- }
- throw Cause.squash(exit.cause)
- }
- return exit.value
- },
- }
- }
}
diff --git a/packages/opencode/src/session/prompt.ts b/packages/opencode/src/session/prompt.ts
index a9edf838c..c627f0a10 100644
--- a/packages/opencode/src/session/prompt.ts
+++ b/packages/opencode/src/session/prompt.ts
@@ -1,8 +1,6 @@
import path from "path"
import os from "os"
-import fs from "fs/promises"
import z from "zod"
-import { Filesystem } from "../util/filesystem"
import { SessionID, MessageID, PartID } from "./schema"
import { MessageV2 } from "./message-v2"
import { Log } from "../util/log"
@@ -22,13 +20,12 @@ import { Plugin } from "../plugin"
import PROMPT_PLAN from "../session/prompt/plan.txt"
import BUILD_SWITCH from "../session/prompt/build-switch.txt"
import MAX_STEPS from "../session/prompt/max-steps.txt"
-import { defer } from "../util/defer"
import { ToolRegistry } from "../tool/registry"
+import { Runner } from "@/effect/runner"
import { MCP } from "../mcp"
import { LSP } from "../lsp"
import { ReadTool } from "../tool/read"
import { FileTime } from "../file/time"
-import { NotFoundError } from "@/storage/db"
import { Flag } from "../flag/flag"
import { ulid } from "ulid"
import { spawn } from "child_process"
@@ -37,18 +34,20 @@ import { pathToFileURL, fileURLToPath } from "url"
import { ConfigMarkdown } from "../config/markdown"
import { SessionSummary } from "./summary"
import { NamedError } from "@opencode-ai/util/error"
-import { fn } from "@/util/fn"
import { SessionProcessor } from "./processor"
import { TaskTool } from "@/tool/task"
import { Tool } from "@/tool/tool"
import { Permission } from "@/permission"
import { SessionStatus } from "./status"
import { LLM } from "./llm"
-import { iife } from "@/util/iife"
import { Shell } from "@/shell/shell"
+import { AppFileSystem } from "@/filesystem"
import { Truncate } from "@/tool/truncate"
import { decodeDataUrl } from "@/util/data-url"
import { Process } from "@/util/process"
+import { Cause, Effect, Exit, Layer, Option, Scope, ServiceMap } from "effect"
+import { InstanceState } from "@/effect/instance-state"
+import { makeRuntime } from "@/effect/run-service"
// @ts-ignore
globalThis.AI_SDK_LOG_WARNINGS = false
@@ -66,303 +65,497 @@ const STRUCTURED_OUTPUT_SYSTEM_PROMPT = `IMPORTANT: The user has requested struc
export namespace SessionPrompt {
const log = Log.create({ service: "session.prompt" })
- const state = Instance.state(
- () => {
- const data: Record<
- string,
- {
- abort: AbortController
- callbacks: {
- resolve(input: MessageV2.WithParts): void
- reject(reason?: any): void
- }[]
- }
- > = {}
- return data
- },
- async (current) => {
- for (const item of Object.values(current)) {
- item.abort.abort()
- }
- },
- )
-
- export function assertNotBusy(sessionID: SessionID) {
- const match = state()[sessionID]
- if (match) throw new Session.BusyError(sessionID)
+ export interface Interface {
+ readonly assertNotBusy: (sessionID: SessionID) => Effect.Effect<void, Session.BusyError>
+ readonly cancel: (sessionID: SessionID) => Effect.Effect<void>
+ readonly prompt: (input: PromptInput) => Effect.Effect<MessageV2.WithParts>
+ readonly loop: (input: z.infer<typeof LoopInput>) => Effect.Effect<MessageV2.WithParts>
+ readonly shell: (input: ShellInput) => Effect.Effect<MessageV2.WithParts>
+ readonly command: (input: CommandInput) => Effect.Effect<MessageV2.WithParts>
+ readonly resolvePromptParts: (template: string) => Effect.Effect<PromptInput["parts"]>
}
- export const PromptInput = z.object({
- sessionID: SessionID.zod,
- messageID: MessageID.zod.optional(),
- model: z
- .object({
- providerID: ProviderID.zod,
- modelID: ModelID.zod,
- })
- .optional(),
- agent: z.string().optional(),
- noReply: z.boolean().optional(),
- tools: z
- .record(z.string(), z.boolean())
- .optional()
- .describe(
- "@deprecated tools and permissions have been merged, you can set permissions on the session itself now",
- ),
- format: MessageV2.Format.optional(),
- system: z.string().optional(),
- variant: z.string().optional(),
- parts: z.array(
- z.discriminatedUnion("type", [
- MessageV2.TextPart.omit({
- messageID: true,
- sessionID: true,
- })
- .partial({
- id: true,
- })
- .meta({
- ref: "TextPartInput",
- }),
- MessageV2.FilePart.omit({
- messageID: true,
- sessionID: true,
- })
- .partial({
- id: true,
- })
- .meta({
- ref: "FilePartInput",
- }),
- MessageV2.AgentPart.omit({
- messageID: true,
- sessionID: true,
- })
- .partial({
- id: true,
- })
- .meta({
- ref: "AgentPartInput",
+ export class Service extends ServiceMap.Service<Service, Interface>()("@opencode/SessionPrompt") {}
+
+ export const layer = Layer.effect(
+ Service,
+ Effect.gen(function* () {
+ const bus = yield* Bus.Service
+ const status = yield* SessionStatus.Service
+ const sessions = yield* Session.Service
+ const agents = yield* Agent.Service
+ const processor = yield* SessionProcessor.Service
+ const compaction = yield* SessionCompaction.Service
+ const plugin = yield* Plugin.Service
+ const commands = yield* Command.Service
+ const permission = yield* Permission.Service
+ const fsys = yield* AppFileSystem.Service
+ const mcp = yield* MCP.Service
+ const lsp = yield* LSP.Service
+ const filetime = yield* FileTime.Service
+ const registry = yield* ToolRegistry.Service
+ const truncate = yield* Truncate.Service
+ const scope = yield* Scope.Scope
+
+ const cache = yield* InstanceState.make(
+ Effect.fn("SessionPrompt.state")(function* () {
+ const runners = new Map<string, Runner<MessageV2.WithParts>>()
+ yield* Effect.addFinalizer(
+ Effect.fnUntraced(function* () {
+ yield* Effect.forEach(runners.values(), (r) => r.cancel, { concurrency: "unbounded", discard: true })
+ runners.clear()
+ }),
+ )
+ return { runners }
+ }),
+ )
+
+ const getRunner = (runners: Map<string, Runner<MessageV2.WithParts>>, sessionID: SessionID) => {
+ const existing = runners.get(sessionID)
+ if (existing) return existing
+ const runner = Runner.make<MessageV2.WithParts>(scope, {
+ onIdle: Effect.gen(function* () {
+ runners.delete(sessionID)
+ yield* status.set(sessionID, { type: "idle" })
}),
- MessageV2.SubtaskPart.omit({
- messageID: true,
- sessionID: true,
+ onBusy: status.set(sessionID, { type: "busy" }),
+ onInterrupt: lastAssistant(sessionID),
+ busy: () => {
+ throw new Session.BusyError(sessionID)
+ },
})
- .partial({
- id: true,
- })
- .meta({
- ref: "SubtaskPartInput",
- }),
- ]),
- ),
- })
- export type PromptInput = z.infer<typeof PromptInput>
+ runners.set(sessionID, runner)
+ return runner
+ }
- export const prompt = fn(PromptInput, async (input) => {
- const session = await Session.get(input.sessionID)
- await SessionRevert.cleanup(session)
-
- const message = await createUserMessage(input)
- await Session.touch(input.sessionID)
-
- // this is backwards compatibility for allowing `tools` to be specified when
- // prompting
- const permissions: Permission.Ruleset = []
- for (const [tool, enabled] of Object.entries(input.tools ?? {})) {
- permissions.push({
- permission: tool,
- action: enabled ? "allow" : "deny",
- pattern: "*",
+ const assertNotBusy: (sessionID: SessionID) => Effect.Effect<void, Session.BusyError> = Effect.fn(
+ "SessionPrompt.assertNotBusy",
+ )(function* (sessionID: SessionID) {
+ const s = yield* InstanceState.get(cache)
+ const runner = s.runners.get(sessionID)
+ if (runner?.busy) throw new Session.BusyError(sessionID)
})
- }
- if (permissions.length > 0) {
- session.permission = permissions
- await Session.setPermission({ sessionID: session.id, permission: permissions })
- }
- if (input.noReply === true) {
- return message
- }
-
- return loop({ sessionID: input.sessionID })
- })
+ const cancel = Effect.fn("SessionPrompt.cancel")(function* (sessionID: SessionID) {
+ log.info("cancel", { sessionID })
+ const s = yield* InstanceState.get(cache)
+ const runner = s.runners.get(sessionID)
+ if (!runner || !runner.busy) {
+ yield* status.set(sessionID, { type: "idle" })
+ return
+ }
+ yield* runner.cancel
+ })
- export async function resolvePromptParts(template: string): Promise<PromptInput["parts"]> {
- const parts: PromptInput["parts"] = [
- {
- type: "text",
- text: template,
- },
- ]
- const files = ConfigMarkdown.files(template)
- const seen = new Set<string>()
- await Promise.all(
- files.map(async (match) => {
- const name = match[1]
- if (seen.has(name)) return
- seen.add(name)
- const filepath = name.startsWith("~/")
- ? path.join(os.homedir(), name.slice(2))
- : path.resolve(Instance.worktree, name)
-
- const stats = await fs.stat(filepath).catch(() => undefined)
- if (!stats) {
- const agent = await Agent.get(name)
- if (agent) {
+ const resolvePromptParts = Effect.fn("SessionPrompt.resolvePromptParts")(function* (template: string) {
+ const parts: PromptInput["parts"] = [{ type: "text", text: template }]
+ const files = ConfigMarkdown.files(template)
+ const seen = new Set<string>()
+ yield* Effect.forEach(
+ files,
+ Effect.fnUntraced(function* (match) {
+ const name = match[1]
+ if (seen.has(name)) return
+ seen.add(name)
+ const filepath = name.startsWith("~/")
+ ? path.join(os.homedir(), name.slice(2))
+ : path.resolve(Instance.worktree, name)
+
+ const info = yield* fsys.stat(filepath).pipe(Effect.option)
+ if (Option.isNone(info)) {
+ const found = yield* agents.get(name)
+ if (found) parts.push({ type: "agent", name: found.name })
+ return
+ }
+ const stat = info.value
parts.push({
- type: "agent",
- name: agent.name,
+ type: "file",
+ url: pathToFileURL(filepath).href,
+ filename: name,
+ mime: stat.type === "Directory" ? "application/x-directory" : "text/plain",
+ })
+ }),
+ { concurrency: "unbounded", discard: true },
+ )
+ return parts
+ })
+
+ const title = Effect.fn("SessionPrompt.ensureTitle")(function* (input: {
+ session: Session.Info
+ history: MessageV2.WithParts[]
+ providerID: ProviderID
+ modelID: ModelID
+ }) {
+ if (input.session.parentID) return
+ if (!Session.isDefaultTitle(input.session.title)) return
+
+ const real = (m: MessageV2.WithParts) =>
+ m.info.role === "user" && !m.parts.every((p) => "synthetic" in p && p.synthetic)
+ const idx = input.history.findIndex(real)
+ if (idx === -1) return
+ if (input.history.filter(real).length !== 1) return
+
+ const context = input.history.slice(0, idx + 1)
+ const firstUser = context[idx]
+ if (!firstUser || firstUser.info.role !== "user") return
+ const firstInfo = firstUser.info
+
+ const subtasks = firstUser.parts.filter((p): p is MessageV2.SubtaskPart => p.type === "subtask")
+ const onlySubtasks = subtasks.length > 0 && firstUser.parts.every((p) => p.type === "subtask")
+
+ const ag = yield* agents.get("title")
+ if (!ag) return
+ const text = yield* Effect.promise(async (signal) => {
+ const mdl = ag.model
+ ? await Provider.getModel(ag.model.providerID, ag.model.modelID)
+ : ((await Provider.getSmallModel(input.providerID)) ??
+ (await Provider.getModel(input.providerID, input.modelID)))
+ const msgs = onlySubtasks
+ ? [{ role: "user" as const, content: subtasks.map((p) => p.prompt).join("\n") }]
+ : await MessageV2.toModelMessages(context, mdl)
+ const result = await LLM.stream({
+ agent: ag,
+ user: firstInfo,
+ system: [],
+ small: true,
+ tools: {},
+ model: mdl,
+ abort: signal,
+ sessionID: input.session.id,
+ retries: 2,
+ messages: [{ role: "user", content: "Generate a title for this conversation:\n" }, ...msgs],
+ })
+ return result.text
+ })
+ const cleaned = text
+ .replace(/<think>[\s\S]*?<\/think>\s*/g, "")
+ .split("\n")
+ .map((line) => line.trim())
+ .find((line) => line.length > 0)
+ if (!cleaned) return
+ const t = cleaned.length > 100 ? cleaned.substring(0, 97) + "..." : cleaned
+ yield* sessions
+ .setTitle({ sessionID: input.session.id, title: t })
+ .pipe(
+ Effect.catchCause((cause) =>
+ Effect.sync(() => log.error("failed to generate title", { error: Cause.squash(cause) })),
+ ),
+ )
+ })
+
+ const insertReminders = Effect.fn("SessionPrompt.insertReminders")(function* (input: {
+ messages: MessageV2.WithParts[]
+ agent: Agent.Info
+ session: Session.Info
+ }) {
+ const userMessage = input.messages.findLast((msg) => msg.info.role === "user")
+ if (!userMessage) return input.messages
+
+ if (!Flag.OPENCODE_EXPERIMENTAL_PLAN_MODE) {
+ if (input.agent.name === "plan") {
+ userMessage.parts.push({
+ id: PartID.ascending(),
+ messageID: userMessage.info.id,
+ sessionID: userMessage.info.sessionID,
+ type: "text",
+ text: PROMPT_PLAN,
+ synthetic: true,
})
}
- return
+ const wasPlan = input.messages.some((msg) => msg.info.role === "assistant" && msg.info.agent === "plan")
+ if (wasPlan && input.agent.name === "build") {
+ userMessage.parts.push({
+ id: PartID.ascending(),
+ messageID: userMessage.info.id,
+ sessionID: userMessage.info.sessionID,
+ type: "text",
+ text: BUILD_SWITCH,
+ synthetic: true,
+ })
+ }
+ return input.messages
}
- if (stats.isDirectory()) {
- parts.push({
- type: "file",
- url: pathToFileURL(filepath).href,
- filename: name,
- mime: "application/x-directory",
+ const assistantMessage = input.messages.findLast((msg) => msg.info.role === "assistant")
+ if (input.agent.name !== "plan" && assistantMessage?.info.agent === "plan") {
+ const plan = Session.plan(input.session)
+ if (!(yield* fsys.existsSafe(plan))) return input.messages
+ const part = yield* sessions.updatePart({
+ id: PartID.ascending(),
+ messageID: userMessage.info.id,
+ sessionID: userMessage.info.sessionID,
+ type: "text",
+ text:
+ BUILD_SWITCH + "\n\n" + `A plan file exists at ${plan}. You should execute on the plan defined within it`,
+ synthetic: true,
})
- return
+ userMessage.parts.push(part)
+ return input.messages
}
- parts.push({
- type: "file",
- url: pathToFileURL(filepath).href,
- filename: name,
- mime: "text/plain",
- })
- }),
- )
- return parts
- }
+ if (input.agent.name !== "plan" || assistantMessage?.info.agent === "plan") return input.messages
- function start(sessionID: SessionID) {
- const s = state()
- if (s[sessionID]) return
- const controller = new AbortController()
- s[sessionID] = {
- abort: controller,
- callbacks: [],
- }
- return controller.signal
- }
+ const plan = Session.plan(input.session)
+ const exists = yield* fsys.existsSafe(plan)
+ if (!exists) yield* fsys.ensureDir(path.dirname(plan)).pipe(Effect.catch(Effect.die))
+ const part = yield* sessions.updatePart({
+ id: PartID.ascending(),
+ messageID: userMessage.info.id,
+ sessionID: userMessage.info.sessionID,
+ type: "text",
+ text: `<system-reminder>
+Plan mode is active. The user indicated that they do not want you to execute yet -- you MUST NOT make any edits (with the exception of the plan file mentioned below), run any non-readonly tools (including changing configs or making commits), or otherwise make any changes to the system. This supersedes any other instructions you have received.
+
+## Plan File Info:
+${exists ? `A plan file already exists at ${plan}. You can read it and make incremental edits using the edit tool.` : `No plan file exists yet. You should create your plan at ${plan} using the write tool.`}
+You should build your plan incrementally by writing to or editing this file. NOTE that this is the only file you are allowed to edit - other than this you are only allowed to take READ-ONLY actions.
- function resume(sessionID: SessionID) {
- const s = state()
- if (!s[sessionID]) return
+## Plan Workflow
- return s[sessionID].abort.signal
- }
+### Phase 1: Initial Understanding
+Goal: Gain a comprehensive understanding of the user's request by reading through code and asking them questions. Critical: In this phase you should only use the explore subagent type.
- export async function cancel(sessionID: SessionID) {
- log.info("cancel", { sessionID })
- const s = state()
- const match = s[sessionID]
- if (!match) {
- await SessionStatus.set(sessionID, { type: "idle" })
- return
- }
- match.abort.abort()
- delete s[sessionID]
- await SessionStatus.set(sessionID, { type: "idle" })
- return
- }
+1. Focus on understanding the user's request and the code associated with their request
- export const LoopInput = z.object({
- sessionID: SessionID.zod,
- resume_existing: z.boolean().optional(),
- })
- export const loop = fn(LoopInput, async (input) => {
- const { sessionID, resume_existing } = input
-
- const abort = resume_existing ? resume(sessionID) : start(sessionID)
- if (!abort) {
- return new Promise<MessageV2.WithParts>((resolve, reject) => {
- const callbacks = state()[sessionID].callbacks
- callbacks.push({ resolve, reject })
- })
- }
-
- await using _ = defer(() => cancel(sessionID))
-
- // Structured output state
- // Note: On session resumption, state is reset but outputFormat is preserved
- // on the user message and will be retrieved from lastUser below
- let structuredOutput: unknown | undefined
-
- let step = 0
- const session = await Session.get(sessionID)
- while (true) {
- await SessionStatus.set(sessionID, { type: "busy" })
- log.info("loop", { step, sessionID })
- if (abort.aborted) break
- let msgs = await MessageV2.filterCompacted(MessageV2.stream(sessionID))
-
- let lastUser: MessageV2.User | undefined
- let lastAssistant: MessageV2.Assistant | undefined
- let lastFinished: MessageV2.Assistant | undefined
- let tasks: (MessageV2.CompactionPart | MessageV2.SubtaskPart)[] = []
- for (let i = msgs.length - 1; i >= 0; i--) {
- const msg = msgs[i]
- if (!lastUser && msg.info.role === "user") lastUser = msg.info as MessageV2.User
- if (!lastAssistant && msg.info.role === "assistant") lastAssistant = msg.info as MessageV2.Assistant
- if (!lastFinished && msg.info.role === "assistant" && msg.info.finish)
- lastFinished = msg.info as MessageV2.Assistant
- if (lastUser && lastFinished) break
- const task = msg.parts.filter((part) => part.type === "compaction" || part.type === "subtask")
- if (task && !lastFinished) {
- tasks.push(...task)
- }
- }
+2. **Launch up to 3 explore agents IN PARALLEL** (single message, multiple tool calls) to efficiently explore the codebase.
+ - Use 1 agent when the task is isolated to known files, the user provided specific file paths, or you're making a small targeted change.
+ - Use multiple agents when: the scope is uncertain, multiple areas of the codebase are involved, or you need to understand existing patterns before planning.
+ - Quality over quantity - 3 agents maximum, but you should try to use the minimum number of agents necessary (usually just 1)
+ - If using multiple agents: Provide each agent with a specific search focus or area to explore. Example: One agent searches for existing implementations, another explores related components, a third investigates testing patterns
- if (!lastUser) throw new Error("No user message found in stream. This should never happen.")
- if (
- lastAssistant?.finish &&
- ![
- "tool-calls",
- // in v6 unknown became other but other existed in v5 too and was distinctly different
- // I think there are certain providers that used to have bad stop reasons, not rlly sure which
- // ones if any still have this?
- // "unknown",
- ].includes(lastAssistant.finish) &&
- lastUser.id < lastAssistant.id
- ) {
- log.info("exiting loop", { sessionID })
- break
- }
+3. After exploring the code, use the question tool to clarify ambiguities in the user request up front.
+
+### Phase 2: Design
+Goal: Design an implementation approach.
+
+Launch general agent(s) to design the implementation based on the user's intent and your exploration results from Phase 1.
+
+You can launch up to 1 agent(s) in parallel.
+
+**Guidelines:**
+- **Default**: Launch at least 1 Plan agent for most tasks - it helps validate your understanding and consider alternatives
+- **Skip agents**: Only for truly trivial tasks (typo fixes, single-line changes, simple renames)
+
+Examples of when to use multiple agents:
+- The task touches multiple parts of the codebase
+- It's a large refactor or architectural change
+- There are many edge cases to consider
+- You'd benefit from exploring different approaches
+
+Example perspectives by task type:
+- New feature: simplicity vs performance vs maintainability
+- Bug fix: root cause vs workaround vs prevention
+- Refactoring: minimal change vs clean architecture
+
+In the agent prompt:
+- Provide comprehensive background context from Phase 1 exploration including filenames and code path traces
+- Describe requirements and constraints
+- Request a detailed implementation plan
+
+### Phase 3: Review
+Goal: Review the plan(s) from Phase 2 and ensure alignment with the user's intentions.
+1. Read the critical files identified by agents to deepen your understanding
+2. Ensure that the plans align with the user's original request
+3. Use question tool to clarify any remaining questions with the user
+
+### Phase 4: Final Plan
+Goal: Write your final plan to the plan file (the only file you can edit).
+- Include only your recommended approach, not all alternatives
+- Ensure that the plan file is concise enough to scan quickly, but detailed enough to execute effectively
+- Include the paths of critical files to be modified
+- Include a verification section describing how to test the changes end-to-end (run the code, use MCP tools, run tests)
- step++
- if (step === 1)
- ensureTitle({
- session,
- modelID: lastUser.model.modelID,
- providerID: lastUser.model.providerID,
- history: msgs,
+### Phase 5: Call plan_exit tool
+At the very end of your turn, once you have asked the user questions and are happy with your final plan file - you should always call plan_exit to indicate to the user that you are done planning.
+This is critical - your turn should only end with either asking the user a question or calling plan_exit. Do not stop unless it's for these 2 reasons.
+
+**Important:** Use question tool to clarify requirements/approach, use plan_exit to request plan approval. Do NOT use question tool to ask "Is this plan okay?" - that's what plan_exit does.
+
+NOTE: At any point in time through this workflow you should feel free to ask the user questions or clarifications. Don't make large assumptions about user intent. The goal is to present a well researched plan to the user, and tie any loose ends before implementation begins.
+</system-reminder>`,
+ synthetic: true,
})
+ userMessage.parts.push(part)
+ return input.messages
+ })
- const model = await Provider.getModel(lastUser.model.providerID, lastUser.model.modelID).catch((e) => {
- if (Provider.ModelNotFoundError.isInstance(e)) {
- const hint = e.data.suggestions?.length ? ` Did you mean: ${e.data.suggestions.join(", ")}?` : ""
- Bus.publish(Session.Event.Error, {
- sessionID,
- error: new NamedError.Unknown({
- message: `Model not found: ${e.data.providerID}/${e.data.modelID}.${hint}`,
- }).toObject(),
+ const resolveTools = Effect.fn("SessionPrompt.resolveTools")(function* (input: {
+ agent: Agent.Info
+ model: Provider.Model
+ session: Session.Info
+ tools?: Record<string, boolean>
+ processor: Pick<SessionProcessor.Handle, "message" | "partFromToolCall">
+ bypassAgentCheck: boolean
+ messages: MessageV2.WithParts[]
+ }) {
+ using _ = log.time("resolveTools")
+ const tools: Record<string, AITool> = {}
+
+ const context = (args: any, options: ToolExecutionOptions): Tool.Context => ({
+ sessionID: input.session.id,
+ abort: options.abortSignal!,
+ messageID: input.processor.message.id,
+ callID: options.toolCallId,
+ extra: { model: input.model, bypassAgentCheck: input.bypassAgentCheck },
+ agent: input.agent.name,
+ messages: input.messages,
+ metadata: (val) =>
+ Effect.runPromise(
+ Effect.gen(function* () {
+ const match = input.processor.partFromToolCall(options.toolCallId)
+ if (!match || match.state.status !== "running") return
+ yield* sessions.updatePart({
+ ...match,
+ state: {
+ title: val.title,
+ metadata: val.metadata,
+ status: "running",
+ input: args,
+ time: { start: Date.now() },
+ },
+ })
+ }),
+ ),
+ ask: (req) =>
+ Effect.runPromise(
+ permission.ask({
+ ...req,
+ sessionID: input.session.id,
+ tool: { messageID: input.processor.message.id, callID: options.toolCallId },
+ ruleset: Permission.merge(input.agent.permission, input.session.permission ?? []),
+ }),
+ ),
+ })
+
+ for (const item of yield* registry.tools(
+ { modelID: ModelID.make(input.model.api.id), providerID: input.model.providerID },
+ input.agent,
+ )) {
+ const schema = ProviderTransform.schema(input.model, z.toJSONSchema(item.parameters))
+ tools[item.id] = tool({
+ id: item.id as any,
+ description: item.description,
+ inputSchema: jsonSchema(schema as any),
+ execute(args, options) {
+ return Effect.runPromise(
+ Effect.gen(function* () {
+ const ctx = context(args, options)
+ yield* plugin.trigger(
+ "tool.execute.before",
+ { tool: item.id, sessionID: ctx.sessionID, callID: ctx.callID },
+ { args },
+ )
+ const result = yield* Effect.promise(() => item.execute(args, ctx))
+ const output = {
+ ...result,
+ attachments: result.attachments?.map((attachment) => ({
+ ...attachment,
+ id: PartID.ascending(),
+ sessionID: ctx.sessionID,
+ messageID: input.processor.message.id,
+ })),
+ }
+ yield* plugin.trigger(
+ "tool.execute.after",
+ { tool: item.id, sessionID: ctx.sessionID, callID: ctx.callID, args },
+ output,
+ )
+ return output
+ }),
+ )
+ },
})
}
- throw e
+
+ for (const [key, item] of Object.entries(yield* mcp.tools())) {
+ const execute = item.execute
+ if (!execute) continue
+
+ const schema = yield* Effect.promise(() => Promise.resolve(asSchema(item.inputSchema).jsonSchema))
+ const transformed = ProviderTransform.schema(input.model, schema)
+ item.inputSchema = jsonSchema(transformed)
+ item.execute = (args, opts) =>
+ Effect.runPromise(
+ Effect.gen(function* () {
+ const ctx = context(args, opts)
+ yield* plugin.trigger(
+ "tool.execute.before",
+ { tool: key, sessionID: ctx.sessionID, callID: opts.toolCallId },
+ { args },
+ )
+ yield* Effect.promise(() => ctx.ask({ permission: key, metadata: {}, patterns: ["*"], always: ["*"] }))
+ const result: Awaited<ReturnType<NonNullable<typeof execute>>> = yield* Effect.promise(() =>
+ execute(args, opts),
+ )
+ yield* plugin.trigger(
+ "tool.execute.after",
+ { tool: key, sessionID: ctx.sessionID, callID: opts.toolCallId, args },
+ result,
+ )
+
+ const textParts: string[] = []
+ const attachments: Omit<MessageV2.FilePart, "id" | "sessionID" | "messageID">[] = []
+ for (const contentItem of result.content) {
+ if (contentItem.type === "text") textParts.push(contentItem.text)
+ else if (contentItem.type === "image") {
+ attachments.push({
+ type: "file",
+ mime: contentItem.mimeType,
+ url: `data:${contentItem.mimeType};base64,${contentItem.data}`,
+ })
+ } else if (contentItem.type === "resource") {
+ const { resource } = contentItem
+ if (resource.text) textParts.push(resource.text)
+ if (resource.blob) {
+ attachments.push({
+ type: "file",
+ mime: resource.mimeType ?? "application/octet-stream",
+ url: `data:${resource.mimeType ?? "application/octet-stream"};base64,${resource.blob}`,
+ filename: resource.uri,
+ })
+ }
+ }
+ }
+
+ const truncated = yield* truncate.output(textParts.join("\n\n"), {}, input.agent)
+ const metadata = {
+ ...(result.metadata ?? {}),
+ truncated: truncated.truncated,
+ ...(truncated.truncated && { outputPath: truncated.outputPath }),
+ }
+
+ return {
+ title: "",
+ metadata,
+ output: truncated.content,
+ attachments: attachments.map((attachment) => ({
+ ...attachment,
+ id: PartID.ascending(),
+ sessionID: ctx.sessionID,
+ messageID: input.processor.message.id,
+ })),
+ content: result.content,
+ }
+ }),
+ )
+ tools[key] = item
+ }
+
+ return tools
})
- const task = tasks.pop()
-
- // pending subtask
- // TODO: centralize "invoke tool" logic
- if (task?.type === "subtask") {
- const taskTool = await TaskTool.init()
- const taskModel = task.model ? await Provider.getModel(task.model.providerID, task.model.modelID) : model
- const assistantMessage = (await Session.updateMessage({
+
+ const handleSubtask = Effect.fn("SessionPrompt.handleSubtask")(function* (input: {
+ task: MessageV2.SubtaskPart
+ model: Provider.Model
+ lastUser: MessageV2.User
+ sessionID: SessionID
+ session: Session.Info
+ msgs: MessageV2.WithParts[]
+ }) {
+ const { task, model, lastUser, sessionID, session, msgs } = input
+ const taskTool = yield* Effect.promise(() => TaskTool.init())
+ const taskModel = task.model ? yield* getModel(task.model.providerID, task.model.modelID, sessionID) : model
+ const assistantMessage: MessageV2.Assistant = yield* sessions.updateMessage({
id: MessageID.ascending(),
role: "assistant",
parentID: lastUser.id,
@@ -370,24 +563,14 @@ export namespace SessionPrompt {
mode: task.agent,
agent: task.agent,
variant: lastUser.variant,
- path: {
- cwd: Instance.directory,
- root: Instance.worktree,
- },
+ path: { cwd: Instance.directory, root: Instance.worktree },
cost: 0,
- tokens: {
- input: 0,
- output: 0,
- reasoning: 0,
- cache: { read: 0, write: 0 },
- },
+ tokens: { input: 0, output: 0, reasoning: 0, cache: { read: 0, write: 0 } },
modelID: taskModel.id,
providerID: taskModel.providerID,
- time: {
- created: Date.now(),
- },
- })) as MessageV2.Assistant
- let part = (await Session.updatePart({
+ time: { created: Date.now() },
+ })
+ let part: MessageV2.ToolPart = yield* sessions.updatePart({
id: PartID.ascending(),
messageID: assistantMessage.id,
sessionID: assistantMessage.sessionID,
@@ -402,90 +585,104 @@ export namespace SessionPrompt {
subagent_type: task.agent,
command: task.command,
},
- time: {
- start: Date.now(),
- },
+ time: { start: Date.now() },
},
- })) as MessageV2.ToolPart
+ })
const taskArgs = {
prompt: task.prompt,
description: task.description,
subagent_type: task.agent,
command: task.command,
}
- await Plugin.trigger(
- "tool.execute.before",
- {
- tool: "task",
- sessionID,
- callID: part.id,
- },
- { args: taskArgs },
- )
- let executionError: Error | undefined
- const taskAgent = await Agent.get(task.agent)
+ yield* plugin.trigger("tool.execute.before", { tool: "task", sessionID, callID: part.id }, { args: taskArgs })
+
+ const taskAgent = yield* agents.get(task.agent)
if (!taskAgent) {
- const available = await Agent.list().then((agents) => agents.filter((a) => !a.hidden).map((a) => a.name))
+ const available = (yield* agents.list()).filter((a) => !a.hidden).map((a) => a.name)
const hint = available.length ? ` Available agents: ${available.join(", ")}` : ""
const error = new NamedError.Unknown({ message: `Agent not found: "${task.agent}".${hint}` })
- Bus.publish(Session.Event.Error, {
- sessionID,
- error: error.toObject(),
- })
+ yield* bus.publish(Session.Event.Error, { sessionID, error: error.toObject() })
throw error
}
- const taskCtx: Tool.Context = {
- agent: task.agent,
- messageID: assistantMessage.id,
- sessionID: sessionID,
- abort,
- callID: part.callID,
- extra: { bypassAgentCheck: true },
- messages: msgs,
- async metadata(input) {
- part = (await Session.updatePart({
- ...part,
- type: "tool",
- state: {
- ...part.state,
- ...input,
+
+ let error: Error | undefined
+ const result = yield* Effect.promise((signal) =>
+ taskTool
+ .execute(taskArgs, {
+ agent: task.agent,
+ messageID: assistantMessage.id,
+ sessionID,
+ abort: signal,
+ callID: part.callID,
+ extra: { bypassAgentCheck: true },
+ messages: msgs,
+ metadata(val: { title?: string; metadata?: Record<string, any> }) {
+ return Effect.runPromise(
+ Effect.gen(function* () {
+ part = yield* sessions.updatePart({
+ ...part,
+ type: "tool",
+ state: { ...part.state, ...val },
+ } satisfies MessageV2.ToolPart)
+ }),
+ )
+ },
+ ask(req: any) {
+ return Effect.runPromise(
+ permission.ask({
+ ...req,
+ sessionID,
+ ruleset: Permission.merge(taskAgent.permission, session.permission ?? []),
+ }),
+ )
},
- } satisfies MessageV2.ToolPart)) as MessageV2.ToolPart
- },
- async ask(req) {
- await Permission.ask({
- ...req,
- sessionID: sessionID,
- ruleset: Permission.merge(taskAgent.permission, session.permission ?? []),
})
- },
- }
- const result = await taskTool.execute(taskArgs, taskCtx).catch((error) => {
- executionError = error
- log.error("subtask execution failed", { error, agent: task.agent, description: task.description })
- return undefined
- })
+ .catch((e) => {
+ error = e instanceof Error ? e : new Error(String(e))
+ log.error("subtask execution failed", { error, agent: task.agent, description: task.description })
+ return undefined
+ }),
+ ).pipe(
+ Effect.onInterrupt(() =>
+ Effect.gen(function* () {
+ assistantMessage.finish = "tool-calls"
+ assistantMessage.time.completed = Date.now()
+ yield* sessions.updateMessage(assistantMessage)
+ if (part.state.status === "running") {
+ yield* sessions.updatePart({
+ ...part,
+ state: {
+ status: "error",
+ error: "Cancelled",
+ time: { start: part.state.time.start, end: Date.now() },
+ metadata: part.state.metadata,
+ input: part.state.input,
+ },
+ } satisfies MessageV2.ToolPart)
+ }
+ }),
+ ),
+ )
+
const attachments = result?.attachments?.map((attachment) => ({
...attachment,
id: PartID.ascending(),
sessionID,
messageID: assistantMessage.id,
}))
- await Plugin.trigger(
+
+ yield* plugin.trigger(
"tool.execute.after",
- {
- tool: "task",
- sessionID,
- callID: part.id,
- args: taskArgs,
- },
+ { tool: "task", sessionID, callID: part.id, args: taskArgs },
result,
)
+
assistantMessage.finish = "tool-calls"
assistantMessage.time.completed = Date.now()
- await Session.updateMessage(assistantMessage)
+ yield* sessions.updateMessage(assistantMessage)
+
if (result && part.state.status === "running") {
- await Session.updatePart({
+ yield* sessions.updatePart({
...part,
state: {
status: "completed",
@@ -494,714 +691,442 @@ export namespace SessionPrompt {
metadata: result.metadata,
output: result.output,
attachments,
- time: {
- ...part.state.time,
- end: Date.now(),
- },
+ time: { ...part.state.time, end: Date.now() },
},
} satisfies MessageV2.ToolPart)
}
+
if (!result) {
- await Session.updatePart({
+ yield* sessions.updatePart({
...part,
state: {
status: "error",
- error: executionError ? `Tool execution failed: ${executionError.message}` : "Tool execution failed",
+ error: error ? `Tool execution failed: ${error.message}` : "Tool execution failed",
time: {
start: part.state.status === "running" ? part.state.time.start : Date.now(),
end: Date.now(),
},
- metadata: "metadata" in part.state ? part.state.metadata : undefined,
+ metadata: part.state.status === "pending" ? undefined : part.state.metadata,
input: part.state.input,
},
} satisfies MessageV2.ToolPart)
}
- if (task.command) {
- // Add synthetic user message to prevent certain reasoning models from erroring
- // If we create assistant messages w/ out user ones following mid loop thinking signatures
- // will be missing and it can cause errors for models like gemini for example
- const summaryUserMsg: MessageV2.User = {
- id: MessageID.ascending(),
- sessionID,
- role: "user",
- time: {
- created: Date.now(),
- },
- agent: lastUser.agent,
- model: lastUser.model,
- }
- await Session.updateMessage(summaryUserMsg)
- await Session.updatePart({
- id: PartID.ascending(),
- messageID: summaryUserMsg.id,
- sessionID,
- type: "text",
- text: "Summarize the task tool output above and continue with your task.",
- synthetic: true,
- } satisfies MessageV2.TextPart)
- }
-
- continue
- }
-
- // pending compaction
- if (task?.type === "compaction") {
- const result = await SessionCompaction.process({
- messages: msgs,
- parentID: lastUser.id,
- abort,
- sessionID,
- auto: task.auto,
- overflow: task.overflow,
- })
- if (result === "stop") break
- continue
- }
+ if (!task.command) return
- // context overflow, needs compaction
- if (
- lastFinished &&
- lastFinished.summary !== true &&
- (await SessionCompaction.isOverflow({ tokens: lastFinished.tokens, model }))
- ) {
- await SessionCompaction.create({
+ const summaryUserMsg: MessageV2.User = {
+ id: MessageID.ascending(),
sessionID,
+ role: "user",
+ time: { created: Date.now() },
agent: lastUser.agent,
model: lastUser.model,
- auto: true,
- })
- continue
- }
-
- // normal processing
- const agent = await Agent.get(lastUser.agent)
- if (!agent) {
- const available = await Agent.list().then((agents) => agents.filter((a) => !a.hidden).map((a) => a.name))
- const hint = available.length ? ` Available agents: ${available.join(", ")}` : ""
- const error = new NamedError.Unknown({ message: `Agent not found: "${lastUser.agent}".${hint}` })
- Bus.publish(Session.Event.Error, {
+ }
+ yield* sessions.updateMessage(summaryUserMsg)
+ yield* sessions.updatePart({
+ id: PartID.ascending(),
+ messageID: summaryUserMsg.id,
sessionID,
- error: error.toObject(),
- })
- throw error
- }
- const maxSteps = agent.steps ?? Infinity
- const isLastStep = step >= maxSteps
- msgs = await insertReminders({
- messages: msgs,
- agent,
- session,
+ type: "text",
+ text: "Summarize the task tool output above and continue with your task.",
+ synthetic: true,
+ } satisfies MessageV2.TextPart)
})
- const processor = await SessionProcessor.create({
- assistantMessage: (await Session.updateMessage({
+ const shellImpl = Effect.fn("SessionPrompt.shellImpl")(function* (input: ShellInput, signal: AbortSignal) {
+ const session = yield* sessions.get(input.sessionID)
+ if (session.revert) {
+ yield* Effect.promise(() => SessionRevert.cleanup(session))
+ }
+ const agent = yield* agents.get(input.agent)
+ if (!agent) {
+ const available = (yield* agents.list()).filter((a) => !a.hidden).map((a) => a.name)
+ const hint = available.length ? ` Available agents: ${available.join(", ")}` : ""
+ const error = new NamedError.Unknown({ message: `Agent not found: "${input.agent}".${hint}` })
+ yield* bus.publish(Session.Event.Error, { sessionID: input.sessionID, error: error.toObject() })
+ throw error
+ }
+ const model = input.model ?? agent.model ?? (yield* lastModel(input.sessionID))
+ const userMsg: MessageV2.User = {
id: MessageID.ascending(),
- parentID: lastUser.id,
- role: "assistant",
- mode: agent.name,
- agent: agent.name,
- variant: lastUser.variant,
- path: {
- cwd: Instance.directory,
- root: Instance.worktree,
- },
+ sessionID: input.sessionID,
+ time: { created: Date.now() },
+ role: "user",
+ agent: input.agent,
+ model: { providerID: model.providerID, modelID: model.modelID },
+ }
+ yield* sessions.updateMessage(userMsg)
+ const userPart: MessageV2.Part = {
+ type: "text",
+ id: PartID.ascending(),
+ messageID: userMsg.id,
+ sessionID: input.sessionID,
+ text: "The following tool was executed by the user",
+ synthetic: true,
+ }
+ yield* sessions.updatePart(userPart)
+
+ const msg: MessageV2.Assistant = {
+ id: MessageID.ascending(),
+ sessionID: input.sessionID,
+ parentID: userMsg.id,
+ mode: input.agent,
+ agent: input.agent,
cost: 0,
- tokens: {
- input: 0,
- output: 0,
- reasoning: 0,
- cache: { read: 0, write: 0 },
- },
- modelID: model.id,
+ path: { cwd: Instance.directory, root: Instance.worktree },
+ time: { created: Date.now() },
+ role: "assistant",
+ tokens: { input: 0, output: 0, reasoning: 0, cache: { read: 0, write: 0 } },
+ modelID: model.modelID,
providerID: model.providerID,
- time: {
- created: Date.now(),
+ }
+ yield* sessions.updateMessage(msg)
+ const part: MessageV2.ToolPart = {
+ type: "tool",
+ id: PartID.ascending(),
+ messageID: msg.id,
+ sessionID: input.sessionID,
+ tool: "bash",
+ callID: ulid(),
+ state: {
+ status: "running",
+ time: { start: Date.now() },
+ input: { command: input.command },
},
- sessionID,
- })) as MessageV2.Assistant,
- sessionID: sessionID,
- model,
- abort,
- })
- using _ = defer(() => InstructionPrompt.clear(processor.message.id))
-
- // Check if user explicitly invoked an agent via @ in this turn
- const lastUserMsg = msgs.findLast((m) => m.info.role === "user")
- const bypassAgentCheck = lastUserMsg?.parts.some((p) => p.type === "agent") ?? false
-
- const tools = await resolveTools({
- agent,
- session,
- model,
- tools: lastUser.tools,
- processor,
- bypassAgentCheck,
- messages: msgs,
- })
-
- // Inject StructuredOutput tool if JSON schema mode enabled
- if (lastUser.format?.type === "json_schema") {
- tools["StructuredOutput"] = createStructuredOutputTool({
- schema: lastUser.format.schema,
- onSuccess(output) {
- structuredOutput = output
+ }
+ yield* sessions.updatePart(part)
+
+ const sh = Shell.preferred()
+ const shellName = (
+ process.platform === "win32" ? path.win32.basename(sh, ".exe") : path.basename(sh)
+ ).toLowerCase()
+ const invocations: Record<string, { args: string[] }> = {
+ nu: { args: ["-c", input.command] },
+ fish: { args: ["-c", input.command] },
+ zsh: {
+ args: [
+ "-c",
+ "-l",
+ `
+ [[ -f ~/.zshenv ]] && source ~/.zshenv >/dev/null 2>&1 || true
+ [[ -f "\${ZDOTDIR:-$HOME}/.zshrc" ]] && source "\${ZDOTDIR:-$HOME}/.zshrc" >/dev/null 2>&1 || true
+ eval ${JSON.stringify(input.command)}
+ `,
+ ],
},
- })
- }
-
- if (step === 1) {
- SessionSummary.summarize({
- sessionID: sessionID,
- messageID: lastUser.id,
- })
- }
-
- // Ephemerally wrap queued user messages with a reminder to stay on track
- if (step > 1 && lastFinished) {
- for (const msg of msgs) {
- if (msg.info.role !== "user" || msg.info.id <= lastFinished.id) continue
- for (const part of msg.parts) {
- if (part.type !== "text" || part.ignored || part.synthetic) continue
- if (!part.text.trim()) continue
- part.text = [
- "<system-reminder>",
- "The user sent the following message:",
- part.text,
- "",
- "Please address this message and continue with your tasks.",
- "</system-reminder>",
- ].join("\n")
- }
+ bash: {
+ args: [
+ "-c",
+ "-l",
+ `
+ shopt -s expand_aliases
+ [[ -f ~/.bashrc ]] && source ~/.bashrc >/dev/null 2>&1 || true
+ eval ${JSON.stringify(input.command)}
+ `,
+ ],
+ },
+ cmd: { args: ["/c", input.command] },
+ powershell: { args: ["-NoProfile", "-Command", input.command] },
+ pwsh: { args: ["-NoProfile", "-Command", input.command] },
+ "": { args: ["-c", `${input.command}`] },
}
- }
-
- await Plugin.trigger("experimental.chat.messages.transform", {}, { messages: msgs })
-
- // Build system prompt, adding structured output instruction if needed
- const skills = await SystemPrompt.skills(agent)
- const system = [
- ...(await SystemPrompt.environment(model)),
- ...(skills ? [skills] : []),
- ...(await InstructionPrompt.system()),
- ]
- const format = lastUser.format ?? { type: "text" }
- if (format.type === "json_schema") {
- system.push(STRUCTURED_OUTPUT_SYSTEM_PROMPT)
- }
-
- const result = await processor.process({
- user: lastUser,
- agent,
- permission: session.permission,
- abort,
- sessionID,
- system,
- messages: [
- ...(await MessageV2.toModelMessages(msgs, model)),
- ...(isLastStep
- ? [
- {
- role: "assistant" as const,
- content: MAX_STEPS,
- },
- ]
- : []),
- ],
- tools,
- model,
- toolChoice: format.type === "json_schema" ? "required" : undefined,
- })
- // If structured output was captured, save it and exit immediately
- // This takes priority because the StructuredOutput tool was called successfully
- if (structuredOutput !== undefined) {
- processor.message.structured = structuredOutput
- processor.message.finish = processor.message.finish ?? "stop"
- await Session.updateMessage(processor.message)
- break
- }
+ const args = (invocations[shellName] ?? invocations[""]).args
+ const cwd = Instance.directory
+ const shellEnv = yield* plugin.trigger(
+ "shell.env",
+ { cwd, sessionID: input.sessionID, callID: part.callID },
+ { env: {} },
+ )
+ const proc = yield* Effect.sync(() =>
+ spawn(sh, args, {
+ cwd,
+ detached: process.platform !== "win32",
+ windowsHide: process.platform === "win32",
+ stdio: ["ignore", "pipe", "pipe"],
+ env: {
+ ...process.env,
+ ...shellEnv.env,
+ TERM: "dumb",
+ },
+ }),
+ )
- // Check if model finished (finish reason is not "tool-calls" or "unknown")
- const modelFinished = processor.message.finish && !["tool-calls", "unknown"].includes(processor.message.finish)
-
- if (modelFinished && !processor.message.error) {
- if (format.type === "json_schema") {
- // Model stopped without calling StructuredOutput tool
- processor.message.error = new MessageV2.StructuredOutputError({
- message: "Model did not produce structured output",
- retries: 0,
- }).toObject()
- await Session.updateMessage(processor.message)
- break
+ let output = ""
+ const write = () => {
+ if (part.state.status !== "running") return
+ part.state.metadata = { output, description: "" }
+ void Effect.runFork(sessions.updatePart(part))
}
- }
- if (result === "stop") break
- if (result === "compact") {
- await SessionCompaction.create({
- sessionID,
- agent: lastUser.agent,
- model: lastUser.model,
- auto: true,
- overflow: !processor.message.finish,
+ proc.stdout?.on("data", (chunk) => {
+ output += chunk.toString()
+ write()
})
- }
- continue
- }
- SessionCompaction.prune({ sessionID })
- for await (const item of MessageV2.stream(sessionID)) {
- if (item.info.role === "user") continue
- const queued = state()[sessionID]?.callbacks ?? []
- for (const q of queued) {
- q.resolve(item)
- }
- return item
- }
- throw new Error("Impossible")
- })
-
- async function lastModel(sessionID: SessionID) {
- for await (const item of MessageV2.stream(sessionID)) {
- if (item.info.role === "user" && item.info.model) return item.info.model
- }
- return Provider.defaultModel()
- }
-
- /** @internal Exported for testing */
- export async function resolveTools(input: {
- agent: Agent.Info
- model: Provider.Model
- session: Session.Info
- tools?: Record<string, boolean>
- processor: SessionProcessor.Info
- bypassAgentCheck: boolean
- messages: MessageV2.WithParts[]
- }) {
- using _ = log.time("resolveTools")
- const tools: Record<string, AITool> = {}
-
- const context = (args: any, options: ToolExecutionOptions): Tool.Context => ({
- sessionID: input.session.id,
- abort: options.abortSignal!,
- messageID: input.processor.message.id,
- callID: options.toolCallId,
- extra: { model: input.model, bypassAgentCheck: input.bypassAgentCheck },
- agent: input.agent.name,
- messages: input.messages,
- metadata: async (val: { title?: string; metadata?: any }) => {
- const match = input.processor.partFromToolCall(options.toolCallId)
- if (match && match.state.status === "running") {
- await Session.updatePart({
- ...match,
- state: {
- title: val.title,
- metadata: val.metadata,
- status: "running",
- input: args,
- time: {
- start: Date.now(),
- },
- },
- })
- }
- },
- async ask(req) {
- await Permission.ask({
- ...req,
- sessionID: input.session.id,
- tool: { messageID: input.processor.message.id, callID: options.toolCallId },
- ruleset: Permission.merge(input.agent.permission, input.session.permission ?? []),
+ proc.stderr?.on("data", (chunk) => {
+ output += chunk.toString()
+ write()
})
- },
- })
-
- for (const item of await ToolRegistry.tools(
- { modelID: ModelID.make(input.model.api.id), providerID: input.model.providerID },
- input.agent,
- )) {
- const schema = ProviderTransform.schema(input.model, z.toJSONSchema(item.parameters))
- tools[item.id] = tool({
- id: item.id as any,
- description: item.description,
- inputSchema: jsonSchema(schema as any),
- async execute(args, options) {
- const ctx = context(args, options)
- await Plugin.trigger(
- "tool.execute.before",
- {
- tool: item.id,
- sessionID: ctx.sessionID,
- callID: ctx.callID,
- },
- {
- args,
- },
- )
- const result = await item.execute(args, ctx)
- const output = {
- ...result,
- attachments: result.attachments?.map((attachment) => ({
- ...attachment,
- id: PartID.ascending(),
- sessionID: ctx.sessionID,
- messageID: input.processor.message.id,
- })),
- }
- await Plugin.trigger(
- "tool.execute.after",
- {
- tool: item.id,
- sessionID: ctx.sessionID,
- callID: ctx.callID,
- args,
- },
- output,
- )
- return output
- },
- })
- }
- for (const [key, item] of Object.entries(await MCP.tools())) {
- const execute = item.execute
- if (!execute) continue
+ let aborted = false
+ let exited = false
+ let finished = false
+ const kill = Effect.promise(() => Shell.killTree(proc, { exited: () => exited }))
- const schema = await asSchema(item.inputSchema).jsonSchema
- const transformed = ProviderTransform.schema(input.model, schema)
- item.inputSchema = jsonSchema(transformed)
- // Wrap execute to add plugin hooks and format output
- item.execute = async (args, opts) => {
- const ctx = context(args, opts)
+ const abortHandler = () => {
+ if (aborted) return
+ aborted = true
+ void Effect.runFork(kill)
+ }
- await Plugin.trigger(
- "tool.execute.before",
- {
- tool: key,
- sessionID: ctx.sessionID,
- callID: opts.toolCallId,
- },
- {
- args,
- },
+ const finish = Effect.uninterruptible(
+ Effect.gen(function* () {
+ if (finished) return
+ finished = true
+ if (aborted) {
+ output += "\n\n" + ["<metadata>", "User aborted the command", "</metadata>"].join("\n")
+ }
+ if (!msg.time.completed) {
+ msg.time.completed = Date.now()
+ yield* sessions.updateMessage(msg)
+ }
+ if (part.state.status === "running") {
+ part.state = {
+ status: "completed",
+ time: { ...part.state.time, end: Date.now() },
+ input: part.state.input,
+ title: "",
+ metadata: { output, description: "" },
+ output,
+ }
+ yield* sessions.updatePart(part)
+ }
+ }),
)
- await ctx.ask({
- permission: key,
- metadata: {},
- patterns: ["*"],
- always: ["*"],
- })
-
- const result = await execute(args, opts)
-
- await Plugin.trigger(
- "tool.execute.after",
- {
- tool: key,
- sessionID: ctx.sessionID,
- callID: opts.toolCallId,
- args,
- },
- result,
+ const exit = yield* Effect.promise(() => {
+ signal.addEventListener("abort", abortHandler, { once: true })
+ if (signal.aborted) abortHandler()
+ return new Promise<void>((resolve) => {
+ const close = () => {
+ exited = true
+ proc.off("close", close)
+ resolve()
+ }
+ proc.once("close", close)
+ })
+ }).pipe(
+ Effect.onInterrupt(() => Effect.sync(abortHandler)),
+ Effect.ensuring(Effect.sync(() => signal.removeEventListener("abort", abortHandler))),
+ Effect.ensuring(finish),
+ Effect.exit,
)
- const textParts: string[] = []
- const attachments: Omit<MessageV2.FilePart, "id" | "sessionID" | "messageID">[] = []
+ if (Exit.isFailure(exit) && !Cause.hasInterruptsOnly(exit.cause)) {
+ return yield* Effect.failCause(exit.cause)
+ }
- for (const contentItem of result.content) {
- if (contentItem.type === "text") {
- textParts.push(contentItem.text)
- } else if (contentItem.type === "image") {
- attachments.push({
- type: "file",
- mime: contentItem.mimeType,
- url: `data:${contentItem.mimeType};base64,${contentItem.data}`,
- })
- } else if (contentItem.type === "resource") {
- const { resource } = contentItem
- if (resource.text) {
- textParts.push(resource.text)
- }
- if (resource.blob) {
- attachments.push({
- type: "file",
- mime: resource.mimeType ?? "application/octet-stream",
- url: `data:${resource.mimeType ?? "application/octet-stream"};base64,${resource.blob}`,
- filename: resource.uri,
+ return { info: msg, parts: [part] }
+ })
+
+ const getModel = (providerID: ProviderID, modelID: ModelID, sessionID: SessionID) =>
+ Effect.promise(() =>
+ Provider.getModel(providerID, modelID).catch((e) => {
+ if (Provider.ModelNotFoundError.isInstance(e)) {
+ const hint = e.data.suggestions?.length ? ` Did you mean: ${e.data.suggestions.join(", ")}?` : ""
+ Bus.publish(Session.Event.Error, {
+ sessionID,
+ error: new NamedError.Unknown({
+ message: `Model not found: ${e.data.providerID}/${e.data.modelID}.${hint}`,
+ }).toObject(),
})
}
- }
- }
-
- const truncated = await Truncate.output(textParts.join("\n\n"), {}, input.agent)
- const metadata = {
- ...(result.metadata ?? {}),
- truncated: truncated.truncated,
- ...(truncated.truncated && { outputPath: truncated.outputPath }),
- }
+ throw e
+ }),
+ )
- return {
- title: "",
- metadata,
- output: truncated.content,
- attachments: attachments.map((attachment) => ({
- ...attachment,
- id: PartID.ascending(),
- sessionID: ctx.sessionID,
- messageID: input.processor.message.id,
- })),
- content: result.content, // directly return content to preserve ordering when outputting to model
+ const createUserMessage = Effect.fn("SessionPrompt.createUserMessage")(function* (input: PromptInput) {
+ const agentName = input.agent || (yield* agents.defaultAgent())
+ const ag = yield* agents.get(agentName)
+ if (!ag) {
+ const available = (yield* agents.list()).filter((a) => !a.hidden).map((a) => a.name)
+ const hint = available.length ? ` Available agents: ${available.join(", ")}` : ""
+ const error = new NamedError.Unknown({ message: `Agent not found: "${agentName}".${hint}` })
+ yield* bus.publish(Session.Event.Error, { sessionID: input.sessionID, error: error.toObject() })
+ throw error
}
- }
- tools[key] = item
- }
- return tools
- }
-
- /** @internal Exported for testing */
- export function createStructuredOutputTool(input: {
- schema: Record<string, any>
- onSuccess: (output: unknown) => void
- }): AITool {
- // Remove $schema property if present (not needed for tool input)
- const { $schema, ...toolSchema } = input.schema
+ const model = input.model ?? ag.model ?? (yield* lastModel(input.sessionID))
+ const full =
+ !input.variant && ag.variant
+ ? yield* Effect.promise(() => Provider.getModel(model.providerID, model.modelID).catch(() => undefined))
+ : undefined
+ const variant = input.variant ?? (ag.variant && full?.variants?.[ag.variant] ? ag.variant : undefined)
- return tool({
- id: "StructuredOutput" as any,
- description: STRUCTURED_OUTPUT_DESCRIPTION,
- inputSchema: jsonSchema(toolSchema as any),
- async execute(args) {
- // AI SDK validates args against inputSchema before calling execute()
- input.onSuccess(args)
- return {
- output: "Structured output captured successfully.",
- title: "Structured Output",
- metadata: { valid: true },
- }
- },
- toModelOutput({ output }) {
- return {
- type: "text",
- value: output.output,
+ const info: MessageV2.Info = {
+ id: input.messageID ?? MessageID.ascending(),
+ role: "user",
+ sessionID: input.sessionID,
+ time: { created: Date.now() },
+ tools: input.tools,
+ agent: ag.name,
+ model,
+ system: input.system,
+ format: input.format,
+ variant,
}
- },
- })
- }
- async function createUserMessage(input: PromptInput) {
- const agentName = input.agent || (await Agent.defaultAgent())
- const agent = await Agent.get(agentName)
- if (!agent) {
- const available = await Agent.list().then((agents) => agents.filter((a) => !a.hidden).map((a) => a.name))
- const hint = available.length ? ` Available agents: ${available.join(", ")}` : ""
- const error = new NamedError.Unknown({ message: `Agent not found: "${agentName}".${hint}` })
- Bus.publish(Session.Event.Error, {
- sessionID: input.sessionID,
- error: error.toObject(),
- })
- throw error
- }
-
- const model = input.model ?? agent.model ?? (await lastModel(input.sessionID))
- const full =
- !input.variant && agent.variant
- ? await Provider.getModel(model.providerID, model.modelID).catch(() => undefined)
- : undefined
- const variant = input.variant ?? (agent.variant && full?.variants?.[agent.variant] ? agent.variant : undefined)
-
- const info: MessageV2.Info = {
- id: input.messageID ?? MessageID.ascending(),
- role: "user",
- sessionID: input.sessionID,
- time: {
- created: Date.now(),
- },
- tools: input.tools,
- agent: agent.name,
- model,
- system: input.system,
- format: input.format,
- variant,
- }
- using _ = defer(() => InstructionPrompt.clear(info.id))
-
- type Draft<T> = T extends MessageV2.Part ? Omit<T, "id"> & { id?: string } : never
- const assign = (part: Draft<MessageV2.Part>): MessageV2.Part => ({
- ...part,
- id: part.id ? PartID.make(part.id) : PartID.ascending(),
- })
-
- const parts = await Promise.all(
- input.parts.map(async (part): Promise<Draft<MessageV2.Part>[]> => {
- if (part.type === "file") {
- // before checking the protocol we check if this is an mcp resource because it needs special handling
- if (part.source?.type === "resource") {
- const { clientName, uri } = part.source
- log.info("mcp resource", { clientName, uri, mime: part.mime })
-
- const pieces: Draft<MessageV2.Part>[] = [
- {
- messageID: info.id,
- sessionID: input.sessionID,
- type: "text",
- synthetic: true,
- text: `Reading MCP resource: ${part.filename} (${uri})`,
- },
- ]
+ yield* Effect.addFinalizer(() => Effect.sync(() => InstructionPrompt.clear(info.id)))
- try {
- const resourceContent = await MCP.readResource(clientName, uri)
- if (!resourceContent) {
- throw new Error(`Resource not found: ${clientName}/${uri}`)
- }
-
- // Handle different content types
- const contents = Array.isArray(resourceContent.contents)
- ? resourceContent.contents
- : [resourceContent.contents]
+ type Draft<T> = T extends MessageV2.Part ? Omit<T, "id"> & { id?: string } : never
+ const assign = (part: Draft<MessageV2.Part>): MessageV2.Part => ({
+ ...part,
+ id: part.id ? PartID.make(part.id) : PartID.ascending(),
+ })
- for (const content of contents) {
- if ("text" in content && content.text) {
- pieces.push({
- messageID: info.id,
- sessionID: input.sessionID,
- type: "text",
- synthetic: true,
- text: content.text as string,
- })
- } else if ("blob" in content && content.blob) {
- // Handle binary content if needed
- const mimeType = "mimeType" in content ? content.mimeType : part.mime
- pieces.push({
- messageID: info.id,
- sessionID: input.sessionID,
- type: "text",
- synthetic: true,
- text: `[Binary content: ${mimeType}]`,
- })
+ const resolvePart: (part: PromptInput["parts"][number]) => Effect.Effect<Draft<MessageV2.Part>[]> = Effect.fn(
+ "SessionPrompt.resolveUserPart",
+ )(function* (part) {
+ if (part.type === "file") {
+ if (part.source?.type === "resource") {
+ const { clientName, uri } = part.source
+ log.info("mcp resource", { clientName, uri, mime: part.mime })
+ const pieces: Draft<MessageV2.Part>[] = [
+ {
+ messageID: info.id,
+ sessionID: input.sessionID,
+ type: "text",
+ synthetic: true,
+ text: `Reading MCP resource: ${part.filename} (${uri})`,
+ },
+ ]
+ const exit = yield* mcp.readResource(clientName, uri).pipe(Effect.exit)
+ if (Exit.isSuccess(exit)) {
+ const content = exit.value
+ if (!content) throw new Error(`Resource not found: ${clientName}/${uri}`)
+ const items = Array.isArray(content.contents) ? content.contents : [content.contents]
+ for (const c of items) {
+ if ("text" in c && c.text) {
+ pieces.push({
+ messageID: info.id,
+ sessionID: input.sessionID,
+ type: "text",
+ synthetic: true,
+ text: c.text,
+ })
+ } else if ("blob" in c && c.blob) {
+ const mime = "mimeType" in c ? c.mimeType : part.mime
+ pieces.push({
+ messageID: info.id,
+ sessionID: input.sessionID,
+ type: "text",
+ synthetic: true,
+ text: `[Binary content: ${mime}]`,
+ })
+ }
}
+ pieces.push({ ...part, messageID: info.id, sessionID: input.sessionID })
+ } else {
+ const error = Cause.squash(exit.cause)
+ log.error("failed to read MCP resource", { error, clientName, uri })
+ const message = error instanceof Error ? error.message : String(error)
+ pieces.push({
+ messageID: info.id,
+ sessionID: input.sessionID,
+ type: "text",
+ synthetic: true,
+ text: `Failed to read MCP resource ${part.filename}: ${message}`,
+ })
}
-
- pieces.push({
- ...part,
- messageID: info.id,
- sessionID: input.sessionID,
- })
- } catch (error: unknown) {
- log.error("failed to read MCP resource", { error, clientName, uri })
- const message = error instanceof Error ? error.message : String(error)
- pieces.push({
- messageID: info.id,
- sessionID: input.sessionID,
- type: "text",
- synthetic: true,
- text: `Failed to read MCP resource ${part.filename}: ${message}`,
- })
+ return pieces
}
-
- return pieces
- }
- const url = new URL(part.url)
- switch (url.protocol) {
- case "data:":
- if (part.mime === "text/plain") {
- return [
- {
- messageID: info.id,
- sessionID: input.sessionID,
- type: "text",
- synthetic: true,
- text: `Called the Read tool with the following input: ${JSON.stringify({ filePath: part.filename })}`,
- },
- {
- messageID: info.id,
- sessionID: input.sessionID,
- type: "text",
- synthetic: true,
- text: decodeDataUrl(part.url),
- },
- {
- ...part,
- messageID: info.id,
- sessionID: input.sessionID,
- },
- ]
- }
- break
- case "file:":
- log.info("file", { mime: part.mime })
- // have to normalize, symbol search returns absolute paths
- // Decode the pathname since URL constructor doesn't automatically decode it
- const filepath = fileURLToPath(part.url)
- const s = Filesystem.stat(filepath)
-
- if (s?.isDirectory()) {
- part.mime = "application/x-directory"
- }
-
- if (part.mime === "text/plain") {
- let offset: number | undefined = undefined
- let limit: number | undefined = undefined
- const range = {
- start: url.searchParams.get("start"),
- end: url.searchParams.get("end"),
+ const url = new URL(part.url)
+ switch (url.protocol) {
+ case "data:":
+ if (part.mime === "text/plain") {
+ return [
+ {
+ messageID: info.id,
+ sessionID: input.sessionID,
+ type: "text",
+ synthetic: true,
+ text: `Called the Read tool with the following input: ${JSON.stringify({ filePath: part.filename })}`,
+ },
+ {
+ messageID: info.id,
+ sessionID: input.sessionID,
+ type: "text",
+ synthetic: true,
+ text: decodeDataUrl(part.url),
+ },
+ { ...part, messageID: info.id, sessionID: input.sessionID },
+ ]
}
- if (range.start != null) {
- const filePathURI = part.url.split("?")[0]
- let start = parseInt(range.start)
- let end = range.end ? parseInt(range.end) : undefined
- // some LSP servers (eg, gopls) don't give full range in
- // workspace/symbol searches, so we'll try to find the
- // symbol in the document to get the full range
- if (start === end) {
- const symbols = await LSP.documentSymbol(filePathURI).catch(() => [])
- for (const symbol of symbols) {
- let range: LSP.Range | undefined
- if ("range" in symbol) {
- range = symbol.range
- } else if ("location" in symbol) {
- range = symbol.location.range
- }
- if (range?.start?.line && range?.start?.line === start) {
- start = range.start.line
- end = range?.end?.line ?? start
- break
+ break
+ case "file:": {
+ log.info("file", { mime: part.mime })
+ const filepath = fileURLToPath(part.url)
+ if (yield* fsys.isDir(filepath)) part.mime = "application/x-directory"
+
+ if (part.mime === "text/plain") {
+ let offset: number | undefined
+ let limit: number | undefined
+ const range = { start: url.searchParams.get("start"), end: url.searchParams.get("end") }
+ if (range.start != null) {
+ const filePathURI = part.url.split("?")[0]
+ let start = parseInt(range.start)
+ let end = range.end ? parseInt(range.end) : undefined
+ if (start === end) {
+ const symbols = yield* lsp
+ .documentSymbol(filePathURI)
+ .pipe(Effect.catch(() => Effect.succeed([])))
+ for (const symbol of symbols) {
+ let r: LSP.Range | undefined
+ if ("range" in symbol) r = symbol.range
+ else if ("location" in symbol) r = symbol.location.range
+ if (r?.start?.line && r?.start?.line === start) {
+ start = r.start.line
+ end = r?.end?.line ?? start
+ break
+ }
}
}
+ offset = Math.max(start, 1)
+ if (end) limit = end - (offset - 1)
}
- offset = Math.max(start, 1)
- if (end) {
- limit = end - (offset - 1)
- }
- }
- const args = { filePath: filepath, offset, limit }
-
- const pieces: Draft<MessageV2.Part>[] = [
- {
- messageID: info.id,
- sessionID: input.sessionID,
- type: "text",
- synthetic: true,
- text: `Called the Read tool with the following input: ${JSON.stringify(args)}`,
- },
- ]
-
- await ReadTool.init()
- .then(async (t) => {
- const model = await Provider.getModel(info.model.providerID, info.model.modelID)
- const readCtx: Tool.Context = {
- sessionID: input.sessionID,
- abort: new AbortController().signal,
- agent: input.agent!,
+ const args = { filePath: filepath, offset, limit }
+ const pieces: Draft<MessageV2.Part>[] = [
+ {
messageID: info.id,
- extra: { bypassCwdCheck: true, model },
- messages: [],
- metadata: async () => {},
- ask: async () => {},
- }
- const result = await t.execute(args, readCtx)
+ sessionID: input.sessionID,
+ type: "text",
+ synthetic: true,
+ text: `Called the Read tool with the following input: ${JSON.stringify(args)}`,
+ },
+ ]
+ const read = yield* Effect.promise(() => ReadTool.init()).pipe(
+ Effect.flatMap((t) =>
+ Effect.promise(() => Provider.getModel(info.model.providerID, info.model.modelID)).pipe(
+ Effect.flatMap((mdl) =>
+ Effect.promise(() =>
+ t.execute(args, {
+ sessionID: input.sessionID,
+ abort: new AbortController().signal,
+ agent: input.agent!,
+ messageID: info.id,
+ extra: { bypassCwdCheck: true, model: mdl },
+ messages: [],
+ metadata: async () => {},
+ ask: async () => {},
+ }),
+ ),
+ ),
+ ),
+ ),
+ Effect.exit,
+ )
+ if (Exit.isSuccess(read)) {
+ const result = read.value
pieces.push({
messageID: info.id,
sessionID: input.sessionID,
@@ -1211,30 +1136,24 @@ export namespace SessionPrompt {
})
if (result.attachments?.length) {
pieces.push(
- ...result.attachments.map((attachment) => ({
- ...attachment,
+ ...result.attachments.map((a) => ({
+ ...a,
synthetic: true,
- filename: attachment.filename ?? part.filename,
+ filename: a.filename ?? part.filename,
messageID: info.id,
sessionID: input.sessionID,
})),
)
} else {
- pieces.push({
- ...part,
- messageID: info.id,
- sessionID: input.sessionID,
- })
+ pieces.push({ ...part, messageID: info.id, sessionID: input.sessionID })
}
- })
- .catch((error) => {
+ } else {
+ const error = Cause.squash(read.cause)
log.error("failed to read file", { error })
- const message = error instanceof Error ? error.message : error.toString()
- Bus.publish(Session.Event.Error, {
+ const message = error instanceof Error ? error.message : String(error)
+ yield* bus.publish(Session.Event.Error, {
sessionID: input.sessionID,
- error: new NamedError.Unknown({
- message,
- }).toObject(),
+ error: new NamedError.Unknown({ message }).toObject(),
})
pieces.push({
messageID: info.id,
@@ -1243,553 +1162,667 @@ export namespace SessionPrompt {
synthetic: true,
text: `Read tool failed to read ${filepath} with the following error: ${message}`,
})
- })
-
- return pieces
- }
+ }
+ return pieces
+ }
- if (part.mime === "application/x-directory") {
- const args = { filePath: filepath }
- const listCtx: Tool.Context = {
- sessionID: input.sessionID,
- abort: new AbortController().signal,
- agent: input.agent!,
- messageID: info.id,
- extra: { bypassCwdCheck: true },
- messages: [],
- metadata: async () => {},
- ask: async () => {},
+ if (part.mime === "application/x-directory") {
+ const args = { filePath: filepath }
+ const result = yield* Effect.promise(() => ReadTool.init()).pipe(
+ Effect.flatMap((t) =>
+ Effect.promise(() =>
+ t.execute(args, {
+ sessionID: input.sessionID,
+ abort: new AbortController().signal,
+ agent: input.agent!,
+ messageID: info.id,
+ extra: { bypassCwdCheck: true },
+ messages: [],
+ metadata: async () => {},
+ ask: async () => {},
+ }),
+ ),
+ ),
+ )
+ return [
+ {
+ messageID: info.id,
+ sessionID: input.sessionID,
+ type: "text",
+ synthetic: true,
+ text: `Called the Read tool with the following input: ${JSON.stringify(args)}`,
+ },
+ {
+ messageID: info.id,
+ sessionID: input.sessionID,
+ type: "text",
+ synthetic: true,
+ text: result.output,
+ },
+ { ...part, messageID: info.id, sessionID: input.sessionID },
+ ]
}
- const result = await ReadTool.init().then((t) => t.execute(args, listCtx))
+
+ yield* filetime.read(input.sessionID, filepath)
return [
{
messageID: info.id,
sessionID: input.sessionID,
type: "text",
synthetic: true,
- text: `Called the Read tool with the following input: ${JSON.stringify(args)}`,
+ text: `Called the Read tool with the following input: {"filePath":"${filepath}"}`,
},
{
+ id: part.id,
messageID: info.id,
sessionID: input.sessionID,
- type: "text",
- synthetic: true,
- text: result.output,
- },
- {
- ...part,
- messageID: info.id,
- sessionID: input.sessionID,
+ type: "file",
+ url:
+ `data:${part.mime};base64,` +
+ Buffer.from(yield* fsys.readFile(filepath).pipe(Effect.catch(Effect.die))).toString("base64"),
+ mime: part.mime,
+ filename: part.filename!,
+ source: part.source,
},
]
}
+ }
+ }
- await FileTime.read(input.sessionID, filepath)
- return [
- {
- messageID: info.id,
- sessionID: input.sessionID,
- type: "text",
- text: `Called the Read tool with the following input: {"filePath":"${filepath}"}`,
- synthetic: true,
- },
- {
- id: part.id,
- messageID: info.id,
- sessionID: input.sessionID,
- type: "file",
- url: `data:${part.mime};base64,` + (await Filesystem.readBytes(filepath)).toString("base64"),
- mime: part.mime,
- filename: part.filename!,
- source: part.source,
- },
- ]
+ if (part.type === "agent") {
+ const perm = Permission.evaluate("task", part.name, ag.permission)
+ const hint = perm.action === "deny" ? " . Invoked by user; guaranteed to exist." : ""
+ return [
+ { ...part, messageID: info.id, sessionID: input.sessionID },
+ {
+ messageID: info.id,
+ sessionID: input.sessionID,
+ type: "text",
+ synthetic: true,
+ text:
+ " Use the above message and context to generate a prompt and call the task tool with subagent: " +
+ part.name +
+ hint,
+ },
+ ]
}
- }
- if (part.type === "agent") {
- // Check if this agent would be denied by task permission
- const perm = Permission.evaluate("task", part.name, agent.permission)
- const hint = perm.action === "deny" ? " . Invoked by user; guaranteed to exist." : ""
- return [
- {
- ...part,
- messageID: info.id,
- sessionID: input.sessionID,
- },
- {
- messageID: info.id,
- sessionID: input.sessionID,
- type: "text",
- synthetic: true,
- // An extra space is added here. Otherwise the 'Use' gets appended
- // to user's last word; making a combined word
- text:
- " Use the above message and context to generate a prompt and call the task tool with subagent: " +
- part.name +
- hint,
- },
- ]
- }
+ return [{ ...part, messageID: info.id, sessionID: input.sessionID }]
+ })
+
+ const parts = yield* Effect.forEach(input.parts, resolvePart, { concurrency: "unbounded" }).pipe(
+ Effect.map((x) => x.flat().map(assign)),
+ )
- return [
+ yield* plugin.trigger(
+ "chat.message",
{
- ...part,
- messageID: info.id,
sessionID: input.sessionID,
+ agent: input.agent,
+ model: input.model,
+ messageID: input.messageID,
+ variant: input.variant,
},
- ]
- }),
- ).then((x) => x.flat().map(assign))
-
- await Plugin.trigger(
- "chat.message",
- {
- sessionID: input.sessionID,
- agent: input.agent,
- model: input.model,
- messageID: input.messageID,
- variant: input.variant,
- },
- {
- message: info,
- parts,
- },
- )
-
- const parsedInfo = MessageV2.Info.safeParse(info)
- if (!parsedInfo.success) {
- log.error("invalid user message before save", {
- sessionID: input.sessionID,
- messageID: info.id,
- agent: info.agent,
- model: info.model,
- issues: parsedInfo.error.issues,
- })
- }
-
- parts.forEach((part, index) => {
- const parsedPart = MessageV2.Part.safeParse(part)
- if (parsedPart.success) return
- log.error("invalid user part before save", {
- sessionID: input.sessionID,
- messageID: info.id,
- partID: part.id,
- partType: part.type,
- index,
- issues: parsedPart.error.issues,
- part,
- })
- })
+ { message: info, parts },
+ )
- await Session.updateMessage(info)
- for (const part of parts) {
- await Session.updatePart(part)
- }
+ const parsed = MessageV2.Info.safeParse(info)
+ if (!parsed.success) {
+ log.error("invalid user message before save", {
+ sessionID: input.sessionID,
+ messageID: info.id,
+ agent: info.agent,
+ model: info.model,
+ issues: parsed.error.issues,
+ })
+ }
+ parts.forEach((part, index) => {
+ const p = MessageV2.Part.safeParse(part)
+ if (p.success) return
+ log.error("invalid user part before save", {
+ sessionID: input.sessionID,
+ messageID: info.id,
+ partID: part.id,
+ partType: part.type,
+ index,
+ issues: p.error.issues,
+ part,
+ })
+ })
- return {
- info,
- parts,
- }
- }
+ yield* sessions.updateMessage(info)
+ for (const part of parts) yield* sessions.updatePart(part)
- async function insertReminders(input: { messages: MessageV2.WithParts[]; agent: Agent.Info; session: Session.Info }) {
- const userMessage = input.messages.findLast((msg) => msg.info.role === "user")
- if (!userMessage) return input.messages
+ return { info, parts }
+ }, Effect.scoped)
- // Original logic when experimental plan mode is disabled
- if (!Flag.OPENCODE_EXPERIMENTAL_PLAN_MODE) {
- if (input.agent.name === "plan") {
- userMessage.parts.push({
- id: PartID.ascending(),
- messageID: userMessage.info.id,
- sessionID: userMessage.info.sessionID,
- type: "text",
- text: PROMPT_PLAN,
- synthetic: true,
- })
- }
- const wasPlan = input.messages.some((msg) => msg.info.role === "assistant" && msg.info.agent === "plan")
- if (wasPlan && input.agent.name === "build") {
- userMessage.parts.push({
- id: PartID.ascending(),
- messageID: userMessage.info.id,
- sessionID: userMessage.info.sessionID,
- type: "text",
- text: BUILD_SWITCH,
- synthetic: true,
- })
- }
- return input.messages
- }
-
- // New plan mode logic when flag is enabled
- const assistantMessage = input.messages.findLast((msg) => msg.info.role === "assistant")
-
- // Switching from plan mode to build mode
- if (input.agent.name !== "plan" && assistantMessage?.info.agent === "plan") {
- const plan = Session.plan(input.session)
- const exists = await Filesystem.exists(plan)
- if (exists) {
- const part = await Session.updatePart({
- id: PartID.ascending(),
- messageID: userMessage.info.id,
- sessionID: userMessage.info.sessionID,
- type: "text",
- text:
- BUILD_SWITCH + "\n\n" + `A plan file exists at ${plan}. You should execute on the plan defined within it`,
- synthetic: true,
+ const prompt: (input: PromptInput) => Effect.Effect<MessageV2.WithParts> = Effect.fn("SessionPrompt.prompt")(
+ function* (input: PromptInput) {
+ const session = yield* sessions.get(input.sessionID)
+ yield* Effect.promise(() => SessionRevert.cleanup(session))
+ const message = yield* createUserMessage(input)
+ yield* sessions.touch(input.sessionID)
+
+ const permissions: Permission.Ruleset = []
+ for (const [t, enabled] of Object.entries(input.tools ?? {})) {
+ permissions.push({ permission: t, action: enabled ? "allow" : "deny", pattern: "*" })
+ }
+ if (permissions.length > 0) {
+ session.permission = permissions
+ yield* sessions.setPermission({ sessionID: session.id, permission: permissions })
+ }
+
+ if (input.noReply === true) return message
+ return yield* loop({ sessionID: input.sessionID })
+ },
+ )
+
+ const lastAssistant = (sessionID: SessionID) =>
+ Effect.promise(async () => {
+ let latest: MessageV2.WithParts | undefined
+ for await (const item of MessageV2.stream(sessionID)) {
+ latest ??= item
+ if (item.info.role !== "user") return item
+ }
+ if (latest) return latest
+ throw new Error("Impossible")
})
- userMessage.parts.push(part)
- }
- return input.messages
- }
-
- // Entering plan mode
- if (input.agent.name === "plan" && assistantMessage?.info.agent !== "plan") {
- const plan = Session.plan(input.session)
- const exists = await Filesystem.exists(plan)
- if (!exists) await fs.mkdir(path.dirname(plan), { recursive: true })
- const part = await Session.updatePart({
- id: PartID.ascending(),
- messageID: userMessage.info.id,
- sessionID: userMessage.info.sessionID,
- type: "text",
- text: `<system-reminder>
-Plan mode is active. The user indicated that they do not want you to execute yet -- you MUST NOT make any edits (with the exception of the plan file mentioned below), run any non-readonly tools (including changing configs or making commits), or otherwise make any changes to the system. This supersedes any other instructions you have received.
-## Plan File Info:
-${exists ? `A plan file already exists at ${plan}. You can read it and make incremental edits using the edit tool.` : `No plan file exists yet. You should create your plan at ${plan} using the write tool.`}
-You should build your plan incrementally by writing to or editing this file. NOTE that this is the only file you are allowed to edit - other than this you are only allowed to take READ-ONLY actions.
+ const runLoop: (sessionID: SessionID) => Effect.Effect<MessageV2.WithParts> = Effect.fn("SessionPrompt.run")(
+ function* (sessionID: SessionID) {
+ let structured: unknown | undefined
+ let step = 0
+ const session = yield* sessions.get(sessionID)
+
+ while (true) {
+ yield* status.set(sessionID, { type: "busy" })
+ log.info("loop", { step, sessionID })
+
+ let msgs = yield* Effect.promise(() => MessageV2.filterCompacted(MessageV2.stream(sessionID)))
+
+ let lastUser: MessageV2.User | undefined
+ let lastAssistant: MessageV2.Assistant | undefined
+ let lastFinished: MessageV2.Assistant | undefined
+ let tasks: (MessageV2.CompactionPart | MessageV2.SubtaskPart)[] = []
+ for (let i = msgs.length - 1; i >= 0; i--) {
+ const msg = msgs[i]
+ if (!lastUser && msg.info.role === "user") lastUser = msg.info
+ if (!lastAssistant && msg.info.role === "assistant") lastAssistant = msg.info
+ if (!lastFinished && msg.info.role === "assistant" && msg.info.finish) lastFinished = msg.info
+ if (lastUser && lastFinished) break
+ const task = msg.parts.filter((part) => part.type === "compaction" || part.type === "subtask")
+ if (task && !lastFinished) tasks.push(...task)
+ }
-## Plan Workflow
+ if (!lastUser) throw new Error("No user message found in stream. This should never happen.")
+ if (
+ lastAssistant?.finish &&
+ !["tool-calls"].includes(lastAssistant.finish) &&
+ lastUser.id < lastAssistant.id
+ ) {
+ log.info("exiting loop", { sessionID })
+ break
+ }
-### Phase 1: Initial Understanding
-Goal: Gain a comprehensive understanding of the user's request by reading through code and asking them questions. Critical: In this phase you should only use the explore subagent type.
+ step++
+ if (step === 1)
+ yield* title({
+ session,
+ modelID: lastUser.model.modelID,
+ providerID: lastUser.model.providerID,
+ history: msgs,
+ }).pipe(Effect.ignore, Effect.forkIn(scope))
+
+ const model = yield* getModel(lastUser.model.providerID, lastUser.model.modelID, sessionID)
+ const task = tasks.pop()
+
+ if (task?.type === "subtask") {
+ yield* handleSubtask({ task, model, lastUser, sessionID, session, msgs })
+ continue
+ }
-1. Focus on understanding the user's request and the code associated with their request
+ if (task?.type === "compaction") {
+ const result = yield* compaction.process({
+ messages: msgs,
+ parentID: lastUser.id,
+ sessionID,
+ auto: task.auto,
+ overflow: task.overflow,
+ })
+ if (result === "stop") break
+ continue
+ }
-2. **Launch up to 3 explore agents IN PARALLEL** (single message, multiple tool calls) to efficiently explore the codebase.
- - Use 1 agent when the task is isolated to known files, the user provided specific file paths, or you're making a small targeted change.
- - Use multiple agents when: the scope is uncertain, multiple areas of the codebase are involved, or you need to understand existing patterns before planning.
- - Quality over quantity - 3 agents maximum, but you should try to use the minimum number of agents necessary (usually just 1)
- - If using multiple agents: Provide each agent with a specific search focus or area to explore. Example: One agent searches for existing implementations, another explores related components, a third investigates testing patterns
+ if (
+ lastFinished &&
+ lastFinished.summary !== true &&
+ (yield* compaction.isOverflow({ tokens: lastFinished.tokens, model }))
+ ) {
+ yield* compaction.create({ sessionID, agent: lastUser.agent, model: lastUser.model, auto: true })
+ continue
+ }
-3. After exploring the code, use the question tool to clarify ambiguities in the user request up front.
+ const agent = yield* agents.get(lastUser.agent)
+ if (!agent) {
+ const available = (yield* agents.list()).filter((a) => !a.hidden).map((a) => a.name)
+ const hint = available.length ? ` Available agents: ${available.join(", ")}` : ""
+ const error = new NamedError.Unknown({ message: `Agent not found: "${lastUser.agent}".${hint}` })
+ yield* bus.publish(Session.Event.Error, { sessionID, error: error.toObject() })
+ throw error
+ }
+ const maxSteps = agent.steps ?? Infinity
+ const isLastStep = step >= maxSteps
+ msgs = yield* insertReminders({ messages: msgs, agent, session })
+
+ const msg: MessageV2.Assistant = {
+ id: MessageID.ascending(),
+ parentID: lastUser.id,
+ role: "assistant",
+ mode: agent.name,
+ agent: agent.name,
+ variant: lastUser.variant,
+ path: { cwd: Instance.directory, root: Instance.worktree },
+ cost: 0,
+ tokens: { input: 0, output: 0, reasoning: 0, cache: { read: 0, write: 0 } },
+ modelID: model.id,
+ providerID: model.providerID,
+ time: { created: Date.now() },
+ sessionID,
+ }
+ yield* sessions.updateMessage(msg)
+ const handle = yield* processor.create({
+ assistantMessage: msg,
+ sessionID,
+ model,
+ })
-### Phase 2: Design
-Goal: Design an implementation approach.
+ const outcome: "break" | "continue" = yield* Effect.onExit(
+ Effect.gen(function* () {
+ const lastUserMsg = msgs.findLast((m) => m.info.role === "user")
+ const bypassAgentCheck = lastUserMsg?.parts.some((p) => p.type === "agent") ?? false
+
+ const tools = yield* resolveTools({
+ agent,
+ session,
+ model,
+ tools: lastUser.tools,
+ processor: handle,
+ bypassAgentCheck,
+ messages: msgs,
+ })
+
+ if (lastUser.format?.type === "json_schema") {
+ tools["StructuredOutput"] = createStructuredOutputTool({
+ schema: lastUser.format.schema,
+ onSuccess(output) {
+ structured = output
+ },
+ })
+ }
-Launch general agent(s) to design the implementation based on the user's intent and your exploration results from Phase 1.
+ if (step === 1) SessionSummary.summarize({ sessionID, messageID: lastUser.id })
+
+ if (step > 1 && lastFinished) {
+ for (const m of msgs) {
+ if (m.info.role !== "user" || m.info.id <= lastFinished.id) continue
+ for (const p of m.parts) {
+ if (p.type !== "text" || p.ignored || p.synthetic) continue
+ if (!p.text.trim()) continue
+ p.text = [
+ "<system-reminder>",
+ "The user sent the following message:",
+ p.text,
+ "",
+ "Please address this message and continue with your tasks.",
+ "</system-reminder>",
+ ].join("\n")
+ }
+ }
+ }
-You can launch up to 1 agent(s) in parallel.
+ yield* plugin.trigger("experimental.chat.messages.transform", {}, { messages: msgs })
+
+ const [skills, env, instructions, modelMsgs] = yield* Effect.promise(() =>
+ Promise.all([
+ SystemPrompt.skills(agent),
+ SystemPrompt.environment(model),
+ InstructionPrompt.system(),
+ MessageV2.toModelMessages(msgs, model),
+ ]),
+ )
+ const system = [...env, ...(skills ? [skills] : []), ...instructions]
+ const format = lastUser.format ?? { type: "text" as const }
+ if (format.type === "json_schema") system.push(STRUCTURED_OUTPUT_SYSTEM_PROMPT)
+ const result = yield* handle.process({
+ user: lastUser,
+ agent,
+ permission: session.permission,
+ sessionID,
+ system,
+ messages: [...modelMsgs, ...(isLastStep ? [{ role: "assistant" as const, content: MAX_STEPS }] : [])],
+ tools,
+ model,
+ toolChoice: format.type === "json_schema" ? "required" : undefined,
+ })
+
+ if (structured !== undefined) {
+ handle.message.structured = structured
+ handle.message.finish = handle.message.finish ?? "stop"
+ yield* sessions.updateMessage(handle.message)
+ return "break" as const
+ }
-**Guidelines:**
-- **Default**: Launch at least 1 Plan agent for most tasks - it helps validate your understanding and consider alternatives
-- **Skip agents**: Only for truly trivial tasks (typo fixes, single-line changes, simple renames)
+ const finished = handle.message.finish && !["tool-calls", "unknown"].includes(handle.message.finish)
+ if (finished && !handle.message.error) {
+ if (format.type === "json_schema") {
+ handle.message.error = new MessageV2.StructuredOutputError({
+ message: "Model did not produce structured output",
+ retries: 0,
+ }).toObject()
+ yield* sessions.updateMessage(handle.message)
+ return "break" as const
+ }
+ }
-Examples of when to use multiple agents:
-- The task touches multiple parts of the codebase
-- It's a large refactor or architectural change
-- There are many edge cases to consider
-- You'd benefit from exploring different approaches
+ if (result === "stop") return "break" as const
+ if (result === "compact") {
+ yield* compaction.create({
+ sessionID,
+ agent: lastUser.agent,
+ model: lastUser.model,
+ auto: true,
+ overflow: !handle.message.finish,
+ })
+ }
+ return "continue" as const
+ }),
+ Effect.fnUntraced(function* (exit) {
+ if (Exit.isFailure(exit) && Cause.hasInterruptsOnly(exit.cause)) yield* handle.abort()
+ InstructionPrompt.clear(handle.message.id)
+ }),
+ )
+ if (outcome === "break") break
+ continue
+ }
-Example perspectives by task type:
-- New feature: simplicity vs performance vs maintainability
-- Bug fix: root cause vs workaround vs prevention
-- Refactoring: minimal change vs clean architecture
+ yield* compaction.prune({ sessionID }).pipe(Effect.ignore, Effect.forkIn(scope))
+ return yield* lastAssistant(sessionID)
+ },
+ )
-In the agent prompt:
-- Provide comprehensive background context from Phase 1 exploration including filenames and code path traces
-- Describe requirements and constraints
-- Request a detailed implementation plan
+ const loop: (input: z.infer<typeof LoopInput>) => Effect.Effect<MessageV2.WithParts> = Effect.fn(
+ "SessionPrompt.loop",
+ )(function* (input: z.infer<typeof LoopInput>) {
+ const s = yield* InstanceState.get(cache)
+ const runner = getRunner(s.runners, input.sessionID)
+ return yield* runner.ensureRunning(runLoop(input.sessionID))
+ })
-### Phase 3: Review
-Goal: Review the plan(s) from Phase 2 and ensure alignment with the user's intentions.
-1. Read the critical files identified by agents to deepen your understanding
-2. Ensure that the plans align with the user's original request
-3. Use question tool to clarify any remaining questions with the user
+ const shell: (input: ShellInput) => Effect.Effect<MessageV2.WithParts> = Effect.fn("SessionPrompt.shell")(
+ function* (input: ShellInput) {
+ const s = yield* InstanceState.get(cache)
+ const runner = getRunner(s.runners, input.sessionID)
+ return yield* runner.startShell((signal) => shellImpl(input, signal))
+ },
+ )
-### Phase 4: Final Plan
-Goal: Write your final plan to the plan file (the only file you can edit).
-- Include only your recommended approach, not all alternatives
-- Ensure that the plan file is concise enough to scan quickly, but detailed enough to execute effectively
-- Include the paths of critical files to be modified
-- Include a verification section describing how to test the changes end-to-end (run the code, use MCP tools, run tests)
+ const command = Effect.fn("SessionPrompt.command")(function* (input: CommandInput) {
+ log.info("command", input)
+ const cmd = yield* commands.get(input.command)
+ if (!cmd) {
+ const available = (yield* commands.list()).map((c) => c.name)
+ const hint = available.length ? ` Available commands: ${available.join(", ")}` : ""
+ const error = new NamedError.Unknown({ message: `Command not found: "${input.command}".${hint}` })
+ yield* bus.publish(Session.Event.Error, { sessionID: input.sessionID, error: error.toObject() })
+ throw error
+ }
+ const agentName = cmd.agent ?? input.agent ?? (yield* agents.defaultAgent())
-### Phase 5: Call plan_exit tool
-At the very end of your turn, once you have asked the user questions and are happy with your final plan file - you should always call plan_exit to indicate to the user that you are done planning.
-This is critical - your turn should only end with either asking the user a question or calling plan_exit. Do not stop unless it's for these 2 reasons.
+ const raw = input.arguments.match(argsRegex) ?? []
+ const args = raw.map((arg) => arg.replace(quoteTrimRegex, ""))
+ const templateCommand = yield* Effect.promise(async () => cmd.template)
-**Important:** Use question tool to clarify requirements/approach, use plan_exit to request plan approval. Do NOT use question tool to ask "Is this plan okay?" - that's what plan_exit does.
+ const placeholders = templateCommand.match(placeholderRegex) ?? []
+ let last = 0
+ for (const item of placeholders) {
+ const value = Number(item.slice(1))
+ if (value > last) last = value
+ }
-NOTE: At any point in time through this workflow you should feel free to ask the user questions or clarifications. Don't make large assumptions about user intent. The goal is to present a well researched plan to the user, and tie any loose ends before implementation begins.
-</system-reminder>`,
- synthetic: true,
+ const withArgs = templateCommand.replaceAll(placeholderRegex, (_, index) => {
+ const position = Number(index)
+ const argIndex = position - 1
+ if (argIndex >= args.length) return ""
+ if (position === last) return args.slice(argIndex).join(" ")
+ return args[argIndex]
+ })
+ const usesArgumentsPlaceholder = templateCommand.includes("$ARGUMENTS")
+ let template = withArgs.replaceAll("$ARGUMENTS", input.arguments)
+
+ if (placeholders.length === 0 && !usesArgumentsPlaceholder && input.arguments.trim()) {
+ template = template + "\n\n" + input.arguments
+ }
+
+ const shellMatches = ConfigMarkdown.shell(template)
+ if (shellMatches.length > 0) {
+ const sh = Shell.preferred()
+ const results = yield* Effect.promise(() =>
+ Promise.all(
+ shellMatches.map(async ([, cmd]) => (await Process.text([cmd], { shell: sh, nothrow: true })).text),
+ ),
+ )
+ let index = 0
+ template = template.replace(bashRegex, () => results[index++])
+ }
+ template = template.trim()
+
+ const taskModel = yield* Effect.gen(function* () {
+ if (cmd.model) return Provider.parseModel(cmd.model)
+ if (cmd.agent) {
+ const cmdAgent = yield* agents.get(cmd.agent)
+ if (cmdAgent?.model) return cmdAgent.model
+ }
+ if (input.model) return Provider.parseModel(input.model)
+ return yield* lastModel(input.sessionID)
+ })
+
+ yield* getModel(taskModel.providerID, taskModel.modelID, input.sessionID)
+
+ const agent = yield* agents.get(agentName)
+ if (!agent) {
+ const available = (yield* agents.list()).filter((a) => !a.hidden).map((a) => a.name)
+ const hint = available.length ? ` Available agents: ${available.join(", ")}` : ""
+ const error = new NamedError.Unknown({ message: `Agent not found: "${agentName}".${hint}` })
+ yield* bus.publish(Session.Event.Error, { sessionID: input.sessionID, error: error.toObject() })
+ throw error
+ }
+
+ const templateParts = yield* resolvePromptParts(template)
+ const isSubtask = (agent.mode === "subagent" && cmd.subtask !== false) || cmd.subtask === true
+ const parts = isSubtask
+ ? [
+ {
+ type: "subtask" as const,
+ agent: agent.name,
+ description: cmd.description ?? "",
+ command: input.command,
+ model: { providerID: taskModel.providerID, modelID: taskModel.modelID },
+ prompt: templateParts.find((y) => y.type === "text")?.text ?? "",
+ },
+ ]
+ : [...templateParts, ...(input.parts ?? [])]
+
+ const userAgent = isSubtask ? (input.agent ?? (yield* agents.defaultAgent())) : agentName
+ const userModel = isSubtask
+ ? input.model
+ ? Provider.parseModel(input.model)
+ : yield* lastModel(input.sessionID)
+ : taskModel
+
+ yield* plugin.trigger(
+ "command.execute.before",
+ { command: input.command, sessionID: input.sessionID, arguments: input.arguments },
+ { parts },
+ )
+
+ const result = yield* prompt({
+ sessionID: input.sessionID,
+ messageID: input.messageID,
+ model: userModel,
+ agent: userAgent,
+ parts,
+ variant: input.variant,
+ })
+ yield* bus.publish(Command.Event.Executed, {
+ name: input.command,
+ sessionID: input.sessionID,
+ arguments: input.arguments,
+ messageID: result.info.id,
+ })
+ return result
+ })
+
+ return Service.of({
+ assertNotBusy,
+ cancel,
+ prompt,
+ loop,
+ shell,
+ command,
+ resolvePromptParts,
})
- userMessage.parts.push(part)
- return input.messages
- }
- return input.messages
+ }),
+ )
+
+ const defaultLayer = Layer.unwrap(
+ Effect.sync(() =>
+ layer.pipe(
+ Layer.provide(SessionStatus.layer),
+ Layer.provide(SessionCompaction.defaultLayer),
+ Layer.provide(SessionProcessor.defaultLayer),
+ Layer.provide(Command.defaultLayer),
+ Layer.provide(Permission.layer),
+ Layer.provide(MCP.defaultLayer),
+ Layer.provide(LSP.defaultLayer),
+ Layer.provide(FileTime.layer),
+ Layer.provide(ToolRegistry.defaultLayer),
+ Layer.provide(Truncate.layer),
+ Layer.provide(AppFileSystem.defaultLayer),
+ Layer.provide(Plugin.defaultLayer),
+ Layer.provide(Session.defaultLayer),
+ Layer.provide(Agent.defaultLayer),
+ Layer.provide(Bus.layer),
+ ),
+ ),
+ )
+ const { runPromise } = makeRuntime(Service, defaultLayer)
+
+ export async function assertNotBusy(sessionID: SessionID) {
+ return runPromise((svc) => svc.assertNotBusy(SessionID.zod.parse(sessionID)))
}
- export const ShellInput = z.object({
+ export const PromptInput = z.object({
sessionID: SessionID.zod,
- agent: z.string(),
+ messageID: MessageID.zod.optional(),
model: z
.object({
providerID: ProviderID.zod,
modelID: ModelID.zod,
})
.optional(),
- command: z.string(),
- })
- export type ShellInput = z.infer<typeof ShellInput>
- export async function shell(input: ShellInput) {
- const abort = start(input.sessionID)
- if (!abort) {
- throw new Session.BusyError(input.sessionID)
- }
-
- using _ = defer(() => {
- // If no queued callbacks, cancel (the default)
- const callbacks = state()[input.sessionID]?.callbacks ?? []
- if (callbacks.length === 0) {
- cancel(input.sessionID)
- } else {
- // Otherwise, trigger the session loop to process queued items
- loop({ sessionID: input.sessionID, resume_existing: true }).catch((error) => {
- log.error("session loop failed to resume after shell command", { sessionID: input.sessionID, error })
+ agent: z.string().optional(),
+ noReply: z.boolean().optional(),
+ tools: z
+ .record(z.string(), z.boolean())
+ .optional()
+ .describe(
+ "@deprecated tools and permissions have been merged, you can set permissions on the session itself now",
+ ),
+ format: MessageV2.Format.optional(),
+ system: z.string().optional(),
+ variant: z.string().optional(),
+ parts: z.array(
+ z.discriminatedUnion("type", [
+ MessageV2.TextPart.omit({
+ messageID: true,
+ sessionID: true,
})
- }
- })
-
- const session = await Session.get(input.sessionID)
- if (session.revert) {
- await SessionRevert.cleanup(session)
- }
- const agent = await Agent.get(input.agent)
- if (!agent) {
- const available = await Agent.list().then((agents) => agents.filter((a) => !a.hidden).map((a) => a.name))
- const hint = available.length ? ` Available agents: ${available.join(", ")}` : ""
- const error = new NamedError.Unknown({ message: `Agent not found: "${input.agent}".${hint}` })
- Bus.publish(Session.Event.Error, {
- sessionID: input.sessionID,
- error: error.toObject(),
- })
- throw error
- }
- const model = input.model ?? agent.model ?? (await lastModel(input.sessionID))
- const userMsg: MessageV2.User = {
- id: MessageID.ascending(),
- sessionID: input.sessionID,
- time: {
- created: Date.now(),
- },
- role: "user",
- agent: input.agent,
- model: {
- providerID: model.providerID,
- modelID: model.modelID,
- },
- }
- await Session.updateMessage(userMsg)
- const userPart: MessageV2.Part = {
- type: "text",
- id: PartID.ascending(),
- messageID: userMsg.id,
- sessionID: input.sessionID,
- text: "The following tool was executed by the user",
- synthetic: true,
- }
- await Session.updatePart(userPart)
-
- const msg: MessageV2.Assistant = {
- id: MessageID.ascending(),
- sessionID: input.sessionID,
- parentID: userMsg.id,
- mode: input.agent,
- agent: input.agent,
- cost: 0,
- path: {
- cwd: Instance.directory,
- root: Instance.worktree,
- },
- time: {
- created: Date.now(),
- },
- role: "assistant",
- tokens: {
- input: 0,
- output: 0,
- reasoning: 0,
- cache: { read: 0, write: 0 },
- },
- modelID: model.modelID,
- providerID: model.providerID,
- }
- await Session.updateMessage(msg)
- const part: MessageV2.Part = {
- type: "tool",
- id: PartID.ascending(),
- messageID: msg.id,
- sessionID: input.sessionID,
- tool: "bash",
- callID: ulid(),
- state: {
- status: "running",
- time: {
- start: Date.now(),
- },
- input: {
- command: input.command,
- },
- },
- }
- await Session.updatePart(part)
- const shell = Shell.preferred()
- const shellName = Shell.name(shell)
-
- const invocations: Record<string, { args: string[] }> = {
- nu: {
- args: ["-c", input.command],
- },
- fish: {
- args: ["-c", input.command],
- },
- zsh: {
- args: [
- "-c",
- "-l",
- `
- [[ -f ~/.zshenv ]] && source ~/.zshenv >/dev/null 2>&1 || true
- [[ -f "\${ZDOTDIR:-$HOME}/.zshrc" ]] && source "\${ZDOTDIR:-$HOME}/.zshrc" >/dev/null 2>&1 || true
- eval ${JSON.stringify(input.command)}
- `,
- ],
- },
- bash: {
- args: [
- "-c",
- "-l",
- `
- shopt -s expand_aliases
- [[ -f ~/.bashrc ]] && source ~/.bashrc >/dev/null 2>&1 || true
- eval ${JSON.stringify(input.command)}
- `,
- ],
- },
- // Windows cmd
- cmd: {
- args: ["/c", input.command],
- },
- // Windows PowerShell
- powershell: {
- args: ["-NoProfile", "-Command", input.command],
- },
- pwsh: {
- args: ["-NoProfile", "-Command", input.command],
- },
- // Fallback: any shell that doesn't match those above
- // - No -l, for max compatibility
- "": {
- args: ["-c", `${input.command}`],
- },
- }
-
- const matchingInvocation = invocations[shellName] ?? invocations[""]
- const args = matchingInvocation?.args
-
- const cwd = Instance.directory
- const shellEnv = await Plugin.trigger(
- "shell.env",
- { cwd, sessionID: input.sessionID, callID: part.callID },
- { env: {} },
- )
- const proc = spawn(shell, args, {
- cwd,
- detached: process.platform !== "win32",
- windowsHide: process.platform === "win32",
- stdio: ["ignore", "pipe", "pipe"],
- env: {
- ...process.env,
- ...shellEnv.env,
- TERM: "dumb",
- },
- })
-
- let output = ""
-
- proc.stdout?.on("data", (chunk) => {
- output += chunk.toString()
- if (part.state.status === "running") {
- part.state.metadata = {
- output: output,
- description: "",
- }
- Session.updatePart(part)
- }
- })
-
- proc.stderr?.on("data", (chunk) => {
- output += chunk.toString()
- if (part.state.status === "running") {
- part.state.metadata = {
- output: output,
- description: "",
- }
- Session.updatePart(part)
- }
- })
+ .partial({
+ id: true,
+ })
+ .meta({
+ ref: "TextPartInput",
+ }),
+ MessageV2.FilePart.omit({
+ messageID: true,
+ sessionID: true,
+ })
+ .partial({
+ id: true,
+ })
+ .meta({
+ ref: "FilePartInput",
+ }),
+ MessageV2.AgentPart.omit({
+ messageID: true,
+ sessionID: true,
+ })
+ .partial({
+ id: true,
+ })
+ .meta({
+ ref: "AgentPartInput",
+ }),
+ MessageV2.SubtaskPart.omit({
+ messageID: true,
+ sessionID: true,
+ })
+ .partial({
+ id: true,
+ })
+ .meta({
+ ref: "SubtaskPartInput",
+ }),
+ ]),
+ ),
+ })
+ export type PromptInput = z.infer<typeof PromptInput>
- let aborted = false
- let exited = false
+ export async function prompt(input: PromptInput) {
+ return runPromise((svc) => svc.prompt(PromptInput.parse(input)))
+ }
- const kill = () => Shell.killTree(proc, { exited: () => exited })
+ export async function resolvePromptParts(template: string) {
+ return runPromise((svc) => svc.resolvePromptParts(z.string().parse(template)))
+ }
- if (abort.aborted) {
- aborted = true
- await kill()
- }
+ export async function cancel(sessionID: SessionID) {
+ return runPromise((svc) => svc.cancel(SessionID.zod.parse(sessionID)))
+ }
- const abortHandler = () => {
- aborted = true
- void kill()
- }
+ export const LoopInput = z.object({
+ sessionID: SessionID.zod,
+ })
- abort.addEventListener("abort", abortHandler, { once: true })
+ export async function loop(input: z.infer<typeof LoopInput>) {
+ return runPromise((svc) => svc.loop(LoopInput.parse(input)))
+ }
- await new Promise<void>((resolve) => {
- proc.on("close", () => {
- exited = true
- abort.removeEventListener("abort", abortHandler)
- resolve()
+ export const ShellInput = z.object({
+ sessionID: SessionID.zod,
+ agent: z.string(),
+ model: z
+ .object({
+ providerID: ProviderID.zod,
+ modelID: ModelID.zod,
})
- })
+ .optional(),
+ command: z.string(),
+ })
+ export type ShellInput = z.infer<typeof ShellInput>
- if (aborted) {
- output += "\n\n" + ["<metadata>", "User aborted the command", "</metadata>"].join("\n")
- }
- msg.time.completed = Date.now()
- await Session.updateMessage(msg)
- if (part.state.status === "running") {
- part.state = {
- status: "completed",
- time: {
- ...part.state.time,
- end: Date.now(),
- },
- input: part.state.input,
- title: "",
- metadata: {
- output,
- description: "",
- },
- output,
- }
- await Session.updatePart(part)
- }
- return { info: msg, parts: [part] }
+ export async function shell(input: ShellInput) {
+ return runPromise((svc) => svc.shell(ShellInput.parse(input)))
}
export const CommandInput = z.object({
@@ -1814,243 +1847,52 @@ NOTE: At any point in time through this workflow you should feel free to ask the
.optional(),
})
export type CommandInput = z.infer<typeof CommandInput>
- const bashRegex = /!`([^`]+)`/g
- // Match [Image N] as single token, quoted strings, or non-space sequences
- const argsRegex = /(?:\[Image\s+\d+\]|"[^"]*"|'[^']*'|[^\s"']+)/gi
- const placeholderRegex = /\$(\d+)/g
- const quoteTrimRegex = /^["']|["']$/g
- /**
- * Regular expression to match @ file references in text
- * Matches @ followed by file paths, excluding commas, periods at end of sentences, and backticks
- * Does not match when preceded by word characters or backticks (to avoid email addresses and quoted references)
- */
export async function command(input: CommandInput) {
- log.info("command", input)
- const command = await Command.get(input.command)
- if (!command) {
- const available = await Command.list().then((cmds) => cmds.map((c) => c.name))
- const hint = available.length ? ` Available commands: ${available.join(", ")}` : ""
- const error = new NamedError.Unknown({ message: `Command not found: "${input.command}".${hint}` })
- Bus.publish(Session.Event.Error, {
- sessionID: input.sessionID,
- error: error.toObject(),
- })
- throw error
- }
- const agentName = command.agent ?? input.agent ?? (await Agent.defaultAgent())
-
- const raw = input.arguments.match(argsRegex) ?? []
- const args = raw.map((arg) => arg.replace(quoteTrimRegex, ""))
-
- const templateCommand = await command.template
-
- const placeholders = templateCommand.match(placeholderRegex) ?? []
- let last = 0
- for (const item of placeholders) {
- const value = Number(item.slice(1))
- if (value > last) last = value
- }
-
- // Let the final placeholder swallow any extra arguments so prompts read naturally
- const withArgs = templateCommand.replaceAll(placeholderRegex, (_, index) => {
- const position = Number(index)
- const argIndex = position - 1
- if (argIndex >= args.length) return ""
- if (position === last) return args.slice(argIndex).join(" ")
- return args[argIndex]
- })
- const usesArgumentsPlaceholder = templateCommand.includes("$ARGUMENTS")
- let template = withArgs.replaceAll("$ARGUMENTS", input.arguments)
-
- // If command doesn't explicitly handle arguments (no $N or $ARGUMENTS placeholders)
- // but user provided arguments, append them to the template
- if (placeholders.length === 0 && !usesArgumentsPlaceholder && input.arguments.trim()) {
- template = template + "\n\n" + input.arguments
- }
-
- const shellMatches = ConfigMarkdown.shell(template)
- if (shellMatches.length > 0) {
- const sh = Shell.preferred()
- const results = await Promise.all(
- shellMatches.map(async ([, cmd]) => {
- const out = await Process.text([cmd], { shell: sh, nothrow: true })
- return out.text
- }),
- )
- let index = 0
- template = template.replace(bashRegex, () => results[index++])
- }
- template = template.trim()
-
- const taskModel = await (async () => {
- if (command.model) {
- return Provider.parseModel(command.model)
- }
- if (command.agent) {
- const cmdAgent = await Agent.get(command.agent)
- if (cmdAgent?.model) {
- return cmdAgent.model
- }
- }
- if (input.model) return Provider.parseModel(input.model)
- return await lastModel(input.sessionID)
- })()
-
- try {
- await Provider.getModel(taskModel.providerID, taskModel.modelID)
- } catch (e) {
- if (Provider.ModelNotFoundError.isInstance(e)) {
- const { providerID, modelID, suggestions } = e.data
- const hint = suggestions?.length ? ` Did you mean: ${suggestions.join(", ")}?` : ""
- Bus.publish(Session.Event.Error, {
- sessionID: input.sessionID,
- error: new NamedError.Unknown({ message: `Model not found: ${providerID}/${modelID}.${hint}` }).toObject(),
- })
- }
- throw e
- }
- const agent = await Agent.get(agentName)
- if (!agent) {
- const available = await Agent.list().then((agents) => agents.filter((a) => !a.hidden).map((a) => a.name))
- const hint = available.length ? ` Available agents: ${available.join(", ")}` : ""
- const error = new NamedError.Unknown({ message: `Agent not found: "${agentName}".${hint}` })
- Bus.publish(Session.Event.Error, {
- sessionID: input.sessionID,
- error: error.toObject(),
- })
- throw error
- }
+ return runPromise((svc) => svc.command(CommandInput.parse(input)))
+ }
- const templateParts = await resolvePromptParts(template)
- const isSubtask = (agent.mode === "subagent" && command.subtask !== false) || command.subtask === true
- const parts = isSubtask
- ? [
- {
- type: "subtask" as const,
- agent: agent.name,
- description: command.description ?? "",
- command: input.command,
- model: {
- providerID: taskModel.providerID,
- modelID: taskModel.modelID,
- },
- // TODO: how can we make task tool accept a more complex input?
- prompt: templateParts.find((y) => y.type === "text")?.text ?? "",
- },
- ]
- : [...templateParts, ...(input.parts ?? [])]
-
- const userAgent = isSubtask ? (input.agent ?? (await Agent.defaultAgent())) : agentName
- const userModel = isSubtask
- ? input.model
- ? Provider.parseModel(input.model)
- : await lastModel(input.sessionID)
- : taskModel
-
- await Plugin.trigger(
- "command.execute.before",
- {
- command: input.command,
- sessionID: input.sessionID,
- arguments: input.arguments,
- },
- { parts },
- )
-
- const result = (await prompt({
- sessionID: input.sessionID,
- messageID: input.messageID,
- model: userModel,
- agent: userAgent,
- parts,
- variant: input.variant,
- })) as MessageV2.WithParts
-
- Bus.publish(Command.Event.Executed, {
- name: input.command,
- sessionID: input.sessionID,
- arguments: input.arguments,
- messageID: result.info.id,
+ const lastModel = Effect.fnUntraced(function* (sessionID: SessionID) {
+ return yield* Effect.promise(async () => {
+ for await (const item of MessageV2.stream(sessionID)) {
+ if (item.info.role === "user" && item.info.model) return item.info.model
+ }
+ return Provider.defaultModel()
})
+ })
- return result
- }
+ /** @internal Exported for testing */
+ export function createStructuredOutputTool(input: {
+ schema: Record<string, any>
+ onSuccess: (output: unknown) => void
+ }): AITool {
+ // Remove $schema property if present (not needed for tool input)
+ const { $schema, ...toolSchema } = input.schema
- async function ensureTitle(input: {
- session: Session.Info
- history: MessageV2.WithParts[]
- providerID: ProviderID
- modelID: ModelID
- }) {
- if (input.session.parentID) return
- if (!Session.isDefaultTitle(input.session.title)) return
-
- // Find first non-synthetic user message
- const firstRealUserIdx = input.history.findIndex(
- (m) => m.info.role === "user" && !m.parts.every((p) => "synthetic" in p && p.synthetic),
- )
- if (firstRealUserIdx === -1) return
-
- const isFirst =
- input.history.filter((m) => m.info.role === "user" && !m.parts.every((p) => "synthetic" in p && p.synthetic))
- .length === 1
- if (!isFirst) return
-
- // Gather all messages up to and including the first real user message for context
- // This includes any shell/subtask executions that preceded the user's first prompt
- const contextMessages = input.history.slice(0, firstRealUserIdx + 1)
- const firstRealUser = contextMessages[firstRealUserIdx]
-
- // For subtask-only messages (from command invocations), extract the prompt directly
- // since toModelMessage converts subtask parts to generic "The following tool was executed by the user"
- const subtaskParts = firstRealUser.parts.filter((p) => p.type === "subtask") as MessageV2.SubtaskPart[]
- const hasOnlySubtaskParts = subtaskParts.length > 0 && firstRealUser.parts.every((p) => p.type === "subtask")
-
- const agent = await Agent.get("title")
- if (!agent) return
- const model = await iife(async () => {
- if (agent.model) return await Provider.getModel(agent.model.providerID, agent.model.modelID)
- return (
- (await Provider.getSmallModel(input.providerID)) ?? (await Provider.getModel(input.providerID, input.modelID))
- )
+ return tool({
+ id: "StructuredOutput" as any,
+ description: STRUCTURED_OUTPUT_DESCRIPTION,
+ inputSchema: jsonSchema(toolSchema as any),
+ async execute(args) {
+ // AI SDK validates args against inputSchema before calling execute()
+ input.onSuccess(args)
+ return {
+ output: "Structured output captured successfully.",
+ title: "Structured Output",
+ metadata: { valid: true },
+ }
+ },
+ toModelOutput({ output }) {
+ return {
+ type: "text",
+ value: output.output,
+ }
+ },
})
- try {
- const result = await LLM.stream({
- agent,
- user: firstRealUser.info as MessageV2.User,
- system: [],
- small: true,
- tools: {},
- model,
- abort: new AbortController().signal,
- sessionID: input.session.id,
- retries: 2,
- messages: [
- {
- role: "user",
- content: "Generate a title for this conversation:\n",
- },
- ...(hasOnlySubtaskParts
- ? [{ role: "user" as const, content: subtaskParts.map((p) => p.prompt).join("\n") }]
- : await MessageV2.toModelMessages(contextMessages, model)),
- ],
- })
- const text = await result.text
- const cleaned = text
- .replace(/<think>[\s\S]*?<\/think>\s*/g, "")
- .split("\n")
- .map((line) => line.trim())
- .find((line) => line.length > 0)
- if (!cleaned) return
-
- const title = cleaned.length > 100 ? cleaned.substring(0, 97) + "..." : cleaned
- return Session.setTitle({ sessionID: input.session.id, title }).catch((err) => {
- if (NotFoundError.isInstance(err)) return
- throw err
- })
- } catch (error) {
- log.error("failed to generate title", { error })
- }
}
+ const bashRegex = /!`([^`]+)`/g
+ // Match [Image N] as single token, quoted strings, or non-space sequences
+ const argsRegex = /(?:\[Image\s+\d+\]|"[^"]*"|'[^']*'|[^\s"']+)/gi
+ const placeholderRegex = /\$(\d+)/g
+ const quoteTrimRegex = /^["']|["']$/g
}
diff --git a/packages/opencode/src/session/revert.ts b/packages/opencode/src/session/revert.ts
index 6df8b3d53..a80ee4520 100644
--- a/packages/opencode/src/session/revert.ts
+++ b/packages/opencode/src/session/revert.ts
@@ -21,7 +21,7 @@ export namespace SessionRevert {
export type RevertInput = z.infer<typeof RevertInput>
export async function revert(input: RevertInput) {
- SessionPrompt.assertNotBusy(input.sessionID)
+ await SessionPrompt.assertNotBusy(input.sessionID)
const all = await Session.messages({ sessionID: input.sessionID })
let lastUser: MessageV2.User | undefined
const session = await Session.get(input.sessionID)
@@ -80,7 +80,7 @@ export namespace SessionRevert {
export async function unrevert(input: { sessionID: SessionID }) {
log.info("unreverting", input)
- SessionPrompt.assertNotBusy(input.sessionID)
+ await SessionPrompt.assertNotBusy(input.sessionID)
const session = await Session.get(input.sessionID)
if (!session.revert) return session
if (session.revert.snapshot) await Snapshot.restore(session.revert.snapshot)
diff --git a/packages/opencode/src/session/summary.ts b/packages/opencode/src/session/summary.ts
index 898b93f3f..c65cb9d0e 100644
--- a/packages/opencode/src/session/summary.ts
+++ b/packages/opencode/src/session/summary.ts
@@ -3,7 +3,6 @@ import z from "zod"
import { Session } from "."
import { MessageV2 } from "./message-v2"
-import { Identifier } from "@/id/id"
import { SessionID, MessageID } from "./schema"
import { Snapshot } from "@/snapshot"
@@ -110,8 +109,8 @@ export namespace SessionSummary {
(m) => m.info.id === input.messageID || (m.info.role === "assistant" && m.info.parentID === input.messageID),
)
const msgWithParts = messages.find((m) => m.info.id === input.messageID)
- if (!msgWithParts) return
- const userMsg = msgWithParts.info as MessageV2.User
+ if (!msgWithParts || msgWithParts.info.role !== "user") return
+ const userMsg = msgWithParts.info
const diffs = await computeDiff({ messages })
userMsg.summary = {
...userMsg.summary,
diff --git a/packages/opencode/src/tool/registry.ts b/packages/opencode/src/tool/registry.ts
index eeb733480..a8349e2c1 100644
--- a/packages/opencode/src/tool/registry.ts
+++ b/packages/opencode/src/tool/registry.ts
@@ -46,12 +46,12 @@ export namespace ToolRegistry {
readonly tools: (
model: { providerID: ProviderID; modelID: ModelID },
agent?: Agent.Info,
- ) => Effect.Effect<(Awaited<ReturnType<Tool.Info["init"]>> & { id: string })[]>
+ ) => Effect.Effect<(Tool.Def & { id: string })[]>
}
export class Service extends ServiceMap.Service<Service, Interface>()("@opencode/ToolRegistry") {}
- export const layer = Layer.effect(
+ export const layer: Layer.Layer<Service, never, Config.Service | Plugin.Service> = Layer.effect(
Service,
Effect.gen(function* () {
const config = yield* Config.Service
@@ -174,7 +174,7 @@ export namespace ToolRegistry {
})
return yield* Effect.forEach(
filtered,
- Effect.fnUntraced(function* (tool) {
+ Effect.fnUntraced(function* (tool: Tool.Info) {
using _ = log.time(tool.id)
const next = yield* Effect.promise(() => tool.init({ agent }))
const output = {
@@ -184,10 +184,11 @@ export namespace ToolRegistry {
yield* plugin.trigger("tool.definition", { toolID: tool.id }, output)
return {
id: tool.id,
- ...next,
description: output.description,
parameters: output.parameters,
- } as Awaited<ReturnType<Tool.Info["init"]>> & { id: string }
+ execute: next.execute,
+ formatValidationError: next.formatValidationError,
+ }
}),
{ concurrency: "unbounded" },
)
@@ -217,7 +218,7 @@ export namespace ToolRegistry {
modelID: ModelID
},
agent?: Agent.Info,
- ): Promise<(Awaited<ReturnType<Tool.Info["init"]>> & { id: string })[]> {
+ ): Promise<(Tool.Def & { id: string })[]> {
return runPromise((svc) => svc.tools(model, agent))
}
}
diff --git a/packages/opencode/src/tool/tool.ts b/packages/opencode/src/tool/tool.ts
index 6c3f4efaf..98fa50f8c 100644
--- a/packages/opencode/src/tool/tool.ts
+++ b/packages/opencode/src/tool/tool.ts
@@ -25,22 +25,24 @@ export namespace Tool {
metadata(input: { title?: string; metadata?: M }): void
ask(input: Omit<Permission.Request, "id" | "sessionID" | "tool">): Promise<void>
}
+ export interface Def<Parameters extends z.ZodType = z.ZodType, M extends Metadata = Metadata> {
+ description: string
+ parameters: Parameters
+ execute(
+ args: z.infer<Parameters>,
+ ctx: Context,
+ ): Promise<{
+ title: string
+ metadata: M
+ output: string
+ attachments?: Omit<MessageV2.FilePart, "id" | "sessionID" | "messageID">[]
+ }>
+ formatValidationError?(error: z.ZodError): string
+ }
+
export interface Info<Parameters extends z.ZodType = z.ZodType, M extends Metadata = Metadata> {
id: string
- init: (ctx?: InitContext) => Promise<{
- description: string
- parameters: Parameters
- execute(
- args: z.infer<Parameters>,
- ctx: Context,
- ): Promise<{
- title: string
- metadata: M
- output: string
- attachments?: Omit<MessageV2.FilePart, "id" | "sessionID" | "messageID">[]
- }>
- formatValidationError?(error: z.ZodError): string
- }>
+ init: (ctx?: InitContext) => Promise<Def<Parameters, M>>
}
export type InferParameters<T extends Info> = T extends Info<infer P> ? z.infer<P> : never
@@ -48,7 +50,7 @@ export namespace Tool {
export function define<Parameters extends z.ZodType, Result extends Metadata>(
id: string,
- init: Info<Parameters, Result>["init"] | Awaited<ReturnType<Info<Parameters, Result>["init"]>>,
+ init: Info<Parameters, Result>["init"] | Def<Parameters, Result>,
): Info<Parameters, Result> {
return {
id,
diff --git a/packages/opencode/test/effect/runner.test.ts b/packages/opencode/test/effect/runner.test.ts
new file mode 100644
index 000000000..5d3488849
--- /dev/null
+++ b/packages/opencode/test/effect/runner.test.ts
@@ -0,0 +1,523 @@
+import { describe, expect, test } from "bun:test"
+import { Deferred, Effect, Exit, Fiber, Ref, Scope } from "effect"
+import { Runner } from "../../src/effect/runner"
+import { it } from "../lib/effect"
+
+describe("Runner", () => {
+ // --- ensureRunning semantics ---
+
+ it.effect(
+ "ensureRunning starts work and returns result",
+ Effect.gen(function* () {
+ const s = yield* Scope.Scope
+ const runner = Runner.make<string>(s)
+ const result = yield* runner.ensureRunning(Effect.succeed("hello"))
+ expect(result).toBe("hello")
+ expect(runner.state._tag).toBe("Idle")
+ expect(runner.busy).toBe(false)
+ }),
+ )
+
+ it.effect(
+ "ensureRunning propagates work failures",
+ Effect.gen(function* () {
+ const s = yield* Scope.Scope
+ const runner = Runner.make<string, string>(s)
+ const exit = yield* runner.ensureRunning(Effect.fail("boom")).pipe(Effect.exit)
+ expect(Exit.isFailure(exit)).toBe(true)
+ expect(runner.state._tag).toBe("Idle")
+ }),
+ )
+
+ it.effect(
+ "concurrent callers share the same run",
+ Effect.gen(function* () {
+ const s = yield* Scope.Scope
+ const runner = Runner.make<string>(s)
+ const calls = yield* Ref.make(0)
+ const work = Effect.gen(function* () {
+ yield* Ref.update(calls, (n) => n + 1)
+ yield* Effect.sleep("10 millis")
+ return "shared"
+ })
+
+ const [a, b] = yield* Effect.all([runner.ensureRunning(work), runner.ensureRunning(work)], {
+ concurrency: "unbounded",
+ })
+
+ expect(a).toBe("shared")
+ expect(b).toBe("shared")
+ expect(yield* Ref.get(calls)).toBe(1)
+ }),
+ )
+
+ it.effect(
+ "concurrent callers all receive same error",
+ Effect.gen(function* () {
+ const s = yield* Scope.Scope
+ const runner = Runner.make<string, string>(s)
+ const work = Effect.gen(function* () {
+ yield* Effect.sleep("10 millis")
+ return yield* Effect.fail("boom")
+ })
+
+ const [a, b] = yield* Effect.all(
+ [runner.ensureRunning(work).pipe(Effect.exit), runner.ensureRunning(work).pipe(Effect.exit)],
+ { concurrency: "unbounded" },
+ )
+
+ expect(Exit.isFailure(a)).toBe(true)
+ expect(Exit.isFailure(b)).toBe(true)
+ }),
+ )
+
+ it.effect(
+ "ensureRunning can be called again after previous run completes",
+ Effect.gen(function* () {
+ const s = yield* Scope.Scope
+ const runner = Runner.make<string>(s)
+ expect(yield* runner.ensureRunning(Effect.succeed("first"))).toBe("first")
+ expect(yield* runner.ensureRunning(Effect.succeed("second"))).toBe("second")
+ }),
+ )
+
+ it.effect(
+ "second ensureRunning ignores new work if already running",
+ Effect.gen(function* () {
+ const s = yield* Scope.Scope
+ const runner = Runner.make<string>(s)
+ const ran = yield* Ref.make<string[]>([])
+
+ const first = Effect.gen(function* () {
+ yield* Ref.update(ran, (a) => [...a, "first"])
+ yield* Effect.sleep("50 millis")
+ return "first-result"
+ })
+ const second = Effect.gen(function* () {
+ yield* Ref.update(ran, (a) => [...a, "second"])
+ return "second-result"
+ })
+
+ const [a, b] = yield* Effect.all([runner.ensureRunning(first), runner.ensureRunning(second)], {
+ concurrency: "unbounded",
+ })
+
+ expect(a).toBe("first-result")
+ expect(b).toBe("first-result")
+ expect(yield* Ref.get(ran)).toEqual(["first"])
+ }),
+ )
+
+ // --- cancel semantics ---
+
+ it.effect(
+ "cancel interrupts running work",
+ Effect.gen(function* () {
+ const s = yield* Scope.Scope
+ const runner = Runner.make<string>(s)
+ const fiber = yield* runner.ensureRunning(Effect.never.pipe(Effect.as("never"))).pipe(Effect.forkChild)
+ yield* Effect.sleep("10 millis")
+ expect(runner.busy).toBe(true)
+ expect(runner.state._tag).toBe("Running")
+
+ yield* runner.cancel
+ expect(runner.busy).toBe(false)
+
+ const exit = yield* Fiber.await(fiber)
+ expect(Exit.isFailure(exit)).toBe(true)
+ }),
+ )
+
+ it.effect(
+ "cancel on idle is a no-op",
+ Effect.gen(function* () {
+ const s = yield* Scope.Scope
+ const runner = Runner.make<string>(s)
+ yield* runner.cancel
+ expect(runner.busy).toBe(false)
+ }),
+ )
+
+ it.effect(
+ "cancel with onInterrupt resolves callers gracefully",
+ Effect.gen(function* () {
+ const s = yield* Scope.Scope
+ const runner = Runner.make<string>(s, { onInterrupt: Effect.succeed("fallback") })
+ const fiber = yield* runner.ensureRunning(Effect.never.pipe(Effect.as("never"))).pipe(Effect.forkChild)
+ yield* Effect.sleep("10 millis")
+
+ yield* runner.cancel
+
+ const exit = yield* Fiber.await(fiber)
+ expect(Exit.isSuccess(exit)).toBe(true)
+ if (Exit.isSuccess(exit)) expect(exit.value).toBe("fallback")
+ }),
+ )
+
+ it.effect(
+ "cancel with queued callers resolves all",
+ Effect.gen(function* () {
+ const s = yield* Scope.Scope
+ const runner = Runner.make<string>(s, { onInterrupt: Effect.succeed("fallback") })
+
+ const a = yield* runner.ensureRunning(Effect.never.pipe(Effect.as("x"))).pipe(Effect.forkChild)
+ yield* Effect.sleep("10 millis")
+ const b = yield* runner.ensureRunning(Effect.succeed("y")).pipe(Effect.forkChild)
+ yield* Effect.sleep("10 millis")
+
+ yield* runner.cancel
+
+ const [exitA, exitB] = yield* Effect.all([Fiber.await(a), Fiber.await(b)])
+ expect(Exit.isSuccess(exitA)).toBe(true)
+ expect(Exit.isSuccess(exitB)).toBe(true)
+ if (Exit.isSuccess(exitA)) expect(exitA.value).toBe("fallback")
+ if (Exit.isSuccess(exitB)) expect(exitB.value).toBe("fallback")
+ }),
+ )
+
+ it.effect(
+ "work can be started after cancel",
+ Effect.gen(function* () {
+ const s = yield* Scope.Scope
+ const runner = Runner.make<string>(s)
+ const fiber = yield* runner.ensureRunning(Effect.never.pipe(Effect.as("x"))).pipe(Effect.forkChild)
+ yield* Effect.sleep("10 millis")
+ yield* runner.cancel
+ yield* Fiber.await(fiber)
+
+ const result = yield* runner.ensureRunning(Effect.succeed("after-cancel"))
+ expect(result).toBe("after-cancel")
+ }),
+ )
+
+ test("cancel does not deadlock when replacement work starts before interrupted run exits", async () => {
+ function defer() {
+ let resolve!: () => void
+ const promise = new Promise<void>((done) => {
+ resolve = done
+ })
+ return { promise, resolve }
+ }
+
+ function fail(ms: number, msg: string) {
+ return new Promise<never>((_, reject) => {
+ setTimeout(() => reject(new Error(msg)), ms)
+ })
+ }
+
+ const s = await Effect.runPromise(Scope.make())
+ const hit = defer()
+ const hold = defer()
+ const done = defer()
+ try {
+ const runner = Runner.make<string>(s)
+ const first = Effect.never.pipe(
+ Effect.onInterrupt(() => Effect.sync(() => hit.resolve())),
+ Effect.ensuring(Effect.promise(() => hold.promise)),
+ Effect.as("first"),
+ )
+
+ const a = Effect.runPromiseExit(runner.ensureRunning(first))
+ await Bun.sleep(10)
+
+ const stop = Effect.runPromise(runner.cancel)
+ await Promise.race([hit.promise, fail(250, "cancel did not interrupt running work")])
+
+ const b = Effect.runPromise(runner.ensureRunning(Effect.promise(() => done.promise).pipe(Effect.as("second"))))
+ expect(runner.busy).toBe(true)
+
+ hold.resolve()
+ await Promise.race([stop, fail(250, "cancel deadlocked while replacement run was active")])
+
+ expect(runner.busy).toBe(true)
+ done.resolve()
+ expect(await b).toBe("second")
+ expect(runner.busy).toBe(false)
+
+ const exit = await a
+ expect(Exit.isFailure(exit)).toBe(true)
+ } finally {
+ hold.resolve()
+ done.resolve()
+ await Promise.race([Effect.runPromise(Scope.close(s, Exit.void)), fail(1000, "runner scope did not close")])
+ }
+ })
+
+ // --- shell semantics ---
+
+ it.effect(
+ "shell runs exclusively",
+ Effect.gen(function* () {
+ const s = yield* Scope.Scope
+ const runner = Runner.make<string>(s)
+ const result = yield* runner.startShell((_signal) => Effect.succeed("shell-done"))
+ expect(result).toBe("shell-done")
+ expect(runner.busy).toBe(false)
+ }),
+ )
+
+ it.effect(
+ "shell rejects when run is active",
+ Effect.gen(function* () {
+ const s = yield* Scope.Scope
+ const runner = Runner.make<string>(s)
+ const fiber = yield* runner.ensureRunning(Effect.never.pipe(Effect.as("x"))).pipe(Effect.forkChild)
+ yield* Effect.sleep("10 millis")
+
+ const exit = yield* runner.startShell((_s) => Effect.succeed("nope")).pipe(Effect.exit)
+ expect(Exit.isFailure(exit)).toBe(true)
+
+ yield* runner.cancel
+ yield* Fiber.await(fiber)
+ }),
+ )
+
+ it.effect(
+ "shell rejects when another shell is running",
+ Effect.gen(function* () {
+ const s = yield* Scope.Scope
+ const runner = Runner.make<string>(s)
+ const gate = yield* Deferred.make<void>()
+
+ const sh = yield* runner
+ .startShell((_signal) => Deferred.await(gate).pipe(Effect.as("first")))
+ .pipe(Effect.forkChild)
+ yield* Effect.sleep("10 millis")
+
+ const exit = yield* runner.startShell((_s) => Effect.succeed("second")).pipe(Effect.exit)
+ expect(Exit.isFailure(exit)).toBe(true)
+
+ yield* Deferred.succeed(gate, undefined)
+ yield* Fiber.await(sh)
+ }),
+ )
+
+ it.effect(
+ "shell rejects via busy callback and cancel still stops the first shell",
+ Effect.gen(function* () {
+ const s = yield* Scope.Scope
+ const runner = Runner.make<string>(s, {
+ busy: () => {
+ throw new Error("busy")
+ },
+ })
+
+ const sh = yield* runner
+ .startShell((signal) =>
+ Effect.promise(
+ () =>
+ new Promise<string>((resolve) => {
+ signal.addEventListener("abort", () => resolve("aborted"), { once: true })
+ }),
+ ),
+ )
+ .pipe(Effect.forkChild)
+ yield* Effect.sleep("10 millis")
+
+ const exit = yield* runner.startShell((_s) => Effect.succeed("second")).pipe(Effect.exit)
+ expect(Exit.isFailure(exit)).toBe(true)
+
+ yield* runner.cancel
+ const done = yield* Fiber.await(sh)
+ expect(Exit.isSuccess(done)).toBe(true)
+ }),
+ )
+
+ it.effect(
+ "cancel interrupts shell that ignores abort signal",
+ Effect.gen(function* () {
+ const s = yield* Scope.Scope
+ const runner = Runner.make<string>(s)
+ const gate = yield* Deferred.make<void>()
+
+ const sh = yield* runner
+ .startShell((_signal) => Deferred.await(gate).pipe(Effect.as("ignored")))
+ .pipe(Effect.forkChild)
+ yield* Effect.sleep("10 millis")
+
+ const stop = yield* runner.cancel.pipe(Effect.forkChild)
+ const stopExit = yield* Fiber.await(stop).pipe(Effect.timeout("250 millis"))
+ expect(Exit.isSuccess(stopExit)).toBe(true)
+ expect(runner.busy).toBe(false)
+
+ const shellExit = yield* Fiber.await(sh)
+ expect(Exit.isFailure(shellExit)).toBe(true)
+
+ yield* Deferred.succeed(gate, undefined).pipe(Effect.ignore)
+ }),
+ )
+
+ // --- shell→run handoff ---
+
+ it.effect(
+ "ensureRunning queues behind shell then runs after",
+ Effect.gen(function* () {
+ const s = yield* Scope.Scope
+ const runner = Runner.make<string>(s)
+ const gate = yield* Deferred.make<void>()
+
+ const sh = yield* runner
+ .startShell((_signal) => Deferred.await(gate).pipe(Effect.as("shell-result")))
+ .pipe(Effect.forkChild)
+ yield* Effect.sleep("10 millis")
+ expect(runner.state._tag).toBe("Shell")
+
+ const run = yield* runner.ensureRunning(Effect.succeed("run-result")).pipe(Effect.forkChild)
+ yield* Effect.sleep("10 millis")
+ expect(runner.state._tag).toBe("ShellThenRun")
+
+ yield* Deferred.succeed(gate, undefined)
+ yield* Fiber.await(sh)
+
+ const exit = yield* Fiber.await(run)
+ expect(Exit.isSuccess(exit)).toBe(true)
+ if (Exit.isSuccess(exit)) expect(exit.value).toBe("run-result")
+ expect(runner.state._tag).toBe("Idle")
+ }),
+ )
+
+ it.effect(
+ "multiple ensureRunning callers share the queued run behind shell",
+ Effect.gen(function* () {
+ const s = yield* Scope.Scope
+ const runner = Runner.make<string>(s)
+ const calls = yield* Ref.make(0)
+ const gate = yield* Deferred.make<void>()
+
+ const sh = yield* runner
+ .startShell((_signal) => Deferred.await(gate).pipe(Effect.as("shell")))
+ .pipe(Effect.forkChild)
+ yield* Effect.sleep("10 millis")
+
+ const work = Effect.gen(function* () {
+ yield* Ref.update(calls, (n) => n + 1)
+ return "run"
+ })
+ const a = yield* runner.ensureRunning(work).pipe(Effect.forkChild)
+ const b = yield* runner.ensureRunning(work).pipe(Effect.forkChild)
+ yield* Effect.sleep("10 millis")
+
+ yield* Deferred.succeed(gate, undefined)
+ yield* Fiber.await(sh)
+
+ const [exitA, exitB] = yield* Effect.all([Fiber.await(a), Fiber.await(b)])
+ expect(Exit.isSuccess(exitA)).toBe(true)
+ expect(Exit.isSuccess(exitB)).toBe(true)
+ expect(yield* Ref.get(calls)).toBe(1)
+ }),
+ )
+
+ it.effect(
+ "cancel during shell_then_run cancels both",
+ Effect.gen(function* () {
+ const s = yield* Scope.Scope
+ const runner = Runner.make<string>(s)
+ const gate = yield* Deferred.make<void>()
+
+ const sh = yield* runner
+ .startShell((signal) =>
+ Effect.promise(
+ () =>
+ new Promise<string>((resolve) => {
+ signal.addEventListener("abort", () => resolve("aborted"), { once: true })
+ }),
+ ),
+ )
+ .pipe(Effect.forkChild)
+ yield* Effect.sleep("10 millis")
+
+ const run = yield* runner.ensureRunning(Effect.succeed("y")).pipe(Effect.forkChild)
+ yield* Effect.sleep("10 millis")
+ expect(runner.state._tag).toBe("ShellThenRun")
+
+ yield* runner.cancel
+ expect(runner.busy).toBe(false)
+
+ yield* Fiber.await(sh)
+ const exit = yield* Fiber.await(run)
+ expect(Exit.isFailure(exit)).toBe(true)
+ }),
+ )
+
+ // --- lifecycle callbacks ---
+
+ it.effect(
+ "onIdle fires when returning to idle from running",
+ Effect.gen(function* () {
+ const s = yield* Scope.Scope
+ const count = yield* Ref.make(0)
+ const runner = Runner.make<string>(s, {
+ onIdle: Ref.update(count, (n) => n + 1),
+ })
+ yield* runner.ensureRunning(Effect.succeed("ok"))
+ expect(yield* Ref.get(count)).toBe(1)
+ }),
+ )
+
+ it.effect(
+ "onIdle fires on cancel",
+ Effect.gen(function* () {
+ const s = yield* Scope.Scope
+ const count = yield* Ref.make(0)
+ const runner = Runner.make<string>(s, {
+ onIdle: Ref.update(count, (n) => n + 1),
+ })
+ const fiber = yield* runner.ensureRunning(Effect.never.pipe(Effect.as("x"))).pipe(Effect.forkChild)
+ yield* Effect.sleep("10 millis")
+ yield* runner.cancel
+ yield* Fiber.await(fiber)
+ expect(yield* Ref.get(count)).toBeGreaterThanOrEqual(1)
+ }),
+ )
+
+ it.effect(
+ "onBusy fires when shell starts",
+ Effect.gen(function* () {
+ const s = yield* Scope.Scope
+ const count = yield* Ref.make(0)
+ const runner = Runner.make<string>(s, {
+ onBusy: Ref.update(count, (n) => n + 1),
+ })
+ yield* runner.startShell((_signal) => Effect.succeed("done"))
+ expect(yield* Ref.get(count)).toBe(1)
+ }),
+ )
+
+ // --- busy flag ---
+
+ it.effect(
+ "busy is true during run",
+ Effect.gen(function* () {
+ const s = yield* Scope.Scope
+ const runner = Runner.make<string>(s)
+ const gate = yield* Deferred.make<void>()
+
+ const fiber = yield* runner.ensureRunning(Deferred.await(gate).pipe(Effect.as("ok"))).pipe(Effect.forkChild)
+ yield* Effect.sleep("10 millis")
+ expect(runner.busy).toBe(true)
+
+ yield* Deferred.succeed(gate, undefined)
+ yield* Fiber.await(fiber)
+ expect(runner.busy).toBe(false)
+ }),
+ )
+
+ it.effect(
+ "busy is true during shell",
+ Effect.gen(function* () {
+ const s = yield* Scope.Scope
+ const runner = Runner.make<string>(s)
+ const gate = yield* Deferred.make<void>()
+
+ const fiber = yield* runner
+ .startShell((_signal) => Deferred.await(gate).pipe(Effect.as("ok")))
+ .pipe(Effect.forkChild)
+ yield* Effect.sleep("10 millis")
+ expect(runner.busy).toBe(true)
+
+ yield* Deferred.succeed(gate, undefined)
+ yield* Fiber.await(fiber)
+ expect(runner.busy).toBe(false)
+ }),
+ )
+})
diff --git a/packages/opencode/test/server/session-list.test.ts b/packages/opencode/test/server/session-list.test.ts
index 675a89011..933b5b5b5 100644
--- a/packages/opencode/test/server/session-list.test.ts
+++ b/packages/opencode/test/server/session-list.test.ts
@@ -1,26 +1,30 @@
-import { describe, expect, test } from "bun:test"
-import path from "path"
+import { afterEach, describe, expect, test } from "bun:test"
import { Instance } from "../../src/project/instance"
import { Session } from "../../src/session"
import { Log } from "../../src/util/log"
+import { tmpdir } from "../fixture/fixture"
-const projectRoot = path.join(__dirname, "../..")
Log.init({ print: false })
+afterEach(async () => {
+ await Instance.disposeAll()
+})
+
describe("Session.list", () => {
test("filters by directory", async () => {
+ await using tmp = await tmpdir({ git: true })
await Instance.provide({
- directory: projectRoot,
+ directory: tmp.path,
fn: async () => {
const first = await Session.create({})
- const otherDir = path.join(projectRoot, "..", "__session_list_other")
+ await using other = await tmpdir({ git: true })
const second = await Instance.provide({
- directory: otherDir,
+ directory: other.path,
fn: async () => Session.create({}),
})
- const sessions = [...Session.list({ directory: projectRoot })]
+ const sessions = [...Session.list({ directory: tmp.path })]
const ids = sessions.map((s) => s.id)
expect(ids).toContain(first.id)
@@ -30,8 +34,9 @@ describe("Session.list", () => {
})
test("filters root sessions", async () => {
+ await using tmp = await tmpdir({ git: true })
await Instance.provide({
- directory: projectRoot,
+ directory: tmp.path,
fn: async () => {
const root = await Session.create({ title: "root-session" })
const child = await Session.create({ title: "child-session", parentID: root.id })
@@ -46,8 +51,9 @@ describe("Session.list", () => {
})
test("filters by start time", async () => {
+ await using tmp = await tmpdir({ git: true })
await Instance.provide({
- directory: projectRoot,
+ directory: tmp.path,
fn: async () => {
const session = await Session.create({ title: "new-session" })
const futureStart = Date.now() + 86400000
@@ -59,8 +65,9 @@ describe("Session.list", () => {
})
test("filters by search term", async () => {
+ await using tmp = await tmpdir({ git: true })
await Instance.provide({
- directory: projectRoot,
+ directory: tmp.path,
fn: async () => {
await Session.create({ title: "unique-search-term-abc" })
await Session.create({ title: "other-session-xyz" })
@@ -75,8 +82,9 @@ describe("Session.list", () => {
})
test("respects limit parameter", async () => {
+ await using tmp = await tmpdir({ git: true })
await Instance.provide({
- directory: projectRoot,
+ directory: tmp.path,
fn: async () => {
await Session.create({ title: "session-1" })
await Session.create({ title: "session-2" })
diff --git a/packages/opencode/test/server/session-messages.test.ts b/packages/opencode/test/server/session-messages.test.ts
index 91e0fd926..d7e44cbec 100644
--- a/packages/opencode/test/server/session-messages.test.ts
+++ b/packages/opencode/test/server/session-messages.test.ts
@@ -1,15 +1,18 @@
-import { describe, expect, test } from "bun:test"
-import path from "path"
+import { afterEach, describe, expect, test } from "bun:test"
import { Instance } from "../../src/project/instance"
import { Server } from "../../src/server/server"
import { Session } from "../../src/session"
import { MessageV2 } from "../../src/session/message-v2"
import { MessageID, PartID, type SessionID } from "../../src/session/schema"
import { Log } from "../../src/util/log"
+import { tmpdir } from "../fixture/fixture"
-const root = path.join(__dirname, "../..")
Log.init({ print: false })
+afterEach(async () => {
+ await Instance.disposeAll()
+})
+
async function fill(sessionID: SessionID, count: number, time = (i: number) => Date.now() + i) {
const ids = [] as MessageID[]
for (let i = 0; i < count; i++) {
@@ -38,8 +41,9 @@ async function fill(sessionID: SessionID, count: number, time = (i: number) => D
describe("session messages endpoint", () => {
test("returns cursor headers for older pages", async () => {
+ await using tmp = await tmpdir({ git: true })
await Instance.provide({
- directory: root,
+ directory: tmp.path,
fn: async () => {
const session = await Session.create({})
const ids = await fill(session.id, 5)
@@ -64,8 +68,9 @@ describe("session messages endpoint", () => {
})
test("keeps full-history responses when limit is omitted", async () => {
+ await using tmp = await tmpdir({ git: true })
await Instance.provide({
- directory: root,
+ directory: tmp.path,
fn: async () => {
const session = await Session.create({})
const ids = await fill(session.id, 3)
@@ -82,8 +87,9 @@ describe("session messages endpoint", () => {
})
test("rejects invalid cursors and missing sessions", async () => {
+ await using tmp = await tmpdir({ git: true })
await Instance.provide({
- directory: root,
+ directory: tmp.path,
fn: async () => {
const session = await Session.create({})
const app = Server.Default()
@@ -100,8 +106,9 @@ describe("session messages endpoint", () => {
})
test("does not truncate large legacy limit requests", async () => {
+ await using tmp = await tmpdir({ git: true })
await Instance.provide({
- directory: root,
+ directory: tmp.path,
fn: async () => {
const session = await Session.create({})
await fill(session.id, 520)
@@ -120,7 +127,7 @@ describe("session messages endpoint", () => {
describe("session.prompt_async error handling", () => {
test("prompt_async route has error handler for detached prompt call", async () => {
- const src = await Bun.file(path.join(import.meta.dir, "../../src/server/routes/session.ts")).text()
+ const src = await Bun.file(new URL("../../src/server/routes/session.ts", import.meta.url)).text()
const start = src.indexOf('"/:sessionID/prompt_async"')
const end = src.indexOf('"/:sessionID/command"', start)
expect(start).toBeGreaterThan(-1)
diff --git a/packages/opencode/test/server/session-select.test.ts b/packages/opencode/test/server/session-select.test.ts
index a336f8133..345b43146 100644
--- a/packages/opencode/test/server/session-select.test.ts
+++ b/packages/opencode/test/server/session-select.test.ts
@@ -1,17 +1,21 @@
-import { describe, expect, test } from "bun:test"
-import path from "path"
+import { afterEach, describe, expect, test } from "bun:test"
import { Session } from "../../src/session"
import { Log } from "../../src/util/log"
import { Instance } from "../../src/project/instance"
import { Server } from "../../src/server/server"
+import { tmpdir } from "../fixture/fixture"
-const projectRoot = path.join(__dirname, "../..")
Log.init({ print: false })
+afterEach(async () => {
+ await Instance.disposeAll()
+})
+
describe("tui.selectSession endpoint", () => {
test("should return 200 when called with valid session", async () => {
+ await using tmp = await tmpdir({ git: true })
await Instance.provide({
- directory: projectRoot,
+ directory: tmp.path,
fn: async () => {
// #given
const session = await Session.create({})
@@ -35,8 +39,9 @@ describe("tui.selectSession endpoint", () => {
})
test("should return 404 when session does not exist", async () => {
+ await using tmp = await tmpdir({ git: true })
await Instance.provide({
- directory: projectRoot,
+ directory: tmp.path,
fn: async () => {
// #given
const nonExistentSessionID = "ses_nonexistent123"
@@ -56,8 +61,9 @@ describe("tui.selectSession endpoint", () => {
})
test("should return 400 when session ID format is invalid", async () => {
+ await using tmp = await tmpdir({ git: true })
await Instance.provide({
- directory: projectRoot,
+ directory: tmp.path,
fn: async () => {
// #given
const invalidSessionID = "invalid_session_id"
diff --git a/packages/opencode/test/session/compaction.test.ts b/packages/opencode/test/session/compaction.test.ts
index 8f29b7788..c08fef563 100644
--- a/packages/opencode/test/session/compaction.test.ts
+++ b/packages/opencode/test/session/compaction.test.ts
@@ -129,7 +129,7 @@ async function tool(sessionID: SessionID, messageID: MessageID, tool: string, ou
}
function fake(
- input: Parameters<(typeof SessionProcessorModule.SessionProcessor)["create"]>[0],
+ input: Parameters<SessionProcessorModule.SessionProcessor.Interface["create"]>[0],
result: "continue" | "compact",
) {
const msg = input.assistantMessage
@@ -540,7 +540,6 @@ describe("session.compaction.process", () => {
parentID: msg.id,
messages: msgs,
sessionID: session.id,
- abort: new AbortController().signal,
auto: false,
}),
),
@@ -580,7 +579,6 @@ describe("session.compaction.process", () => {
parentID: msg.id,
messages: msgs,
sessionID: session.id,
- abort: new AbortController().signal,
auto: false,
}),
),
@@ -621,7 +619,6 @@ describe("session.compaction.process", () => {
parentID: msg.id,
messages: msgs,
sessionID: session.id,
- abort: new AbortController().signal,
auto: true,
}),
),
@@ -675,7 +672,6 @@ describe("session.compaction.process", () => {
parentID: msg.id,
messages: msgs,
sessionID: session.id,
- abort: new AbortController().signal,
auto: true,
overflow: true,
}),
@@ -717,7 +713,6 @@ describe("session.compaction.process", () => {
parentID: msg.id,
messages: msgs,
sessionID: session.id,
- abort: new AbortController().signal,
auto: true,
overflow: true,
}),
@@ -792,7 +787,6 @@ describe("session.compaction.process", () => {
parentID: msg.id,
messages: msgs,
sessionID: session.id,
- abort: abort.signal,
auto: false,
}),
),
@@ -858,7 +852,6 @@ describe("session.compaction.process", () => {
parentID: msg.id,
messages: msgs,
sessionID: session.id,
- abort: abort.signal,
auto: false,
}),
),
@@ -892,6 +885,91 @@ describe("session.compaction.process", () => {
},
})
})
+
+ test("does not allow tool calls while generating the summary", async () => {
+ const stub = llm()
+ stub.push(
+ Stream.make(
+ { type: "start" } satisfies LLM.Event,
+ { type: "tool-input-start", id: "call-1", toolName: "_noop" } satisfies LLM.Event,
+ { type: "tool-call", toolCallId: "call-1", toolName: "_noop", input: {} } satisfies LLM.Event,
+ {
+ type: "finish-step",
+ finishReason: "tool-calls",
+ rawFinishReason: "tool_calls",
+ response: { id: "res", modelId: "test-model", timestamp: new Date() },
+ providerMetadata: undefined,
+ usage: {
+ inputTokens: 1,
+ outputTokens: 1,
+ totalTokens: 2,
+ inputTokenDetails: {
+ noCacheTokens: undefined,
+ cacheReadTokens: undefined,
+ cacheWriteTokens: undefined,
+ },
+ outputTokenDetails: {
+ textTokens: undefined,
+ reasoningTokens: undefined,
+ },
+ },
+ } satisfies LLM.Event,
+ {
+ type: "finish",
+ finishReason: "tool-calls",
+ rawFinishReason: "tool_calls",
+ totalUsage: {
+ inputTokens: 1,
+ outputTokens: 1,
+ totalTokens: 2,
+ inputTokenDetails: {
+ noCacheTokens: undefined,
+ cacheReadTokens: undefined,
+ cacheWriteTokens: undefined,
+ },
+ outputTokenDetails: {
+ textTokens: undefined,
+ reasoningTokens: undefined,
+ },
+ },
+ } satisfies LLM.Event,
+ ),
+ )
+
+ await using tmp = await tmpdir({ git: true })
+ await Instance.provide({
+ directory: tmp.path,
+ fn: async () => {
+ spyOn(ProviderModule.Provider, "getModel").mockResolvedValue(createModel({ context: 100_000, output: 32_000 }))
+
+ const session = await Session.create({})
+ const msg = await user(session.id, "hello")
+ const rt = liveRuntime(stub.layer)
+ try {
+ const msgs = await Session.messages({ sessionID: session.id })
+ await rt.runPromise(
+ SessionCompaction.Service.use((svc) =>
+ svc.process({
+ parentID: msg.id,
+ messages: msgs,
+ sessionID: session.id,
+ auto: false,
+ }),
+ ),
+ )
+
+ const summary = (await Session.messages({ sessionID: session.id })).find(
+ (item) => item.info.role === "assistant" && item.info.summary,
+ )
+
+ expect(summary?.info.role).toBe("assistant")
+ expect(summary?.parts.some((part) => part.type === "tool")).toBe(false)
+ } finally {
+ await rt.dispose()
+ }
+ },
+ })
+ })
})
describe("util.token.estimate", () => {
diff --git a/packages/opencode/test/session/llm.test.ts b/packages/opencode/test/session/llm.test.ts
index 8de7d2723..bb81aa681 100644
--- a/packages/opencode/test/session/llm.test.ts
+++ b/packages/opencode/test/session/llm.test.ts
@@ -1,7 +1,9 @@
import { afterAll, beforeAll, beforeEach, describe, expect, test } from "bun:test"
import path from "path"
import { tool, type ModelMessage } from "ai"
+import { Cause, Exit, Stream } from "effect"
import z from "zod"
+import { makeRuntime } from "../../src/effect/run-service"
import { LLM } from "../../src/session/llm"
import { Instance } from "../../src/project/instance"
import { Provider } from "../../src/provider/provider"
@@ -109,7 +111,11 @@ type Capture = {
const state = {
server: null as ReturnType<typeof Bun.serve> | null,
- queue: [] as Array<{ path: string; response: Response; resolve: (value: Capture) => void }>,
+ queue: [] as Array<{
+ path: string
+ response: Response | ((req: Request, capture: Capture) => Response)
+ resolve: (value: Capture) => void
+ }>,
}
function deferred<T>() {
@@ -126,6 +132,58 @@ function waitRequest(pathname: string, response: Response) {
return pending.promise
}
+function timeout(ms: number) {
+ return new Promise<never>((_, reject) => {
+ setTimeout(() => reject(new Error(`timed out after ${ms}ms`)), ms)
+ })
+}
+
+function waitStreamingRequest(pathname: string) {
+ const request = deferred<Capture>()
+ const requestAborted = deferred<void>()
+ const responseCanceled = deferred<void>()
+ const encoder = new TextEncoder()
+
+ state.queue.push({
+ path: pathname,
+ resolve: request.resolve,
+ response(req: Request) {
+ req.signal.addEventListener("abort", () => requestAborted.resolve(), { once: true })
+
+ return new Response(
+ new ReadableStream<Uint8Array>({
+ start(controller) {
+ controller.enqueue(
+ encoder.encode(
+ [
+ `data: ${JSON.stringify({
+ id: "chatcmpl-abort",
+ object: "chat.completion.chunk",
+ choices: [{ delta: { role: "assistant" } }],
+ })}`,
+ ].join("\n\n") + "\n\n",
+ ),
+ )
+ },
+ cancel() {
+ responseCanceled.resolve()
+ },
+ }),
+ {
+ status: 200,
+ headers: { "Content-Type": "text/event-stream" },
+ },
+ )
+ },
+ })
+
+ return {
+ request: request.promise,
+ requestAborted: requestAborted.promise,
+ responseCanceled: responseCanceled.promise,
+ }
+}
+
beforeAll(() => {
state.server = Bun.serve({
port: 0,
@@ -143,7 +201,9 @@ beforeAll(() => {
return new Response("not found", { status: 404 })
}
- return next.response
+ return typeof next.response === "function"
+ ? next.response(req, { url, headers: req.headers, body })
+ : next.response
},
})
})
@@ -325,6 +385,162 @@ describe("session.llm.stream", () => {
})
})
+ test("raw stream abort signal cancels provider response body promptly", async () => {
+ const server = state.server
+ if (!server) throw new Error("Server not initialized")
+
+ const providerID = "alibaba"
+ const modelID = "qwen-plus"
+ const fixture = await loadFixture(providerID, modelID)
+ const model = fixture.model
+ const pending = waitStreamingRequest("/chat/completions")
+
+ await using tmp = await tmpdir({
+ init: async (dir) => {
+ await Bun.write(
+ path.join(dir, "opencode.json"),
+ JSON.stringify({
+ $schema: "https://opencode.ai/config.json",
+ enabled_providers: [providerID],
+ provider: {
+ [providerID]: {
+ options: {
+ apiKey: "test-key",
+ baseURL: `${server.url.origin}/v1`,
+ },
+ },
+ },
+ }),
+ )
+ },
+ })
+
+ await Instance.provide({
+ directory: tmp.path,
+ fn: async () => {
+ const resolved = await Provider.getModel(ProviderID.make(providerID), ModelID.make(model.id))
+ const sessionID = SessionID.make("session-test-raw-abort")
+ const agent = {
+ name: "test",
+ mode: "primary",
+ options: {},
+ permission: [{ permission: "*", pattern: "*", action: "allow" }],
+ } satisfies Agent.Info
+ const user = {
+ id: MessageID.make("user-raw-abort"),
+ sessionID,
+ role: "user",
+ time: { created: Date.now() },
+ agent: agent.name,
+ model: { providerID: ProviderID.make(providerID), modelID: resolved.id },
+ } satisfies MessageV2.User
+
+ const ctrl = new AbortController()
+ const result = await LLM.stream({
+ user,
+ sessionID,
+ model: resolved,
+ agent,
+ system: ["You are a helpful assistant."],
+ abort: ctrl.signal,
+ messages: [{ role: "user", content: "Hello" }],
+ tools: {},
+ })
+
+ const iter = result.fullStream[Symbol.asyncIterator]()
+ await pending.request
+ await iter.next()
+ ctrl.abort()
+
+ await Promise.race([pending.responseCanceled, timeout(500)])
+ await Promise.race([pending.requestAborted, timeout(500)]).catch(() => undefined)
+ await iter.return?.()
+ },
+ })
+ })
+
+ test("service stream cancellation cancels provider response body promptly", async () => {
+ const server = state.server
+ if (!server) throw new Error("Server not initialized")
+
+ const providerID = "alibaba"
+ const modelID = "qwen-plus"
+ const fixture = await loadFixture(providerID, modelID)
+ const model = fixture.model
+ const pending = waitStreamingRequest("/chat/completions")
+
+ await using tmp = await tmpdir({
+ init: async (dir) => {
+ await Bun.write(
+ path.join(dir, "opencode.json"),
+ JSON.stringify({
+ $schema: "https://opencode.ai/config.json",
+ enabled_providers: [providerID],
+ provider: {
+ [providerID]: {
+ options: {
+ apiKey: "test-key",
+ baseURL: `${server.url.origin}/v1`,
+ },
+ },
+ },
+ }),
+ )
+ },
+ })
+
+ await Instance.provide({
+ directory: tmp.path,
+ fn: async () => {
+ const resolved = await Provider.getModel(ProviderID.make(providerID), ModelID.make(model.id))
+ const sessionID = SessionID.make("session-test-service-abort")
+ const agent = {
+ name: "test",
+ mode: "primary",
+ options: {},
+ permission: [{ permission: "*", pattern: "*", action: "allow" }],
+ } satisfies Agent.Info
+ const user = {
+ id: MessageID.make("user-service-abort"),
+ sessionID,
+ role: "user",
+ time: { created: Date.now() },
+ agent: agent.name,
+ model: { providerID: ProviderID.make(providerID), modelID: resolved.id },
+ } satisfies MessageV2.User
+
+ const ctrl = new AbortController()
+ const { runPromiseExit } = makeRuntime(LLM.Service, LLM.defaultLayer)
+ const run = runPromiseExit(
+ (svc) =>
+ svc
+ .stream({
+ user,
+ sessionID,
+ model: resolved,
+ agent,
+ system: ["You are a helpful assistant."],
+ messages: [{ role: "user", content: "Hello" }],
+ tools: {},
+ })
+ .pipe(Stream.runDrain),
+ { signal: ctrl.signal },
+ )
+
+ await pending.request
+ ctrl.abort()
+
+ await Promise.race([pending.responseCanceled, timeout(500)])
+ const exit = await run
+ expect(Exit.isFailure(exit)).toBe(true)
+ if (Exit.isFailure(exit)) {
+ expect(Cause.hasInterrupts(exit.cause)).toBe(true)
+ }
+ await Promise.race([pending.requestAborted, timeout(500)]).catch(() => undefined)
+ },
+ })
+ })
+
test("keeps tools enabled by prompt permissions", async () => {
const server = state.server
if (!server) {
diff --git a/packages/opencode/test/session/processor-effect.test.ts b/packages/opencode/test/session/processor-effect.test.ts
index cd9d97e15..0dfdef26f 100644
--- a/packages/opencode/test/session/processor-effect.test.ts
+++ b/packages/opencode/test/session/processor-effect.test.ts
@@ -1,7 +1,7 @@
import { NodeFileSystem } from "@effect/platform-node"
import { expect } from "bun:test"
import { APICallError } from "ai"
-import { Effect, Layer, ServiceMap } from "effect"
+import { Cause, Effect, Exit, Fiber, Layer, ServiceMap } from "effect"
import * as Stream from "effect/Stream"
import path from "path"
import type { Agent } from "../../src/agent/agent"
@@ -10,7 +10,6 @@ import { Bus } from "../../src/bus"
import { Config } from "../../src/config/config"
import { Permission } from "../../src/permission"
import { Plugin } from "../../src/plugin"
-import { Instance } from "../../src/project/instance"
import type { Provider } from "../../src/provider/provider"
import { ModelID, ProviderID } from "../../src/provider/schema"
import { Session } from "../../src/session"
@@ -120,21 +119,8 @@ function fail<E>(err: E, ...items: LLM.Event[]) {
return stream(...items).pipe(Stream.concat(Stream.fail(err)))
}
-function wait(abort: AbortSignal) {
- return Effect.promise(
- () =>
- new Promise<void>((done) => {
- abort.addEventListener("abort", () => done(), { once: true })
- }),
- )
-}
-
-function hang(input: LLM.StreamInput, ...items: LLM.Event[]) {
- return stream(...items).pipe(
- Stream.concat(
- Stream.unwrap(wait(input.abort).pipe(Effect.as(Stream.fail(new DOMException("Aborted", "AbortError"))))),
- ),
- )
+function hang(_input: LLM.StreamInput, ...items: LLM.Event[]) {
+ return stream(...items).pipe(Stream.concat(Stream.fromEffect(Effect.never)))
}
function model(context: number): Provider.Model {
@@ -291,13 +277,11 @@ it.effect("session.processor effect tests capture llm input cleanly", () => {
const chat = yield* session.create({})
const parent = yield* user(chat.id, "hi")
const msg = yield* assistant(chat.id, parent.id, path.resolve(dir))
- const abort = new AbortController()
const mdl = model(100)
const handle = yield* processors.create({
assistantMessage: msg,
sessionID: chat.id,
model: mdl,
- abort: abort.signal,
})
const input = {
@@ -313,7 +297,6 @@ it.effect("session.processor effect tests capture llm input cleanly", () => {
model: mdl,
agent: agent(),
system: [],
- abort: abort.signal,
messages: [{ role: "user", content: "hi" }],
tools: {},
} satisfies LLM.StreamInput
@@ -359,13 +342,11 @@ it.effect("session.processor effect tests stop after token overflow requests com
const chat = yield* session.create({})
const parent = yield* user(chat.id, "compact")
const msg = yield* assistant(chat.id, parent.id, path.resolve(dir))
- const abort = new AbortController()
const mdl = model(20)
const handle = yield* processors.create({
assistantMessage: msg,
sessionID: chat.id,
model: mdl,
- abort: abort.signal,
})
const value = yield* handle.process({
@@ -381,7 +362,6 @@ it.effect("session.processor effect tests stop after token overflow requests com
model: mdl,
agent: agent(),
system: [],
- abort: abort.signal,
messages: [{ role: "user", content: "compact" }],
tools: {},
})
@@ -433,13 +413,11 @@ it.effect("session.processor effect tests reset reasoning state across retries",
const chat = yield* session.create({})
const parent = yield* user(chat.id, "reason")
const msg = yield* assistant(chat.id, parent.id, path.resolve(dir))
- const abort = new AbortController()
const mdl = model(100)
const handle = yield* processors.create({
assistantMessage: msg,
sessionID: chat.id,
model: mdl,
- abort: abort.signal,
})
const value = yield* handle.process({
@@ -455,7 +433,6 @@ it.effect("session.processor effect tests reset reasoning state across retries",
model: mdl,
agent: agent(),
system: [],
- abort: abort.signal,
messages: [{ role: "user", content: "reason" }],
tools: {},
})
@@ -485,13 +462,11 @@ it.effect("session.processor effect tests do not retry unknown json errors", ()
const chat = yield* session.create({})
const parent = yield* user(chat.id, "json")
const msg = yield* assistant(chat.id, parent.id, path.resolve(dir))
- const abort = new AbortController()
const mdl = model(100)
const handle = yield* processors.create({
assistantMessage: msg,
sessionID: chat.id,
model: mdl,
- abort: abort.signal,
})
const value = yield* handle.process({
@@ -507,7 +482,6 @@ it.effect("session.processor effect tests do not retry unknown json errors", ()
model: mdl,
agent: agent(),
system: [],
- abort: abort.signal,
messages: [{ role: "user", content: "json" }],
tools: {},
})
@@ -535,13 +509,11 @@ it.effect("session.processor effect tests retry recognized structured json error
const chat = yield* session.create({})
const parent = yield* user(chat.id, "retry json")
const msg = yield* assistant(chat.id, parent.id, path.resolve(dir))
- const abort = new AbortController()
const mdl = model(100)
const handle = yield* processors.create({
assistantMessage: msg,
sessionID: chat.id,
model: mdl,
- abort: abort.signal,
})
const value = yield* handle.process({
@@ -557,7 +529,6 @@ it.effect("session.processor effect tests retry recognized structured json error
model: mdl,
agent: agent(),
system: [],
- abort: abort.signal,
messages: [{ role: "user", content: "retry json" }],
tools: {},
})
@@ -601,7 +572,6 @@ it.effect("session.processor effect tests publish retry status updates", () => {
const chat = yield* session.create({})
const parent = yield* user(chat.id, "retry")
const msg = yield* assistant(chat.id, parent.id, path.resolve(dir))
- const abort = new AbortController()
const mdl = model(100)
const states: number[] = []
const off = yield* bus.subscribeCallback(SessionStatus.Event.Status, (evt) => {
@@ -612,7 +582,6 @@ it.effect("session.processor effect tests publish retry status updates", () => {
assistantMessage: msg,
sessionID: chat.id,
model: mdl,
- abort: abort.signal,
})
const value = yield* handle.process({
@@ -628,7 +597,6 @@ it.effect("session.processor effect tests publish retry status updates", () => {
model: mdl,
agent: agent(),
system: [],
- abort: abort.signal,
messages: [{ role: "user", content: "retry" }],
tools: {},
})
@@ -656,13 +624,11 @@ it.effect("session.processor effect tests compact on structured context overflow
const chat = yield* session.create({})
const parent = yield* user(chat.id, "compact json")
const msg = yield* assistant(chat.id, parent.id, path.resolve(dir))
- const abort = new AbortController()
const mdl = model(100)
const handle = yield* processors.create({
assistantMessage: msg,
sessionID: chat.id,
model: mdl,
- abort: abort.signal,
})
const value = yield* handle.process({
@@ -678,7 +644,6 @@ it.effect("session.processor effect tests compact on structured context overflow
model: mdl,
agent: agent(),
system: [],
- abort: abort.signal,
messages: [{ role: "user", content: "compact json" }],
tools: {},
})
@@ -696,7 +661,6 @@ it.effect("session.processor effect tests mark pending tools as aborted on clean
(dir) =>
Effect.gen(function* () {
const ready = defer<void>()
- const seen = defer<void>()
const test = yield* TestLLM
const processors = yield* SessionProcessor.Service
const session = yield* Session.Service
@@ -710,17 +674,15 @@ it.effect("session.processor effect tests mark pending tools as aborted on clean
const chat = yield* session.create({})
const parent = yield* user(chat.id, "tool abort")
const msg = yield* assistant(chat.id, parent.id, path.resolve(dir))
- const abort = new AbortController()
const mdl = model(100)
const handle = yield* processors.create({
assistantMessage: msg,
sessionID: chat.id,
model: mdl,
- abort: abort.signal,
})
- const run = Effect.runPromise(
- handle.process({
+ const run = yield* handle
+ .process({
user: {
id: parent.id,
sessionID: chat.id,
@@ -733,20 +695,25 @@ it.effect("session.processor effect tests mark pending tools as aborted on clean
model: mdl,
agent: agent(),
system: [],
- abort: abort.signal,
messages: [{ role: "user", content: "tool abort" }],
tools: {},
- }),
- )
+ })
+ .pipe(Effect.forkChild)
yield* Effect.promise(() => ready.promise)
- abort.abort()
+ yield* Fiber.interrupt(run)
- const value = yield* Effect.promise(() => run)
+ const exit = yield* Fiber.await(run)
+ if (Exit.isFailure(exit) && Cause.hasInterruptsOnly(exit.cause)) {
+ yield* handle.abort()
+ }
const parts = yield* Effect.promise(() => MessageV2.parts(msg.id))
const tool = parts.find((part): part is MessageV2.ToolPart => part.type === "tool")
- expect(value).toBe("stop")
+ expect(Exit.isFailure(exit)).toBe(true)
+ if (Exit.isFailure(exit)) {
+ expect(Cause.hasInterruptsOnly(exit.cause)).toBe(true)
+ }
expect(yield* test.calls).toBe(1)
expect(tool?.state.status).toBe("error")
if (tool?.state.status === "error") {
@@ -779,7 +746,6 @@ it.effect("session.processor effect tests record aborted errors and idle state",
const chat = yield* session.create({})
const parent = yield* user(chat.id, "abort")
const msg = yield* assistant(chat.id, parent.id, path.resolve(dir))
- const abort = new AbortController()
const mdl = model(100)
const errs: string[] = []
const off = yield* bus.subscribeCallback(Session.Event.Error, (evt) => {
@@ -792,11 +758,10 @@ it.effect("session.processor effect tests record aborted errors and idle state",
assistantMessage: msg,
sessionID: chat.id,
model: mdl,
- abort: abort.signal,
})
- const run = Effect.runPromise(
- handle.process({
+ const run = yield* handle
+ .process({
user: {
id: parent.id,
sessionID: chat.id,
@@ -809,22 +774,27 @@ it.effect("session.processor effect tests record aborted errors and idle state",
model: mdl,
agent: agent(),
system: [],
- abort: abort.signal,
messages: [{ role: "user", content: "abort" }],
tools: {},
- }),
- )
+ })
+ .pipe(Effect.forkChild)
yield* Effect.promise(() => ready.promise)
- abort.abort()
+ yield* Fiber.interrupt(run)
- const value = yield* Effect.promise(() => run)
+ const exit = yield* Fiber.await(run)
+ if (Exit.isFailure(exit) && Cause.hasInterruptsOnly(exit.cause)) {
+ yield* handle.abort()
+ }
yield* Effect.promise(() => seen.promise)
const stored = yield* Effect.promise(() => MessageV2.get({ sessionID: chat.id, messageID: msg.id }))
const state = yield* status.get(chat.id)
off()
- expect(value).toBe("stop")
+ expect(Exit.isFailure(exit)).toBe(true)
+ if (Exit.isFailure(exit)) {
+ expect(Cause.hasInterruptsOnly(exit.cause)).toBe(true)
+ }
expect(handle.message.error?.name).toBe("MessageAbortedError")
expect(stored.info.role).toBe("assistant")
if (stored.info.role === "assistant") {
@@ -836,3 +806,67 @@ it.effect("session.processor effect tests record aborted errors and idle state",
{ git: true },
)
})
+
+it.effect("session.processor effect tests mark interruptions aborted without manual abort", () => {
+ return provideTmpdirInstance(
+ (dir) =>
+ Effect.gen(function* () {
+ const ready = defer<void>()
+ const processors = yield* SessionProcessor.Service
+ const session = yield* Session.Service
+ const status = yield* SessionStatus.Service
+ const test = yield* TestLLM
+
+ yield* test.push((input) =>
+ hang(input, start()).pipe(
+ Stream.tap((event) => (event.type === "start" ? Effect.sync(() => ready.resolve()) : Effect.void)),
+ ),
+ )
+
+ const chat = yield* session.create({})
+ const parent = yield* user(chat.id, "interrupt")
+ const msg = yield* assistant(chat.id, parent.id, path.resolve(dir))
+ const mdl = model(100)
+ const handle = yield* processors.create({
+ assistantMessage: msg,
+ sessionID: chat.id,
+ model: mdl,
+ })
+
+ const run = yield* handle
+ .process({
+ user: {
+ id: parent.id,
+ sessionID: chat.id,
+ role: "user",
+ time: parent.time,
+ agent: parent.agent,
+ model: { providerID: ref.providerID, modelID: ref.modelID },
+ } satisfies MessageV2.User,
+ sessionID: chat.id,
+ model: mdl,
+ agent: agent(),
+ system: [],
+ messages: [{ role: "user", content: "interrupt" }],
+ tools: {},
+ })
+ .pipe(Effect.forkChild)
+
+ yield* Effect.promise(() => ready.promise)
+ yield* Fiber.interrupt(run)
+
+ const exit = yield* Fiber.await(run)
+ const stored = yield* Effect.promise(() => MessageV2.get({ sessionID: chat.id, messageID: msg.id }))
+ const state = yield* status.get(chat.id)
+
+ expect(Exit.isFailure(exit)).toBe(true)
+ expect(handle.message.error?.name).toBe("MessageAbortedError")
+ expect(stored.info.role).toBe("assistant")
+ if (stored.info.role === "assistant") {
+ expect(stored.info.error?.name).toBe("MessageAbortedError")
+ }
+ expect(state).toMatchObject({ type: "idle" })
+ }),
+ { git: true },
+ )
+})
diff --git a/packages/opencode/test/session/prompt-concurrency.test.ts b/packages/opencode/test/session/prompt-concurrency.test.ts
new file mode 100644
index 000000000..19e1c4bf5
--- /dev/null
+++ b/packages/opencode/test/session/prompt-concurrency.test.ts
@@ -0,0 +1,247 @@
+import { describe, expect, spyOn, test } from "bun:test"
+import { Instance } from "../../src/project/instance"
+import { Provider } from "../../src/provider/provider"
+import { Session } from "../../src/session"
+import { MessageV2 } from "../../src/session/message-v2"
+import { SessionPrompt } from "../../src/session/prompt"
+import { SessionStatus } from "../../src/session/status"
+import { MessageID, PartID, SessionID } from "../../src/session/schema"
+import { Log } from "../../src/util/log"
+import { tmpdir } from "../fixture/fixture"
+
+Log.init({ print: false })
+
+function deferred() {
+ let resolve!: () => void
+ const promise = new Promise<void>((done) => {
+ resolve = done
+ })
+ return { promise, resolve }
+}
+
+// Helper: seed a session with a user message + finished assistant message
+// so loop() exits immediately without calling any LLM
+async function seed(sessionID: SessionID) {
+ const userMsg: MessageV2.Info = {
+ id: MessageID.ascending(),
+ role: "user",
+ sessionID,
+ time: { created: Date.now() },
+ agent: "build",
+ model: { providerID: "openai" as any, modelID: "gpt-5.2" as any },
+ }
+ await Session.updateMessage(userMsg)
+ await Session.updatePart({
+ id: PartID.ascending(),
+ messageID: userMsg.id,
+ sessionID,
+ type: "text",
+ text: "hello",
+ })
+
+ const assistantMsg: MessageV2.Info = {
+ id: MessageID.ascending(),
+ role: "assistant",
+ parentID: userMsg.id,
+ sessionID,
+ mode: "build",
+ agent: "build",
+ cost: 0,
+ path: { cwd: "/tmp", root: "/tmp" },
+ tokens: { input: 0, output: 0, reasoning: 0, cache: { read: 0, write: 0 } },
+ modelID: "gpt-5.2" as any,
+ providerID: "openai" as any,
+ time: { created: Date.now(), completed: Date.now() },
+ finish: "stop",
+ }
+ await Session.updateMessage(assistantMsg)
+ await Session.updatePart({
+ id: PartID.ascending(),
+ messageID: assistantMsg.id,
+ sessionID,
+ type: "text",
+ text: "hi there",
+ })
+
+ return { userMsg, assistantMsg }
+}
+
+describe("session.prompt concurrency", () => {
+ test("loop returns assistant message and sets status to idle", async () => {
+ await using tmp = await tmpdir({ git: true })
+ await Instance.provide({
+ directory: tmp.path,
+ fn: async () => {
+ const session = await Session.create({})
+ await seed(session.id)
+
+ const result = await SessionPrompt.loop({ sessionID: session.id })
+ expect(result.info.role).toBe("assistant")
+ if (result.info.role === "assistant") expect(result.info.finish).toBe("stop")
+
+ const status = await SessionStatus.get(session.id)
+ expect(status.type).toBe("idle")
+ },
+ })
+ })
+
+ test("concurrent loop callers get the same result", async () => {
+ await using tmp = await tmpdir({ git: true })
+ await Instance.provide({
+ directory: tmp.path,
+ fn: async () => {
+ const session = await Session.create({})
+ await seed(session.id)
+
+ const [a, b] = await Promise.all([
+ SessionPrompt.loop({ sessionID: session.id }),
+ SessionPrompt.loop({ sessionID: session.id }),
+ ])
+
+ expect(a.info.id).toBe(b.info.id)
+ expect(a.info.role).toBe("assistant")
+ },
+ })
+ })
+
+ test("assertNotBusy throws when loop is running", async () => {
+ await using tmp = await tmpdir({ git: true })
+ await Instance.provide({
+ directory: tmp.path,
+ fn: async () => {
+ const session = await Session.create({})
+ const userMsg: MessageV2.Info = {
+ id: MessageID.ascending(),
+ role: "user",
+ sessionID: session.id,
+ time: { created: Date.now() },
+ agent: "build",
+ model: { providerID: "openai" as any, modelID: "gpt-5.2" as any },
+ }
+ await Session.updateMessage(userMsg)
+ await Session.updatePart({
+ id: PartID.ascending(),
+ messageID: userMsg.id,
+ sessionID: session.id,
+ type: "text",
+ text: "hello",
+ })
+
+ const ready = deferred()
+ const gate = deferred()
+ const getModel = spyOn(Provider, "getModel").mockImplementation(async () => {
+ ready.resolve()
+ await gate.promise
+ throw new Error("test stop")
+ })
+
+ try {
+ const loopPromise = SessionPrompt.loop({ sessionID: session.id }).catch(() => undefined)
+ await ready.promise
+
+ await expect(SessionPrompt.assertNotBusy(session.id)).rejects.toBeInstanceOf(Session.BusyError)
+
+ gate.resolve()
+ await loopPromise
+ } finally {
+ gate.resolve()
+ getModel.mockRestore()
+ }
+
+ // After loop completes, assertNotBusy should succeed
+ await SessionPrompt.assertNotBusy(session.id)
+ },
+ })
+ })
+
+ test("cancel sets status to idle", async () => {
+ await using tmp = await tmpdir({ git: true })
+ await Instance.provide({
+ directory: tmp.path,
+ fn: async () => {
+ const session = await Session.create({})
+ // Seed only a user message — loop must call getModel to proceed
+ const userMsg: MessageV2.Info = {
+ id: MessageID.ascending(),
+ role: "user",
+ sessionID: session.id,
+ time: { created: Date.now() },
+ agent: "build",
+ model: { providerID: "openai" as any, modelID: "gpt-5.2" as any },
+ }
+ await Session.updateMessage(userMsg)
+ await Session.updatePart({
+ id: PartID.ascending(),
+ messageID: userMsg.id,
+ sessionID: session.id,
+ type: "text",
+ text: "hello",
+ })
+ // Also seed an assistant message so lastAssistant() fallback can find it
+ const assistantMsg: MessageV2.Info = {
+ id: MessageID.ascending(),
+ role: "assistant",
+ parentID: userMsg.id,
+ sessionID: session.id,
+ mode: "build",
+ agent: "build",
+ cost: 0,
+ path: { cwd: "/tmp", root: "/tmp" },
+ tokens: { input: 0, output: 0, reasoning: 0, cache: { read: 0, write: 0 } },
+ modelID: "gpt-5.2" as any,
+ providerID: "openai" as any,
+ time: { created: Date.now() },
+ }
+ await Session.updateMessage(assistantMsg)
+ await Session.updatePart({
+ id: PartID.ascending(),
+ messageID: assistantMsg.id,
+ sessionID: session.id,
+ type: "text",
+ text: "hi there",
+ })
+
+ const ready = deferred()
+ const gate = deferred()
+ const getModel = spyOn(Provider, "getModel").mockImplementation(async () => {
+ ready.resolve()
+ await gate.promise
+ throw new Error("test stop")
+ })
+
+ try {
+ // Start loop — it will block in getModel (assistant has no finish, so loop continues)
+ const loopPromise = SessionPrompt.loop({ sessionID: session.id })
+
+ await ready.promise
+
+ await SessionPrompt.cancel(session.id)
+
+ const status = await SessionStatus.get(session.id)
+ expect(status.type).toBe("idle")
+
+ // loop should resolve cleanly, not throw "All fibers interrupted"
+ const result = await loopPromise
+ expect(result.info.role).toBe("assistant")
+ expect(result.info.id).toBe(assistantMsg.id)
+ } finally {
+ gate.resolve()
+ getModel.mockRestore()
+ }
+ },
+ })
+ }, 10000)
+
+ test("cancel on idle session just sets idle", async () => {
+ await using tmp = await tmpdir({ git: true })
+ await Instance.provide({
+ directory: tmp.path,
+ fn: async () => {
+ const session = await Session.create({})
+ await SessionPrompt.cancel(session.id)
+ const status = await SessionStatus.get(session.id)
+ expect(status.type).toBe("idle")
+ },
+ })
+ })
+})
diff --git a/packages/opencode/test/session/prompt-effect.test.ts b/packages/opencode/test/session/prompt-effect.test.ts
new file mode 100644
index 000000000..9f35a21f4
--- /dev/null
+++ b/packages/opencode/test/session/prompt-effect.test.ts
@@ -0,0 +1,1140 @@
+import { NodeFileSystem } from "@effect/platform-node"
+import { expect } from "bun:test"
+import { Cause, Effect, Exit, Fiber, Layer, ServiceMap } from "effect"
+import * as Stream from "effect/Stream"
+import type { Agent } from "../../src/agent/agent"
+import { Agent as AgentSvc } from "../../src/agent/agent"
+import { Bus } from "../../src/bus"
+import { Command } from "../../src/command"
+import { Config } from "../../src/config/config"
+import { FileTime } from "../../src/file/time"
+import { LSP } from "../../src/lsp"
+import { MCP } from "../../src/mcp"
+import { Permission } from "../../src/permission"
+import { Plugin } from "../../src/plugin"
+import type { Provider } from "../../src/provider/provider"
+import { ModelID, ProviderID } from "../../src/provider/schema"
+import { Session } from "../../src/session"
+import { LLM } from "../../src/session/llm"
+import { MessageV2 } from "../../src/session/message-v2"
+import { AppFileSystem } from "../../src/filesystem"
+import { SessionCompaction } from "../../src/session/compaction"
+import { SessionProcessor } from "../../src/session/processor"
+import { SessionPrompt } from "../../src/session/prompt"
+import { MessageID, PartID, SessionID } from "../../src/session/schema"
+import { SessionStatus } from "../../src/session/status"
+import { Shell } from "../../src/shell/shell"
+import { Snapshot } from "../../src/snapshot"
+import { ToolRegistry } from "../../src/tool/registry"
+import { Truncate } from "../../src/tool/truncate"
+import { Log } from "../../src/util/log"
+import * as CrossSpawnSpawner from "../../src/effect/cross-spawn-spawner"
+import { provideTmpdirInstance } from "../fixture/fixture"
+import { testEffect } from "../lib/effect"
+
+Log.init({ print: false })
+
+const ref = {
+ providerID: ProviderID.make("test"),
+ modelID: ModelID.make("test-model"),
+}
+
+type Script = Stream.Stream<LLM.Event, unknown> | ((input: LLM.StreamInput) => Stream.Stream<LLM.Event, unknown>)
+
+class TestLLM extends ServiceMap.Service<
+ TestLLM,
+ {
+ readonly push: (stream: Script) => Effect.Effect<void>
+ readonly reply: (...items: LLM.Event[]) => Effect.Effect<void>
+ readonly calls: Effect.Effect<number>
+ readonly inputs: Effect.Effect<LLM.StreamInput[]>
+ }
+>()("@test/PromptLLM") {}
+
+function stream(...items: LLM.Event[]) {
+ return Stream.make(...items)
+}
+
+function usage(input = 1, output = 1, total = input + output) {
+ return {
+ inputTokens: input,
+ outputTokens: output,
+ totalTokens: total,
+ inputTokenDetails: {
+ noCacheTokens: undefined,
+ cacheReadTokens: undefined,
+ cacheWriteTokens: undefined,
+ },
+ outputTokenDetails: {
+ textTokens: undefined,
+ reasoningTokens: undefined,
+ },
+ }
+}
+
+function start(): LLM.Event {
+ return { type: "start" }
+}
+
+function textStart(id = "t"): LLM.Event {
+ return { type: "text-start", id }
+}
+
+function textDelta(id: string, text: string): LLM.Event {
+ return { type: "text-delta", id, text }
+}
+
+function textEnd(id = "t"): LLM.Event {
+ return { type: "text-end", id }
+}
+
+function finishStep(): LLM.Event {
+ return {
+ type: "finish-step",
+ finishReason: "stop",
+ rawFinishReason: "stop",
+ response: { id: "res", modelId: "test-model", timestamp: new Date() },
+ providerMetadata: undefined,
+ usage: usage(),
+ }
+}
+
+function finish(): LLM.Event {
+ return { type: "finish", finishReason: "stop", rawFinishReason: "stop", totalUsage: usage() }
+}
+
+function finishToolCallsStep(): LLM.Event {
+ return {
+ type: "finish-step",
+ finishReason: "tool-calls",
+ rawFinishReason: "tool_calls",
+ response: { id: "res", modelId: "test-model", timestamp: new Date() },
+ providerMetadata: undefined,
+ usage: usage(),
+ }
+}
+
+function finishToolCalls(): LLM.Event {
+ return { type: "finish", finishReason: "tool-calls", rawFinishReason: "tool_calls", totalUsage: usage() }
+}
+
+function replyStop(text: string, id = "t") {
+ return [start(), textStart(id), textDelta(id, text), textEnd(id), finishStep(), finish()] as const
+}
+
+function replyToolCalls(text: string, id = "t") {
+ return [start(), textStart(id), textDelta(id, text), textEnd(id), finishToolCallsStep(), finishToolCalls()] as const
+}
+
+function toolInputStart(id: string, toolName: string): LLM.Event {
+ return { type: "tool-input-start", id, toolName }
+}
+
+function toolCall(toolCallId: string, toolName: string, input: unknown): LLM.Event {
+ return { type: "tool-call", toolCallId, toolName, input }
+}
+
+function hang(_input: LLM.StreamInput, ...items: LLM.Event[]) {
+ return stream(...items).pipe(Stream.concat(Stream.fromEffect(Effect.never)))
+}
+
+function defer<T>() {
+ let resolve!: (value: T | PromiseLike<T>) => void
+ const promise = new Promise<T>((done) => {
+ resolve = done
+ })
+ return { promise, resolve }
+}
+
+function waitMs(ms: number) {
+ return Effect.promise(() => new Promise<void>((done) => setTimeout(done, ms)))
+}
+
+function withSh<A, E, R>(fx: () => Effect.Effect<A, E, R>) {
+ return Effect.acquireUseRelease(
+ Effect.sync(() => {
+ const prev = process.env.SHELL
+ process.env.SHELL = "/bin/sh"
+ Shell.preferred.reset()
+ return prev
+ }),
+ () => fx(),
+ (prev) =>
+ Effect.sync(() => {
+ if (prev === undefined) delete process.env.SHELL
+ else process.env.SHELL = prev
+ Shell.preferred.reset()
+ }),
+ )
+}
+
+function toolPart(parts: MessageV2.Part[]) {
+ return parts.find((part): part is MessageV2.ToolPart => part.type === "tool")
+}
+
+type CompletedToolPart = MessageV2.ToolPart & { state: MessageV2.ToolStateCompleted }
+type ErrorToolPart = MessageV2.ToolPart & { state: MessageV2.ToolStateError }
+
+function completedTool(parts: MessageV2.Part[]) {
+ const part = toolPart(parts)
+ expect(part?.state.status).toBe("completed")
+ return part?.state.status === "completed" ? (part as CompletedToolPart) : undefined
+}
+
+function errorTool(parts: MessageV2.Part[]) {
+ const part = toolPart(parts)
+ expect(part?.state.status).toBe("error")
+ return part?.state.status === "error" ? (part as ErrorToolPart) : undefined
+}
+
+const llm = Layer.unwrap(
+ Effect.gen(function* () {
+ const queue: Script[] = []
+ const inputs: LLM.StreamInput[] = []
+ let calls = 0
+
+ const push = Effect.fn("TestLLM.push")((item: Script) => {
+ queue.push(item)
+ return Effect.void
+ })
+
+ const reply = Effect.fn("TestLLM.reply")((...items: LLM.Event[]) => push(stream(...items)))
+ return Layer.mergeAll(
+ Layer.succeed(
+ LLM.Service,
+ LLM.Service.of({
+ stream: (input) => {
+ calls += 1
+ inputs.push(input)
+ const item = queue.shift() ?? Stream.empty
+ return typeof item === "function" ? item(input) : item
+ },
+ }),
+ ),
+ Layer.succeed(
+ TestLLM,
+ TestLLM.of({
+ push,
+ reply,
+ calls: Effect.sync(() => calls),
+ inputs: Effect.sync(() => [...inputs]),
+ }),
+ ),
+ )
+ }),
+)
+
+const mcp = Layer.succeed(
+ MCP.Service,
+ MCP.Service.of({
+ status: () => Effect.succeed({}),
+ clients: () => Effect.succeed({}),
+ tools: () => Effect.succeed({}),
+ prompts: () => Effect.succeed({}),
+ resources: () => Effect.succeed({}),
+ add: () => Effect.succeed({ status: { status: "disabled" as const } }),
+ connect: () => Effect.void,
+ disconnect: () => Effect.void,
+ getPrompt: () => Effect.succeed(undefined),
+ readResource: () => Effect.succeed(undefined),
+ startAuth: () => Effect.die("unexpected MCP auth in prompt-effect tests"),
+ authenticate: () => Effect.die("unexpected MCP auth in prompt-effect tests"),
+ finishAuth: () => Effect.die("unexpected MCP auth in prompt-effect tests"),
+ removeAuth: () => Effect.void,
+ supportsOAuth: () => Effect.succeed(false),
+ hasStoredTokens: () => Effect.succeed(false),
+ getAuthStatus: () => Effect.succeed("not_authenticated" as const),
+ }),
+)
+
+const lsp = Layer.succeed(
+ LSP.Service,
+ LSP.Service.of({
+ init: () => Effect.void,
+ status: () => Effect.succeed([]),
+ hasClients: () => Effect.succeed(false),
+ touchFile: () => Effect.void,
+ diagnostics: () => Effect.succeed({}),
+ hover: () => Effect.succeed(undefined),
+ definition: () => Effect.succeed([]),
+ references: () => Effect.succeed([]),
+ implementation: () => Effect.succeed([]),
+ documentSymbol: () => Effect.succeed([]),
+ workspaceSymbol: () => Effect.succeed([]),
+ prepareCallHierarchy: () => Effect.succeed([]),
+ incomingCalls: () => Effect.succeed([]),
+ outgoingCalls: () => Effect.succeed([]),
+ }),
+)
+
+const filetime = Layer.succeed(
+ FileTime.Service,
+ FileTime.Service.of({
+ read: () => Effect.void,
+ get: () => Effect.succeed(undefined),
+ assert: () => Effect.void,
+ withLock: (_filepath, fn) => Effect.promise(fn),
+ }),
+)
+
+const status = SessionStatus.layer.pipe(Layer.provideMerge(Bus.layer))
+const infra = Layer.mergeAll(NodeFileSystem.layer, CrossSpawnSpawner.defaultLayer)
+const deps = Layer.mergeAll(
+ Session.defaultLayer,
+ Snapshot.defaultLayer,
+ AgentSvc.defaultLayer,
+ Command.defaultLayer,
+ Permission.layer,
+ Plugin.defaultLayer,
+ Config.defaultLayer,
+ filetime,
+ lsp,
+ mcp,
+ AppFileSystem.defaultLayer,
+ status,
+ llm,
+).pipe(Layer.provideMerge(infra))
+const registry = ToolRegistry.layer.pipe(Layer.provideMerge(deps))
+const trunc = Truncate.layer.pipe(Layer.provideMerge(deps))
+const proc = SessionProcessor.layer.pipe(Layer.provideMerge(deps))
+const compact = SessionCompaction.layer.pipe(Layer.provideMerge(proc), Layer.provideMerge(deps))
+const env = SessionPrompt.layer.pipe(
+ Layer.provideMerge(compact),
+ Layer.provideMerge(proc),
+ Layer.provideMerge(registry),
+ Layer.provideMerge(trunc),
+ Layer.provideMerge(deps),
+)
+
+const it = testEffect(env)
+const unix = process.platform !== "win32" ? it.effect : it.effect.skip
+
+// Config that registers a custom "test" provider with a "test-model" model
+// so Provider.getModel("test", "test-model") succeeds inside the loop.
+const cfg = {
+ provider: {
+ test: {
+ name: "Test",
+ id: "test",
+ env: [],
+ npm: "@ai-sdk/openai-compatible",
+ models: {
+ "test-model": {
+ id: "test-model",
+ name: "Test Model",
+ attachment: false,
+ reasoning: false,
+ temperature: false,
+ tool_call: true,
+ release_date: "2025-01-01",
+ limit: { context: 100000, output: 10000 },
+ cost: { input: 0, output: 0 },
+ options: {},
+ },
+ },
+ options: {
+ apiKey: "test-key",
+ baseURL: "http://localhost:1/v1",
+ },
+ },
+ },
+}
+
+const user = Effect.fn("test.user")(function* (sessionID: SessionID, text: string) {
+ const session = yield* Session.Service
+ const msg = yield* session.updateMessage({
+ id: MessageID.ascending(),
+ role: "user",
+ sessionID,
+ agent: "build",
+ model: ref,
+ time: { created: Date.now() },
+ })
+ yield* session.updatePart({
+ id: PartID.ascending(),
+ messageID: msg.id,
+ sessionID,
+ type: "text",
+ text,
+ })
+ return msg
+})
+
+const seed = Effect.fn("test.seed")(function* (sessionID: SessionID, opts?: { finish?: string }) {
+ const session = yield* Session.Service
+ const msg = yield* user(sessionID, "hello")
+ const assistant: MessageV2.Assistant = {
+ id: MessageID.ascending(),
+ role: "assistant",
+ parentID: msg.id,
+ sessionID,
+ mode: "build",
+ agent: "build",
+ cost: 0,
+ path: { cwd: "/tmp", root: "/tmp" },
+ tokens: { input: 0, output: 0, reasoning: 0, cache: { read: 0, write: 0 } },
+ modelID: ref.modelID,
+ providerID: ref.providerID,
+ time: { created: Date.now() },
+ ...(opts?.finish ? { finish: opts.finish } : {}),
+ }
+ yield* session.updateMessage(assistant)
+ yield* session.updatePart({
+ id: PartID.ascending(),
+ messageID: assistant.id,
+ sessionID,
+ type: "text",
+ text: "hi there",
+ })
+ return { user: msg, assistant }
+})
+
+const addSubtask = (sessionID: SessionID, messageID: MessageID, model = ref) =>
+ Effect.gen(function* () {
+ const session = yield* Session.Service
+ yield* session.updatePart({
+ id: PartID.ascending(),
+ messageID,
+ sessionID,
+ type: "subtask",
+ prompt: "look into the cache key path",
+ description: "inspect bug",
+ agent: "general",
+ model,
+ })
+ })
+
+const boot = Effect.fn("test.boot")(function* (input?: { title?: string }) {
+ const test = yield* TestLLM
+ const prompt = yield* SessionPrompt.Service
+ const sessions = yield* Session.Service
+ const chat = yield* sessions.create(input ?? { title: "Pinned" })
+ return { test, prompt, sessions, chat }
+})
+
+// Loop semantics
+
+it.effect("loop exits immediately when last assistant has stop finish", () =>
+ provideTmpdirInstance(
+ (dir) =>
+ Effect.gen(function* () {
+ const { test, prompt, chat } = yield* boot()
+ yield* seed(chat.id, { finish: "stop" })
+
+ const result = yield* prompt.loop({ sessionID: chat.id })
+ expect(result.info.role).toBe("assistant")
+ if (result.info.role === "assistant") expect(result.info.finish).toBe("stop")
+ expect(yield* test.calls).toBe(0)
+ }),
+ { git: true },
+ ),
+)
+
+it.effect("loop calls LLM and returns assistant message", () =>
+ provideTmpdirInstance(
+ (dir) =>
+ Effect.gen(function* () {
+ const { test, prompt, chat } = yield* boot()
+ yield* test.reply(...replyStop("world"))
+ yield* user(chat.id, "hello")
+
+ const result = yield* prompt.loop({ sessionID: chat.id })
+ expect(result.info.role).toBe("assistant")
+ const parts = result.parts.filter((p) => p.type === "text")
+ expect(parts.some((p) => p.type === "text" && p.text === "world")).toBe(true)
+ expect(yield* test.calls).toBe(1)
+ }),
+ { git: true, config: cfg },
+ ),
+)
+
+it.effect("loop continues when finish is tool-calls", () =>
+ provideTmpdirInstance(
+ (dir) =>
+ Effect.gen(function* () {
+ const { test, prompt, chat } = yield* boot()
+ yield* test.reply(...replyToolCalls("first"))
+ yield* test.reply(...replyStop("second"))
+ yield* user(chat.id, "hello")
+
+ const result = yield* prompt.loop({ sessionID: chat.id })
+ expect(yield* test.calls).toBe(2)
+ expect(result.info.role).toBe("assistant")
+ if (result.info.role === "assistant") {
+ expect(result.parts.some((part) => part.type === "text" && part.text === "second")).toBe(true)
+ expect(result.info.finish).toBe("stop")
+ }
+ }),
+ { git: true, config: cfg },
+ ),
+)
+
+it.effect("failed subtask preserves metadata on error tool state", () =>
+ provideTmpdirInstance(
+ (dir) =>
+ Effect.gen(function* () {
+ const { test, prompt, chat } = yield* boot({ title: "Pinned" })
+ yield* test.reply(
+ start(),
+ toolInputStart("task-1", "task"),
+ toolCall("task-1", "task", {
+ description: "inspect bug",
+ prompt: "look into the cache key path",
+ subagent_type: "general",
+ }),
+ {
+ type: "finish-step",
+ finishReason: "tool-calls",
+ rawFinishReason: "tool_calls",
+ response: { id: "res", modelId: "test-model", timestamp: new Date() },
+ providerMetadata: undefined,
+ usage: usage(),
+ },
+ { type: "finish", finishReason: "tool-calls", rawFinishReason: "tool_calls", totalUsage: usage() },
+ )
+ yield* test.reply(...replyStop("done"))
+ const msg = yield* user(chat.id, "hello")
+ yield* addSubtask(chat.id, msg.id)
+
+ const result = yield* prompt.loop({ sessionID: chat.id })
+ expect(result.info.role).toBe("assistant")
+ expect(yield* test.calls).toBe(2)
+
+ const msgs = yield* Effect.promise(() => MessageV2.filterCompacted(MessageV2.stream(chat.id)))
+ const taskMsg = msgs.find((item) => item.info.role === "assistant" && item.info.agent === "general")
+ expect(taskMsg?.info.role).toBe("assistant")
+ if (!taskMsg || taskMsg.info.role !== "assistant") return
+
+ const tool = errorTool(taskMsg.parts)
+ if (!tool) return
+
+ expect(tool.state.error).toContain("Tool execution failed")
+ expect(tool.state.metadata).toBeDefined()
+ expect(tool.state.metadata?.sessionId).toBeDefined()
+ expect(tool.state.metadata?.model).toEqual({
+ providerID: ProviderID.make("test"),
+ modelID: ModelID.make("missing-model"),
+ })
+ }),
+ {
+ git: true,
+ config: {
+ ...cfg,
+ agent: {
+ general: {
+ model: "test/missing-model",
+ },
+ },
+ },
+ },
+ ),
+)
+
+it.effect("loop sets status to busy then idle", () =>
+ provideTmpdirInstance(
+ (dir) =>
+ Effect.gen(function* () {
+ const test = yield* TestLLM
+ const prompt = yield* SessionPrompt.Service
+ const sessions = yield* Session.Service
+ const bus = yield* Bus.Service
+
+ yield* test.reply(start(), textStart(), textDelta("t", "ok"), textEnd(), finishStep(), finish())
+
+ const chat = yield* sessions.create({})
+ yield* user(chat.id, "hi")
+
+ const types: string[] = []
+ const idle = defer<void>()
+ const off = yield* bus.subscribeCallback(SessionStatus.Event.Status, (evt) => {
+ if (evt.properties.sessionID !== chat.id) return
+ types.push(evt.properties.status.type)
+ if (evt.properties.status.type === "idle") idle.resolve()
+ })
+
+ yield* prompt.loop({ sessionID: chat.id })
+ yield* Effect.promise(() => idle.promise)
+ off()
+
+ expect(types).toContain("busy")
+ expect(types[types.length - 1]).toBe("idle")
+ }),
+ { git: true, config: cfg },
+ ),
+)
+
+// Cancel semantics
+
+it.effect(
+ "cancel interrupts loop and resolves with an assistant message",
+ () =>
+ provideTmpdirInstance(
+ (dir) =>
+ Effect.gen(function* () {
+ const { test, prompt, chat } = yield* boot()
+ yield* seed(chat.id)
+
+ // Make LLM hang so the loop blocks
+ yield* test.push((input) => hang(input, start()))
+
+ // Seed a new user message so the loop enters the LLM path
+ yield* user(chat.id, "more")
+
+ const fiber = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild)
+ // Give the loop time to start
+ yield* waitMs(200)
+ yield* prompt.cancel(chat.id)
+
+ const exit = yield* Fiber.await(fiber)
+ expect(Exit.isSuccess(exit)).toBe(true)
+ if (Exit.isSuccess(exit)) {
+ expect(exit.value.info.role).toBe("assistant")
+ }
+ }),
+ { git: true, config: cfg },
+ ),
+ 30_000,
+)
+
+it.effect(
+ "cancel records MessageAbortedError on interrupted process",
+ () =>
+ provideTmpdirInstance(
+ (dir) =>
+ Effect.gen(function* () {
+ const ready = defer<void>()
+ const { test, prompt, chat } = yield* boot()
+
+ yield* test.push((input) =>
+ hang(input, start()).pipe(
+ Stream.tap((event) => (event.type === "start" ? Effect.sync(() => ready.resolve()) : Effect.void)),
+ ),
+ )
+ yield* user(chat.id, "hello")
+
+ const fiber = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild)
+ yield* Effect.promise(() => ready.promise)
+ yield* prompt.cancel(chat.id)
+
+ const exit = yield* Fiber.await(fiber)
+ expect(Exit.isSuccess(exit)).toBe(true)
+ if (Exit.isSuccess(exit)) {
+ const info = exit.value.info
+ if (info.role === "assistant") {
+ expect(info.error?.name).toBe("MessageAbortedError")
+ }
+ }
+ }),
+ { git: true, config: cfg },
+ ),
+ 30_000,
+)
+
+it.effect(
+ "cancel with queued callers resolves all cleanly",
+ () =>
+ provideTmpdirInstance(
+ (dir) =>
+ Effect.gen(function* () {
+ const ready = defer<void>()
+ const { test, prompt, chat } = yield* boot()
+
+ yield* test.push((input) =>
+ hang(input, start()).pipe(
+ Stream.tap((event) => (event.type === "start" ? Effect.sync(() => ready.resolve()) : Effect.void)),
+ ),
+ )
+ yield* user(chat.id, "hello")
+
+ const a = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild)
+ yield* Effect.promise(() => ready.promise)
+ // Queue a second caller
+ const b = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild)
+ yield* waitMs(50)
+
+ yield* prompt.cancel(chat.id)
+
+ const [exitA, exitB] = yield* Effect.all([Fiber.await(a), Fiber.await(b)])
+ expect(Exit.isSuccess(exitA)).toBe(true)
+ expect(Exit.isSuccess(exitB)).toBe(true)
+ if (Exit.isSuccess(exitA) && Exit.isSuccess(exitB)) {
+ expect(exitA.value.info.id).toBe(exitB.value.info.id)
+ }
+ }),
+ { git: true, config: cfg },
+ ),
+ 30_000,
+)
+
+// Queue semantics
+
+it.effect("concurrent loop callers get same result", () =>
+ provideTmpdirInstance(
+ (dir) =>
+ Effect.gen(function* () {
+ const { prompt, chat } = yield* boot()
+ yield* seed(chat.id, { finish: "stop" })
+
+ const [a, b] = yield* Effect.all([prompt.loop({ sessionID: chat.id }), prompt.loop({ sessionID: chat.id })], {
+ concurrency: "unbounded",
+ })
+
+ expect(a.info.id).toBe(b.info.id)
+ expect(a.info.role).toBe("assistant")
+ yield* prompt.assertNotBusy(chat.id)
+ }),
+ { git: true },
+ ),
+)
+
+it.effect("concurrent loop callers all receive same error result", () =>
+ provideTmpdirInstance(
+ (dir) =>
+ Effect.gen(function* () {
+ const { test, prompt, chat } = yield* boot()
+
+ // Push a stream that fails — the loop records the error on the assistant message
+ yield* test.push(Stream.fail(new Error("boom")))
+ yield* user(chat.id, "hello")
+
+ const [a, b] = yield* Effect.all([prompt.loop({ sessionID: chat.id }), prompt.loop({ sessionID: chat.id })], {
+ concurrency: "unbounded",
+ })
+
+ // Both callers get the same assistant with an error recorded
+ expect(a.info.id).toBe(b.info.id)
+ expect(a.info.role).toBe("assistant")
+ if (a.info.role === "assistant") {
+ expect(a.info.error).toBeDefined()
+ }
+ if (b.info.role === "assistant") {
+ expect(b.info.error).toBeDefined()
+ }
+ }),
+ { git: true, config: cfg },
+ ),
+)
+
+it.effect(
+ "prompt submitted during an active run is included in the next LLM input",
+ () =>
+ provideTmpdirInstance(
+ (dir) =>
+ Effect.gen(function* () {
+ const ready = defer<void>()
+ const gate = defer<void>()
+ const { test, prompt, sessions, chat } = yield* boot()
+
+ yield* test.push((_input) =>
+ stream(start()).pipe(
+ Stream.tap((event) => (event.type === "start" ? Effect.sync(() => ready.resolve()) : Effect.void)),
+ Stream.concat(
+ Stream.fromEffect(Effect.promise(() => gate.promise)).pipe(
+ Stream.flatMap(() =>
+ stream(textStart("a"), textDelta("a", "first"), textEnd("a"), finishStep(), finish()),
+ ),
+ ),
+ ),
+ ),
+ )
+
+ const a = yield* prompt
+ .prompt({
+ sessionID: chat.id,
+ agent: "build",
+ model: ref,
+ parts: [{ type: "text", text: "first" }],
+ })
+ .pipe(Effect.forkChild)
+
+ yield* Effect.promise(() => ready.promise)
+
+ const id = MessageID.ascending()
+ const b = yield* prompt
+ .prompt({
+ sessionID: chat.id,
+ messageID: id,
+ agent: "build",
+ model: ref,
+ parts: [{ type: "text", text: "second" }],
+ })
+ .pipe(Effect.forkChild)
+
+ yield* Effect.promise(async () => {
+ const end = Date.now() + 5000
+ while (Date.now() < end) {
+ const msgs = await Effect.runPromise(sessions.messages({ sessionID: chat.id }))
+ if (msgs.some((msg) => msg.info.role === "user" && msg.info.id === id)) return
+ await new Promise((done) => setTimeout(done, 20))
+ }
+ throw new Error("timed out waiting for second prompt to save")
+ })
+
+ yield* test.reply(...replyStop("second"))
+ gate.resolve()
+
+ const [ea, eb] = yield* Effect.all([Fiber.await(a), Fiber.await(b)])
+ expect(Exit.isSuccess(ea)).toBe(true)
+ expect(Exit.isSuccess(eb)).toBe(true)
+ expect(yield* test.calls).toBe(2)
+
+ const msgs = yield* sessions.messages({ sessionID: chat.id })
+ const assistants = msgs.filter((msg) => msg.info.role === "assistant")
+ expect(assistants).toHaveLength(2)
+ const last = assistants.at(-1)
+ if (!last || last.info.role !== "assistant") throw new Error("expected second assistant")
+ expect(last.info.parentID).toBe(id)
+ expect(last.parts.some((part) => part.type === "text" && part.text === "second")).toBe(true)
+
+ const inputs = yield* test.inputs
+ expect(inputs).toHaveLength(2)
+ expect(JSON.stringify(inputs.at(-1)?.messages)).toContain("second")
+ }),
+ { git: true, config: cfg },
+ ),
+ 30_000,
+)
+
+it.effect(
+ "assertNotBusy throws BusyError when loop running",
+ () =>
+ provideTmpdirInstance(
+ (dir) =>
+ Effect.gen(function* () {
+ const ready = defer<void>()
+ const test = yield* TestLLM
+ const prompt = yield* SessionPrompt.Service
+ const sessions = yield* Session.Service
+
+ yield* test.push((input) =>
+ hang(input, start()).pipe(
+ Stream.tap((event) => (event.type === "start" ? Effect.sync(() => ready.resolve()) : Effect.void)),
+ ),
+ )
+
+ const chat = yield* sessions.create({})
+ yield* user(chat.id, "hi")
+
+ const fiber = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild)
+ yield* Effect.promise(() => ready.promise)
+
+ const exit = yield* prompt.assertNotBusy(chat.id).pipe(Effect.exit)
+ expect(Exit.isFailure(exit)).toBe(true)
+ if (Exit.isFailure(exit)) {
+ expect(Cause.squash(exit.cause)).toBeInstanceOf(Session.BusyError)
+ }
+
+ yield* prompt.cancel(chat.id)
+ yield* Fiber.await(fiber)
+ }),
+ { git: true, config: cfg },
+ ),
+ 30_000,
+)
+
+it.effect("assertNotBusy succeeds when idle", () =>
+ provideTmpdirInstance(
+ (dir) =>
+ Effect.gen(function* () {
+ const prompt = yield* SessionPrompt.Service
+ const sessions = yield* Session.Service
+
+ const chat = yield* sessions.create({})
+ const exit = yield* prompt.assertNotBusy(chat.id).pipe(Effect.exit)
+ expect(Exit.isSuccess(exit)).toBe(true)
+ }),
+ { git: true },
+ ),
+)
+
+// Shell semantics
+
+it.effect(
+ "shell rejects with BusyError when loop running",
+ () =>
+ provideTmpdirInstance(
+ (dir) =>
+ Effect.gen(function* () {
+ const ready = defer<void>()
+ const { test, prompt, chat } = yield* boot()
+
+ yield* test.push((input) =>
+ hang(input, start()).pipe(
+ Stream.tap((event) => (event.type === "start" ? Effect.sync(() => ready.resolve()) : Effect.void)),
+ ),
+ )
+ yield* user(chat.id, "hi")
+
+ const fiber = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild)
+ yield* Effect.promise(() => ready.promise)
+
+ const exit = yield* prompt.shell({ sessionID: chat.id, agent: "build", command: "echo hi" }).pipe(Effect.exit)
+ expect(Exit.isFailure(exit)).toBe(true)
+ if (Exit.isFailure(exit)) {
+ expect(Cause.squash(exit.cause)).toBeInstanceOf(Session.BusyError)
+ }
+
+ yield* prompt.cancel(chat.id)
+ yield* Fiber.await(fiber)
+ }),
+ { git: true, config: cfg },
+ ),
+ 30_000,
+)
+
+unix("shell captures stdout and stderr in completed tool output", () =>
+ provideTmpdirInstance(
+ (dir) =>
+ Effect.gen(function* () {
+ const { prompt, chat } = yield* boot()
+ const result = yield* prompt.shell({
+ sessionID: chat.id,
+ agent: "build",
+ command: "printf out && printf err >&2",
+ })
+
+ expect(result.info.role).toBe("assistant")
+ const tool = completedTool(result.parts)
+ if (!tool) return
+
+ expect(tool.state.output).toContain("out")
+ expect(tool.state.output).toContain("err")
+ expect(tool.state.metadata.output).toContain("out")
+ expect(tool.state.metadata.output).toContain("err")
+ yield* prompt.assertNotBusy(chat.id)
+ }),
+ { git: true, config: cfg },
+ ),
+)
+
+unix(
+ "shell updates running metadata before process exit",
+ () =>
+ withSh(() =>
+ provideTmpdirInstance(
+ (dir) =>
+ Effect.gen(function* () {
+ const { prompt, chat } = yield* boot()
+
+ const fiber = yield* prompt
+ .shell({ sessionID: chat.id, agent: "build", command: "printf first && sleep 0.2 && printf second" })
+ .pipe(Effect.forkChild)
+
+ yield* Effect.promise(async () => {
+ const start = Date.now()
+ while (Date.now() - start < 5000) {
+ const msgs = await MessageV2.filterCompacted(MessageV2.stream(chat.id))
+ const taskMsg = msgs.find((item) => item.info.role === "assistant")
+ const tool = taskMsg ? toolPart(taskMsg.parts) : undefined
+ if (tool?.state.status === "running" && tool.state.metadata?.output.includes("first")) return
+ await new Promise((done) => setTimeout(done, 20))
+ }
+ throw new Error("timed out waiting for running shell metadata")
+ })
+
+ const exit = yield* Fiber.await(fiber)
+ expect(Exit.isSuccess(exit)).toBe(true)
+ }),
+ { git: true, config: cfg },
+ ),
+ ),
+ 30_000,
+)
+
+unix(
+ "loop waits while shell runs and starts after shell exits",
+ () =>
+ provideTmpdirInstance(
+ (dir) =>
+ Effect.gen(function* () {
+ const { test, prompt, chat } = yield* boot()
+ yield* test.reply(...replyStop("after-shell"))
+
+ const sh = yield* prompt
+ .shell({ sessionID: chat.id, agent: "build", command: "sleep 0.2" })
+ .pipe(Effect.forkChild)
+ yield* waitMs(50)
+
+ const run = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild)
+ yield* waitMs(50)
+
+ expect(yield* test.calls).toBe(0)
+
+ yield* Fiber.await(sh)
+ const exit = yield* Fiber.await(run)
+
+ expect(Exit.isSuccess(exit)).toBe(true)
+ if (Exit.isSuccess(exit)) {
+ expect(exit.value.info.role).toBe("assistant")
+ expect(exit.value.parts.some((part) => part.type === "text" && part.text === "after-shell")).toBe(true)
+ }
+ expect(yield* test.calls).toBe(1)
+ }),
+ { git: true, config: cfg },
+ ),
+ 30_000,
+)
+
+unix(
+ "shell completion resumes queued loop callers",
+ () =>
+ provideTmpdirInstance(
+ (dir) =>
+ Effect.gen(function* () {
+ const { test, prompt, chat } = yield* boot()
+ yield* test.reply(...replyStop("done"))
+
+ const sh = yield* prompt
+ .shell({ sessionID: chat.id, agent: "build", command: "sleep 0.2" })
+ .pipe(Effect.forkChild)
+ yield* waitMs(50)
+
+ const a = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild)
+ const b = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild)
+ yield* waitMs(50)
+
+ expect(yield* test.calls).toBe(0)
+
+ yield* Fiber.await(sh)
+ const [ea, eb] = yield* Effect.all([Fiber.await(a), Fiber.await(b)])
+
+ expect(Exit.isSuccess(ea)).toBe(true)
+ expect(Exit.isSuccess(eb)).toBe(true)
+ if (Exit.isSuccess(ea) && Exit.isSuccess(eb)) {
+ expect(ea.value.info.id).toBe(eb.value.info.id)
+ expect(ea.value.info.role).toBe("assistant")
+ }
+ expect(yield* test.calls).toBe(1)
+ }),
+ { git: true, config: cfg },
+ ),
+ 30_000,
+)
+
+unix(
+ "cancel interrupts shell and resolves cleanly",
+ () =>
+ withSh(() =>
+ provideTmpdirInstance(
+ (dir) =>
+ Effect.gen(function* () {
+ const { prompt, chat } = yield* boot()
+
+ const sh = yield* prompt
+ .shell({ sessionID: chat.id, agent: "build", command: "sleep 30" })
+ .pipe(Effect.forkChild)
+ yield* waitMs(50)
+
+ yield* prompt.cancel(chat.id)
+
+ const status = yield* SessionStatus.Service
+ expect((yield* status.get(chat.id)).type).toBe("idle")
+ const busy = yield* prompt.assertNotBusy(chat.id).pipe(Effect.exit)
+ expect(Exit.isSuccess(busy)).toBe(true)
+
+ const exit = yield* Fiber.await(sh)
+ expect(Exit.isSuccess(exit)).toBe(true)
+ if (Exit.isSuccess(exit)) {
+ expect(exit.value.info.role).toBe("assistant")
+ const tool = completedTool(exit.value.parts)
+ if (tool) {
+ expect(tool.state.output).toContain("User aborted the command")
+ }
+ }
+ }),
+ { git: true, config: cfg },
+ ),
+ ),
+ 30_000,
+)
+
+unix(
+ "cancel persists aborted shell result when shell ignores TERM",
+ () =>
+ withSh(() =>
+ provideTmpdirInstance(
+ (dir) =>
+ Effect.gen(function* () {
+ const { prompt, chat } = yield* boot()
+
+ const sh = yield* prompt
+ .shell({ sessionID: chat.id, agent: "build", command: "trap '' TERM; sleep 30" })
+ .pipe(Effect.forkChild)
+ yield* waitMs(50)
+
+ yield* prompt.cancel(chat.id)
+
+ const exit = yield* Fiber.await(sh)
+ expect(Exit.isSuccess(exit)).toBe(true)
+ if (Exit.isSuccess(exit)) {
+ expect(exit.value.info.role).toBe("assistant")
+ const tool = completedTool(exit.value.parts)
+ if (tool) {
+ expect(tool.state.output).toContain("User aborted the command")
+ }
+ }
+ }),
+ { git: true, config: cfg },
+ ),
+ ),
+ 30_000,
+)
+
+unix(
+ "cancel interrupts loop queued behind shell",
+ () =>
+ provideTmpdirInstance(
+ (dir) =>
+ Effect.gen(function* () {
+ const { prompt, chat } = yield* boot()
+
+ const sh = yield* prompt
+ .shell({ sessionID: chat.id, agent: "build", command: "sleep 30" })
+ .pipe(Effect.forkChild)
+ yield* waitMs(50)
+
+ const run = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild)
+ yield* waitMs(50)
+
+ yield* prompt.cancel(chat.id)
+
+ const exit = yield* Fiber.await(run)
+ expect(Exit.isSuccess(exit)).toBe(true)
+
+ yield* Fiber.await(sh)
+ }),
+ { git: true, config: cfg },
+ ),
+ 30_000,
+)
+
+unix(
+ "shell rejects when another shell is already running",
+ () =>
+ withSh(() =>
+ provideTmpdirInstance(
+ (dir) =>
+ Effect.gen(function* () {
+ const { prompt, chat } = yield* boot()
+
+ const a = yield* prompt
+ .shell({ sessionID: chat.id, agent: "build", command: "sleep 30" })
+ .pipe(Effect.forkChild)
+ yield* waitMs(50)
+
+ const exit = yield* prompt
+ .shell({ sessionID: chat.id, agent: "build", command: "echo hi" })
+ .pipe(Effect.exit)
+ expect(Exit.isFailure(exit)).toBe(true)
+ if (Exit.isFailure(exit)) {
+ expect(Cause.squash(exit.cause)).toBeInstanceOf(Session.BusyError)
+ }
+
+ yield* prompt.cancel(chat.id)
+ yield* Fiber.await(a)
+ }),
+ { git: true, config: cfg },
+ ),
+ ),
+ 30_000,
+)