summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorKit Langton <[email protected]>2026-04-09 15:20:28 -0400
committerGitHub <[email protected]>2026-04-09 15:20:28 -0400
commit3199383eef4cc2ac4ca086f9485b071061dcff70 (patch)
treed781bfa22100861de0bf45204c149d5968498660
parent9f54115c5dbe98b7a0020875cb2f6035626af19d (diff)
downloadopencode-3199383eef4cc2ac4ca086f9485b071061dcff70.tar.gz
opencode-3199383eef4cc2ac4ca086f9485b071061dcff70.zip
fix: finalize interrupted bash via tool result path (#21724)
-rw-r--r--packages/opencode/src/session/processor.ts178
-rw-r--r--packages/opencode/src/session/prompt.ts20
-rw-r--r--packages/opencode/test/session/compaction.test.ts13
-rw-r--r--packages/opencode/test/session/prompt-effect.test.ts138
4 files changed, 282 insertions, 67 deletions
diff --git a/packages/opencode/src/session/processor.ts b/packages/opencode/src/session/processor.ts
index 225961aef..2e4d34bfc 100644
--- a/packages/opencode/src/session/processor.ts
+++ b/packages/opencode/src/session/processor.ts
@@ -1,4 +1,4 @@
-import { Cause, Effect, Layer, ServiceMap } from "effect"
+import { Cause, Deferred, Effect, Layer, ServiceMap } from "effect"
import * as Stream from "effect/Stream"
import { Agent } from "@/agent/agent"
import { Bus } from "@/bus"
@@ -18,6 +18,7 @@ import { SessionStatus } from "./status"
import { SessionSummary } from "./summary"
import type { Provider } from "@/provider/provider"
import { Question } from "@/question"
+import { errorMessage } from "@/util/error"
import { isRecord } from "@/util/record"
export namespace SessionProcessor {
@@ -30,7 +31,19 @@ export namespace SessionProcessor {
export interface Handle {
readonly message: MessageV2.Assistant
- readonly partFromToolCall: (toolCallID: string) => MessageV2.ToolPart | undefined
+ readonly updateToolCall: (
+ toolCallID: string,
+ update: (part: MessageV2.ToolPart) => MessageV2.ToolPart,
+ ) => Effect.Effect<MessageV2.ToolPart | undefined>
+ readonly completeToolCall: (
+ toolCallID: string,
+ output: {
+ title: string
+ metadata: Record<string, any>
+ output: string
+ attachments?: MessageV2.FilePart[]
+ },
+ ) => Effect.Effect<void>
readonly process: (streamInput: LLM.StreamInput) => Effect.Effect<Result>
}
@@ -44,8 +57,15 @@ export namespace SessionProcessor {
readonly create: (input: Input) => Effect.Effect<Handle>
}
+ type ToolCall = {
+ partID: MessageV2.ToolPart["id"]
+ messageID: MessageV2.ToolPart["messageID"]
+ sessionID: MessageV2.ToolPart["sessionID"]
+ done: Deferred.Deferred<void>
+ }
+
interface ProcessorContext extends Input {
- toolcalls: Record<string, MessageV2.ToolPart>
+ toolcalls: Record<string, ToolCall>
shouldBreak: boolean
snapshot: string | undefined
blocked: boolean
@@ -108,6 +128,88 @@ export namespace SessionProcessor {
aborted,
})
+ const settleToolCall = Effect.fn("SessionProcessor.settleToolCall")(function* (toolCallID: string) {
+ const done = ctx.toolcalls[toolCallID]?.done
+ delete ctx.toolcalls[toolCallID]
+ if (done) yield* Deferred.succeed(done, undefined).pipe(Effect.ignore)
+ })
+
+ const readToolCall = Effect.fn("SessionProcessor.readToolCall")(function* (toolCallID: string) {
+ const call = ctx.toolcalls[toolCallID]
+ if (!call) return
+ const part = yield* session.getPart({
+ partID: call.partID,
+ messageID: call.messageID,
+ sessionID: call.sessionID,
+ })
+ if (!part || part.type !== "tool") {
+ delete ctx.toolcalls[toolCallID]
+ return
+ }
+ return { call, part }
+ })
+
+ const updateToolCall = Effect.fn("SessionProcessor.updateToolCall")(function* (
+ toolCallID: string,
+ update: (part: MessageV2.ToolPart) => MessageV2.ToolPart,
+ ) {
+ const match = yield* readToolCall(toolCallID)
+ if (!match) return
+ const part = yield* session.updatePart(update(match.part))
+ ctx.toolcalls[toolCallID] = {
+ ...match.call,
+ partID: part.id,
+ messageID: part.messageID,
+ sessionID: part.sessionID,
+ }
+ return part
+ })
+
+ const completeToolCall = Effect.fn("SessionProcessor.completeToolCall")(function* (
+ toolCallID: string,
+ output: {
+ title: string
+ metadata: Record<string, any>
+ output: string
+ attachments?: MessageV2.FilePart[]
+ },
+ ) {
+ const match = yield* readToolCall(toolCallID)
+ if (!match || match.part.state.status !== "running") return
+ yield* session.updatePart({
+ ...match.part,
+ state: {
+ status: "completed",
+ input: match.part.state.input,
+ output: output.output,
+ metadata: output.metadata,
+ title: output.title,
+ time: { start: match.part.state.time.start, end: Date.now() },
+ attachments: output.attachments,
+ },
+ })
+ yield* settleToolCall(toolCallID)
+ })
+
+ const failToolCall = Effect.fn("SessionProcessor.failToolCall")(function* (toolCallID: string, error: unknown) {
+ const match = yield* readToolCall(toolCallID)
+ if (!match || match.part.state.status !== "running") return false
+ yield* session.updatePart({
+ ...match.part,
+ state: {
+ status: "error",
+ input: match.part.state.input,
+ error: errorMessage(error),
+ time: { start: match.part.state.time.start, end: Date.now() },
+ },
+ })
+ if (error instanceof Permission.RejectedError || error instanceof Question.RejectedError) {
+ ctx.blocked = ctx.shouldBreak
+ }
+ yield* settleToolCall(toolCallID)
+ return true
+ })
+
const handleEvent = Effect.fn("SessionProcessor.handleEvent")(function* (value: StreamEvent) {
switch (value.type) {
case "start":
@@ -154,8 +256,8 @@ export namespace SessionProcessor {
if (ctx.assistantMessage.summary) {
throw new Error(`Tool call not allowed while generating summary: ${value.toolName}`)
}
- ctx.toolcalls[value.id] = yield* session.updatePart({
- id: ctx.toolcalls[value.id]?.id ?? PartID.ascending(),
+ const part = yield* session.updatePart({
+ id: ctx.toolcalls[value.id]?.partID ?? PartID.ascending(),
messageID: ctx.assistantMessage.id,
sessionID: ctx.assistantMessage.sessionID,
type: "tool",
@@ -164,6 +266,12 @@ export namespace SessionProcessor {
state: { status: "pending", input: {}, raw: "" },
metadata: value.providerExecuted ? { providerExecuted: true } : undefined,
} satisfies MessageV2.ToolPart)
+ ctx.toolcalls[value.id] = {
+ done: yield* Deferred.make<void>(),
+ partID: part.id,
+ messageID: part.messageID,
+ sessionID: part.sessionID,
+ }
return
case "tool-input-delta":
@@ -176,14 +284,7 @@ export namespace SessionProcessor {
if (ctx.assistantMessage.summary) {
throw new Error(`Tool call not allowed while generating summary: ${value.toolName}`)
}
- const pointer = ctx.toolcalls[value.toolCallId]
- const match = yield* session.getPart({
- partID: pointer.id,
- messageID: pointer.messageID,
- sessionID: pointer.sessionID,
- })
- if (!match || match.type !== "tool") return
- ctx.toolcalls[value.toolCallId] = yield* session.updatePart({
+ yield* updateToolCall(value.toolCallId, (match) => ({
...match,
tool: value.toolName,
state: {
@@ -195,7 +296,7 @@ export namespace SessionProcessor {
metadata: match.metadata?.providerExecuted
? { ...value.providerMetadata, providerExecuted: true }
: value.providerMetadata,
- } satisfies MessageV2.ToolPart)
+ }))
const parts = MessageV2.parts(ctx.assistantMessage.id)
const recentParts = parts.slice(-DOOM_LOOP_THRESHOLD)
@@ -226,41 +327,12 @@ export namespace SessionProcessor {
}
case "tool-result": {
- const match = ctx.toolcalls[value.toolCallId]
- if (!match || match.state.status !== "running") return
- yield* session.updatePart({
- ...match,
- state: {
- status: "completed",
- input: value.input ?? match.state.input,
- output: value.output.output,
- metadata: value.output.metadata,
- title: value.output.title,
- time: { start: match.state.time.start, end: Date.now() },
- attachments: value.output.attachments,
- },
- })
- delete ctx.toolcalls[value.toolCallId]
+ yield* completeToolCall(value.toolCallId, value.output)
return
}
case "tool-error": {
- const match = ctx.toolcalls[value.toolCallId]
- if (!match || match.state.status !== "running") return
-
- yield* session.updatePart({
- ...match,
- state: {
- status: "error",
- input: value.input ?? match.state.input,
- error: value.error instanceof Error ? value.error.message : String(value.error),
- time: { start: match.state.time.start, end: Date.now() },
- },
- })
- if (value.error instanceof Permission.RejectedError || value.error instanceof Question.RejectedError) {
- ctx.blocked = ctx.shouldBreak
- }
- delete ctx.toolcalls[value.toolCallId]
+ yield* failToolCall(value.toolCallId, value.error)
return
}
@@ -413,7 +485,16 @@ export namespace SessionProcessor {
}
ctx.reasoningMap = {}
- for (const part of Object.values(ctx.toolcalls)) {
+ yield* Effect.forEach(
+ Object.values(ctx.toolcalls),
+ (call) => Deferred.await(call.done).pipe(Effect.timeout("250 millis"), Effect.ignore),
+ { concurrency: "unbounded" },
+ )
+
+ for (const toolCallID of Object.keys(ctx.toolcalls)) {
+ const match = yield* readToolCall(toolCallID)
+ if (!match) continue
+ const part = match.part
const end = Date.now()
const metadata = "metadata" in part.state && isRecord(part.state.metadata) ? part.state.metadata : {}
yield* session.updatePart({
@@ -503,9 +584,8 @@ export namespace SessionProcessor {
get message() {
return ctx.assistantMessage
},
- partFromToolCall(toolCallID: string) {
- return ctx.toolcalls[toolCallID]
- },
+ updateToolCall,
+ completeToolCall,
process,
} satisfies Handle
})
diff --git a/packages/opencode/src/session/prompt.ts b/packages/opencode/src/session/prompt.ts
index 19f0850ff..088a367ca 100644
--- a/packages/opencode/src/session/prompt.ts
+++ b/packages/opencode/src/session/prompt.ts
@@ -388,7 +388,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the
model: Provider.Model
session: Session.Info
tools?: Record<string, boolean>
- processor: Pick<SessionProcessor.Handle, "message" | "partFromToolCall">
+ processor: Pick<SessionProcessor.Handle, "message" | "updateToolCall" | "completeToolCall">
bypassAgentCheck: boolean
messages: MessageV2.WithParts[]
}) {
@@ -405,10 +405,9 @@ NOTE: At any point in time through this workflow you should feel free to ask the
messages: input.messages,
metadata: (val) =>
Effect.runPromise(
- Effect.gen(function* () {
- const match = input.processor.partFromToolCall(options.toolCallId)
- if (!match || !["running", "pending"].includes(match.state.status)) return
- yield* sessions.updatePart({
+ input.processor.updateToolCall(options.toolCallId, (match) => {
+ if (!["running", "pending"].includes(match.state.status)) return match
+ return {
...match,
state: {
title: val.title,
@@ -417,7 +416,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the
input: args,
time: { start: Date.now() },
},
- })
+ }
}),
),
ask: (req) =>
@@ -465,6 +464,9 @@ NOTE: At any point in time through this workflow you should feel free to ask the
{ tool: item.id, sessionID: ctx.sessionID, callID: ctx.callID, args },
output,
)
+ if (options.abortSignal?.aborted) {
+ yield* input.processor.completeToolCall(options.toolCallId, output)
+ }
return output
}),
)
@@ -529,7 +531,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the
...(truncated.truncated && { outputPath: truncated.outputPath }),
}
- return {
+ const output = {
title: "",
metadata,
output: truncated.content,
@@ -541,6 +543,10 @@ NOTE: At any point in time through this workflow you should feel free to ask the
})),
content: result.content,
}
+ if (opts.abortSignal?.aborted) {
+ yield* input.processor.completeToolCall(opts.toolCallId, output)
+ }
+ return output
}),
)
tools[key] = item
diff --git a/packages/opencode/test/session/compaction.test.ts b/packages/opencode/test/session/compaction.test.ts
index c37371d9f..76a83c34d 100644
--- a/packages/opencode/test/session/compaction.test.ts
+++ b/packages/opencode/test/session/compaction.test.ts
@@ -139,17 +139,8 @@ function fake(
get message() {
return msg
},
- partFromToolCall() {
- return {
- id: PartID.ascending(),
- messageID: msg.id,
- sessionID: msg.sessionID,
- type: "tool",
- callID: "fake",
- tool: "fake",
- state: { status: "pending", input: {}, raw: "" },
- }
- },
+ updateToolCall: Effect.fn("TestSessionProcessor.updateToolCall")(() => Effect.succeed(undefined)),
+ completeToolCall: Effect.fn("TestSessionProcessor.completeToolCall")(() => Effect.void),
process: Effect.fn("TestSessionProcessor.process")(() => Effect.succeed(result)),
} satisfies SessionProcessorModule.SessionProcessor.Handle
}
diff --git a/packages/opencode/test/session/prompt-effect.test.ts b/packages/opencode/test/session/prompt-effect.test.ts
index 38d7ed9f5..e4c46337c 100644
--- a/packages/opencode/test/session/prompt-effect.test.ts
+++ b/packages/opencode/test/session/prompt-effect.test.ts
@@ -539,6 +539,93 @@ it.live("failed subtask preserves metadata on error tool state", () =>
)
it.live(
+ "running subtask preserves metadata after tool-call transition",
+ () =>
+ provideTmpdirServer(
+ Effect.fnUntraced(function* ({ llm }) {
+ const prompt = yield* SessionPrompt.Service
+ const sessions = yield* Session.Service
+ const chat = yield* sessions.create({ title: "Pinned" })
+ yield* llm.hang
+ const msg = yield* user(chat.id, "hello")
+ yield* addSubtask(chat.id, msg.id)
+
+ const fiber = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild)
+
+ const tool = yield* Effect.promise(async () => {
+ const end = Date.now() + 5_000
+ while (Date.now() < end) {
+ const msgs = await Effect.runPromise(MessageV2.filterCompactedEffect(chat.id))
+ const taskMsg = msgs.find((item) => item.info.role === "assistant" && item.info.agent === "general")
+ const tool = taskMsg?.parts.find((part): part is MessageV2.ToolPart => part.type === "tool")
+ if (tool?.state.status === "running" && tool.state.metadata?.sessionId) return tool
+ await new Promise((done) => setTimeout(done, 20))
+ }
+ throw new Error("timed out waiting for running subtask metadata")
+ })
+
+ if (tool.state.status !== "running") return
+ expect(typeof tool.state.metadata?.sessionId).toBe("string")
+ expect(tool.state.title).toBeDefined()
+ expect(tool.state.metadata?.model).toBeDefined()
+
+ yield* prompt.cancel(chat.id)
+ yield* Fiber.await(fiber)
+ }),
+ { git: true, config: providerCfg },
+ ),
+ 5_000,
+)
+
+it.live(
+ "running task tool preserves metadata after tool-call transition",
+ () =>
+ provideTmpdirServer(
+ Effect.fnUntraced(function* ({ llm }) {
+ const prompt = yield* SessionPrompt.Service
+ const sessions = yield* Session.Service
+ const chat = yield* sessions.create({
+ title: "Pinned",
+ permission: [{ permission: "*", pattern: "*", action: "allow" }],
+ })
+ yield* llm.tool("task", {
+ description: "inspect bug",
+ prompt: "look into the cache key path",
+ subagent_type: "general",
+ })
+ yield* llm.hang
+ yield* user(chat.id, "hello")
+
+ const fiber = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild)
+
+ const tool = yield* Effect.promise(async () => {
+ const end = Date.now() + 5_000
+ while (Date.now() < end) {
+ const msgs = await Effect.runPromise(MessageV2.filterCompactedEffect(chat.id))
+ const assistant = msgs.findLast((item) => item.info.role === "assistant" && item.info.agent === "build")
+ const tool = assistant?.parts.find(
+ (part): part is MessageV2.ToolPart => part.type === "tool" && part.tool === "task",
+ )
+ if (tool?.state.status === "running" && tool.state.metadata?.sessionId) return tool
+ await new Promise((done) => setTimeout(done, 20))
+ }
+ throw new Error("timed out waiting for running task metadata")
+ })
+
+ if (tool.state.status !== "running") return
+ expect(typeof tool.state.metadata?.sessionId).toBe("string")
+ expect(tool.state.title).toBe("inspect bug")
+ expect(tool.state.metadata?.model).toBeDefined()
+
+ yield* prompt.cancel(chat.id)
+ yield* Fiber.await(fiber)
+ }),
+ { git: true, config: providerCfg },
+ ),
+ 10_000,
+)
+
+it.live(
"loop sets status to busy then idle",
() =>
provideTmpdirServer(
@@ -1174,6 +1261,57 @@ unix(
)
unix(
+ "cancel finalizes interrupted bash tool output through normal truncation",
+ () =>
+ provideTmpdirServer(
+ ({ dir, llm }) =>
+ Effect.gen(function* () {
+ const prompt = yield* SessionPrompt.Service
+ const sessions = yield* Session.Service
+ const chat = yield* sessions.create({
+ title: "Interrupted bash truncation",
+ permission: [{ permission: "*", pattern: "*", action: "allow" }],
+ })
+
+ yield* prompt.prompt({
+ sessionID: chat.id,
+ agent: "build",
+ noReply: true,
+ parts: [{ type: "text", text: "run bash" }],
+ })
+
+ yield* llm.tool("bash", {
+ command:
+ 'i=0; while [ "$i" -lt 4000 ]; do printf "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx %05d\\n" "$i"; i=$((i + 1)); done; sleep 30',
+ description: "Print many lines",
+ timeout: 30_000,
+ workdir: path.resolve(dir),
+ })
+
+ const run = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild)
+ yield* llm.wait(1)
+ yield* Effect.sleep(150)
+ yield* prompt.cancel(chat.id)
+
+ const exit = yield* Fiber.await(run)
+ expect(Exit.isSuccess(exit)).toBe(true)
+ if (Exit.isFailure(exit)) return
+
+ const tool = completedTool(exit.value.parts)
+ if (!tool) return
+
+ expect(tool.state.metadata.truncated).toBe(true)
+ expect(typeof tool.state.metadata.outputPath).toBe("string")
+ expect(tool.state.output).toContain("The tool call succeeded but the output was truncated.")
+ expect(tool.state.output).toContain("Full output saved to:")
+ expect(tool.state.output).not.toContain("Tool execution aborted")
+ }),
+ { git: true, config: providerCfg },
+ ),
+ 30_000,
+)
+
+unix(
"cancel interrupts loop queued behind shell",
() =>
provideTmpdirInstance(