diff options
| author | Adam Malczewski <[email protected]> | 2026-06-24 14:10:03 +0900 |
|---|---|---|
| committer | Adam Malczewski <[email protected]> | 2026-06-24 14:10:03 +0900 |
| commit | dabcbc79831052effc6ce990021feee07d661f7e (patch) | |
| tree | 3e74e16f36d6a675abe676f0d04ca169f65f0a71 /packages | |
| parent | b58fb8373a1f7311cead23aa9a4d1fcd6927634f (diff) | |
| download | dispatch-dabcbc79831052effc6ce990021feee07d661f7e.tar.gz dispatch-dabcbc79831052effc6ce990021feee07d661f7e.zip | |
fix(kernel+tool-shell): abort hanging tool calls without bricking the conversation
kernel: executeToolCall now races tool.execute against the abort signal
via Promise.race. On abort it resolves (rather than rejects) with an
"Aborted" error result, so the step completes normally → finishReason
"aborted" → the turn seals cleanly (done event) → finally clears
activeTurns → the conversation is freed and the next message is accepted.
On abort, run-turn also strips tool-call chunks from the assistant message
(keeping text/thinking/error chunks) and omits tool-result messages, so no
dangling tool calls are persisted that would make the provider return a
400 on the next turn.
tool-shell: realSpawn now spawns the shell detached (in its own process
group). On both abort and timeout it kills the entire group
(process.kill(-pgid, "SIGKILL")) and resolves immediately instead of
waiting for child.on("close"), so a grandchild holding the pipes can't
stall the spawn promise or be leaked. Normal completion still resolves on
"close" so that all stdout/stderr is captured.
Also: ORCHESTRATOR.md migrated to dispatch CLI summon mechanism; .skills
summary; bin/sync-env PATH injection; frontend handoff docs.
1453 vitest pass · tsc -b EXIT 0 · biome clean.
Diffstat (limited to 'packages')
| -rw-r--r-- | packages/kernel/src/runtime/dispatch.test.ts | 535 | ||||
| -rw-r--r-- | packages/kernel/src/runtime/dispatch.ts | 18 | ||||
| -rw-r--r-- | packages/kernel/src/runtime/run-turn.ts | 53 | ||||
| -rw-r--r-- | packages/tool-shell/src/shell.test.ts | 156 | ||||
| -rw-r--r-- | packages/tool-shell/src/shell.ts | 5 | ||||
| -rw-r--r-- | packages/tool-shell/src/spawn.ts | 71 |
6 files changed, 800 insertions, 38 deletions
diff --git a/packages/kernel/src/runtime/dispatch.test.ts b/packages/kernel/src/runtime/dispatch.test.ts new file mode 100644 index 0000000..afbfb39 --- /dev/null +++ b/packages/kernel/src/runtime/dispatch.test.ts @@ -0,0 +1,535 @@ +import { describe, expect, it } from "vitest"; +import type { ChatMessage } from "../contracts/conversation.js"; +import type { AgentEvent } from "../contracts/events.js"; +import type { ProviderContract, ProviderEvent } from "../contracts/provider.js"; +import type { ToolContract, ToolExecuteContext, ToolResult } from "../contracts/tool.js"; +import { executeToolCall } from "./dispatch.js"; +import { runTurn } from "./run-turn.js"; + +// --------------------------------------------------------------------------- +// Helpers (no internal mocks — kernel standard; fakes only) +// --------------------------------------------------------------------------- + +function delay(ms: number): Promise<void> { + return new Promise((resolve) => { + setTimeout(resolve, ms); + }); +} + +function createFakeProvider(script: ProviderEvent[][]): ProviderContract { + let callIndex = 0; + return { + id: "fake", + stream() { + const events = script[callIndex] ?? []; + callIndex++; + return (async function* () { + for (const event of events) { + yield event; + } + })(); + }, + }; +} + +function createFakeTool( + name: string, + handler?: (input: unknown, ctx: ToolExecuteContext) => Promise<ToolResult>, + opts?: { concurrencySafe?: boolean }, +): ToolContract { + return { + name, + description: `Fake tool: ${name}`, + parameters: { type: "object" }, + ...(opts?.concurrencySafe !== undefined ? { concurrencySafe: opts.concurrencySafe } : {}), + execute: handler ?? 
(async (input) => ({ content: `${name}: ${JSON.stringify(input)}` })), + }; +} + +function createCollectingEmit(): { events: AgentEvent[]; emit: (event: AgentEvent) => void } { + const events: AgentEvent[] = []; + return { events, emit: (event) => events.push(event) }; +} + +const noopEmit = () => {}; + +const userMessage: ChatMessage = { + role: "user", + chunks: [{ type: "text", text: "hello" }], +}; + +const ABORTED_RESULT: ToolResult = { content: "Aborted", isError: true }; + +// =========================================================================== +// executeToolCall — direct unit tests for the abort-signal race +// =========================================================================== + +describe("executeToolCall", () => { + it("returns the tool's result when the tool resolves before abort", async () => { + const ac = new AbortController(); + const tool = createFakeTool("echo", async (input) => ({ + content: `echo: ${JSON.stringify(input)}`, + })); + + const result = await executeToolCall( + { id: "tc1", name: "echo", input: { x: 1 } }, + tool, + ac.signal, + noopEmit, + "conv-1", + "turn-1", + ); + + expect(result).toEqual({ content: 'echo: {"x":1}' }); + }); + + it("returns Aborted immediately when signal is already aborted at call time", async () => { + const ac = new AbortController(); + ac.abort(); + const tool = createFakeTool("echo", async () => ({ content: "should not run" })); + + const result = await executeToolCall( + { id: "tc1", name: "echo", input: {} }, + tool, + ac.signal, + noopEmit, + "conv-1", + "turn-1", + ); + + expect(result).toEqual(ABORTED_RESULT); + }); + + it("returns Aborted when a hanging tool is raced against an abort signal", async () => { + const ac = new AbortController(); + // A tool that never resolves and ignores ctx.signal + const tool = createFakeTool("hang", () => new Promise<ToolResult>(() => {})); + + const promise = executeToolCall( + { id: "tc1", name: "hang", input: {} }, + tool, + ac.signal, + noopEmit, + 
"conv-1", + "turn-1", + ); + + // Abort after the tool has started + await delay(10); + ac.abort(); + + const result = await promise; + expect(result).toEqual(ABORTED_RESULT); + }); + + it("returns the tool's own result when a signal-aware tool resolves on abort", async () => { + const ac = new AbortController(); + const toolResult: ToolResult = { content: "aborted by tool", isError: true }; + const tool = createFakeTool("aware", (_input, ctx) => { + return new Promise<ToolResult>((resolve) => { + ctx.signal.addEventListener("abort", () => resolve(toolResult), { once: true }); + }); + }); + + const promise = executeToolCall( + { id: "tc1", name: "aware", input: {} }, + tool, + ac.signal, + noopEmit, + "conv-1", + "turn-1", + ); + + await delay(10); + ac.abort(); + + const result = await promise; + // The tool listens to the signal and resolves its own result. Whether + // the tool's result or the race's "Aborted" wins is timing-dependent; + // both are isError and let the turn seal with finishReason "aborted". + expect(result.isError).toBe(true); + expect(result.content).toBe("aborted by tool"); + }); + + it("swallows a late rejection from the orphaned tool promise after abort wins the race", async () => { + const ac = new AbortController(); + let rejectTool: ((err: Error) => void) | undefined; + const tool = createFakeTool("late-reject", () => { + return new Promise<ToolResult>((_resolve, reject) => { + rejectTool = reject; + }); + }); + + const promise = executeToolCall( + { id: "tc1", name: "late-reject", input: {} }, + tool, + ac.signal, + noopEmit, + "conv-1", + "turn-1", + ); + + await delay(10); + ac.abort(); + + const result = await promise; + expect(result).toEqual(ABORTED_RESULT); + + // The tool rejects AFTER the race already resolved with "Aborted". + // The no-op catch must swallow this — no unhandled rejection. 
+ rejectTool?.(new Error("late boom")); + // Give the microtask queue a tick to flush + await delay(5); + // If we reach here without an unhandledRejection crashing the process, + // the test passes. (vitest surfaces unhandled rejections as failures.) + }); + + it("returns an error result when the tool rejects before abort", async () => { + const ac = new AbortController(); + const tool = createFakeTool("boom", async () => { + throw new Error("tool exploded"); + }); + + const result = await executeToolCall( + { id: "tc1", name: "boom", input: {} }, + tool, + ac.signal, + noopEmit, + "conv-1", + "turn-1", + ); + + expect(result.isError).toBe(true); + expect(result.content).toContain("tool exploded"); + }); + + it("returns Unknown tool when the tool is undefined", async () => { + const ac = new AbortController(); + const result = await executeToolCall( + { id: "tc1", name: "nonexistent", input: {} }, + undefined, + ac.signal, + noopEmit, + "conv-1", + "turn-1", + ); + + expect(result.isError).toBe(true); + expect(result.content).toContain("Unknown tool"); + }); +}); + +// =========================================================================== +// runTurn — integration tests for the abort-signal race (durability) +// =========================================================================== + +describe("runTurn abort-race durability", () => { + // Required test 1: A hanging tool (never resolves, ignores ctx.signal) + // must not keep runTurn from returning when the signal aborts. + it("hanging tool + abort → runTurn returns with finishReason aborted and emits done", async () => { + const ac = new AbortController(); + + // A tool whose execute returns a promise that NEVER resolves and + // ignores ctx.signal entirely. + const tool = createFakeTool("hang", () => new Promise<ToolResult>(() => {})); + + // Use eager: true so the tool starts BEFORE the signal aborts. + // This exercises the race (not the early signal.aborted return). 
+ const provider: ProviderContract = { + id: "fake", + stream() { + return (async function* () { + yield { + type: "tool-call", + toolCallId: "tc1", + toolName: "hang", + input: {}, + } as ProviderEvent; + ac.abort(); + await delay(10); + yield { type: "finish", reason: "tool-calls" } as ProviderEvent; + })(); + }, + }; + + const { events, emit } = createCollectingEmit(); + + const result = await runTurn({ + provider, + messages: [userMessage], + tools: [tool], + dispatch: { maxConcurrent: 1, eager: true }, + conversationId: "conv-1", + turnId: "turn-1", + emit, + signal: ac.signal, + }); + + // runTurn returned (didn't hang) → the race worked. + expect(result.finishReason).toBe("aborted"); + + // A done event was emitted with reason "aborted". + const doneEvents = events.filter((e) => e.type === "done"); + expect(doneEvents).toHaveLength(1); + if (doneEvents[0]?.type === "done") { + expect(doneEvents[0].reason).toBe("aborted"); + } + }); + + // Required test 2: A signal-aware tool that resolves its own result on + // abort must also let runTurn return with finishReason "aborted". 
+ it("signal-aware tool + abort → runTurn returns with finishReason aborted", async () => { + const ac = new AbortController(); + + const tool = createFakeTool("aware", (_input, ctx) => { + return new Promise<ToolResult>((resolve) => { + ctx.signal.addEventListener( + "abort", + () => resolve({ content: "aborted by tool", isError: true }), + { once: true }, + ); + }); + }); + + const provider: ProviderContract = { + id: "fake", + stream() { + return (async function* () { + yield { + type: "tool-call", + toolCallId: "tc1", + toolName: "aware", + input: {}, + } as ProviderEvent; + ac.abort(); + await delay(10); + yield { type: "finish", reason: "tool-calls" } as ProviderEvent; + })(); + }, + }; + + const { events, emit } = createCollectingEmit(); + + const result = await runTurn({ + provider, + messages: [userMessage], + tools: [tool], + dispatch: { maxConcurrent: 1, eager: true }, + conversationId: "conv-1", + turnId: "turn-1", + emit, + signal: ac.signal, + }); + + expect(result.finishReason).toBe("aborted"); + + const doneEvents = events.filter((e) => e.type === "done"); + expect(doneEvents).toHaveLength(1); + if (doneEvents[0]?.type === "done") { + expect(doneEvents[0].reason).toBe("aborted"); + } + + // When the step is aborted, tool-result MESSAGES are omitted from the + // result (the tool-result EVENT is still emitted by executeStep for + // live UI updates, but the message is not persisted). This prevents + // orphaned `tool` messages from breaking the next turn's provider + // request. The assistant message has its tool-call chunks stripped. + const toolResultMsg = result.messages.find((m) => m.role === "tool"); + expect(toolResultMsg).toBeUndefined(); + + // The assistant message should NOT contain tool-call chunks. 
+ const assistantMsg = result.messages.find( + (m) => m.role === "assistant" && m.chunks.some((c) => c.type === "tool-call"), + ); + expect(assistantMsg).toBeUndefined(); + }); + + // Required test 3 (regression guard): Without abort, a normal tool runs + // and its result is used; finishReason reflects the model. + it("no abort → tool runs normally and its result is used (regression)", async () => { + const tool = createFakeTool("normal", async (input) => ({ + content: `result: ${JSON.stringify(input)}`, + })); + + const provider = createFakeProvider([ + [ + { type: "tool-call", toolCallId: "tc1", toolName: "normal", input: { x: 1 } }, + { type: "finish", reason: "tool-calls" }, + ], + [ + { type: "text-delta", delta: "done" }, + { type: "finish", reason: "stop" }, + ], + ]); + + const { events, emit } = createCollectingEmit(); + + const result = await runTurn({ + provider, + messages: [userMessage], + tools: [tool], + dispatch: { maxConcurrent: 1, eager: true }, + conversationId: "conv-1", + turnId: "turn-1", + emit, + }); + + // finishReason reflects the model (second step's "stop"). + expect(result.finishReason).toBe("stop"); + + // The tool's result was used (fed back, not "Aborted"). + const toolResultMsg = result.messages.find((m) => m.role === "tool"); + expect(toolResultMsg).toBeDefined(); + const trChunk = toolResultMsg?.chunks[0]; + expect(trChunk?.type).toBe("tool-result"); + if (trChunk?.type === "tool-result") { + expect(trChunk.content).toBe('result: {"x":1}'); + expect(trChunk.isError).toBe(false); + } + + // done event emitted with reason "stop". + const doneEvents = events.filter((e) => e.type === "done"); + expect(doneEvents).toHaveLength(1); + if (doneEvents[0]?.type === "done") { + expect(doneEvents[0].reason).toBe("stop"); + } + }); + + // Bonus: multiple hanging tools + abort → all resolve via the race, + // drain() doesn't deadlock, and runTurn returns. 
Tool-result messages + // are omitted from the result (aborted step); the turn seals cleanly. + it("multiple hanging tools + abort → drain completes and runTurn returns", async () => { + const ac = new AbortController(); + + // Two tools that never resolve and ignore ctx.signal. + const toolA = createFakeTool("hangA", () => new Promise<ToolResult>(() => {})); + const toolB = createFakeTool("hangB", () => new Promise<ToolResult>(() => {})); + + const provider: ProviderContract = { + id: "fake", + stream() { + return (async function* () { + yield { + type: "tool-call", + toolCallId: "tc1", + toolName: "hangA", + input: {}, + } as ProviderEvent; + yield { + type: "tool-call", + toolCallId: "tc2", + toolName: "hangB", + input: {}, + } as ProviderEvent; + ac.abort(); + await delay(10); + yield { type: "finish", reason: "tool-calls" } as ProviderEvent; + })(); + }, + }; + + const { events, emit } = createCollectingEmit(); + + const result = await runTurn({ + provider, + messages: [userMessage], + tools: [toolA, toolB], + dispatch: { maxConcurrent: 2, eager: true }, + conversationId: "conv-1", + turnId: "turn-1", + emit, + signal: ac.signal, + }); + + expect(result.finishReason).toBe("aborted"); + + // tool-result EVENTS are still emitted by executeStep (for live UI), + // but tool-result MESSAGES are omitted from the result (not persisted). + const toolResultEvents = events.filter((e) => e.type === "tool-result"); + expect(toolResultEvents).toHaveLength(2); + for (const tr of toolResultEvents) { + if (tr.type === "tool-result") { + expect(tr.isError).toBe(true); + } + } + + // No tool messages in the result (they would orphan on the next turn). + const toolMessages = result.messages.filter((m) => m.role === "tool"); + expect(toolMessages).toHaveLength(0); + + // Assistant message has no tool-call chunks. 
+ const assistantMsgs = result.messages.filter((m) => m.role === "assistant"); + for (const msg of assistantMsgs) { + expect(msg.chunks.some((c) => c.type === "tool-call")).toBe(false); + } + + const doneEvents = events.filter((e) => e.type === "done"); + expect(doneEvents).toHaveLength(1); + if (doneEvents[0]?.type === "done") { + expect(doneEvents[0].reason).toBe("aborted"); + } + }); + + // Critical regression: after an aborted tool call, the result messages + // must NOT contain orphaned tool messages. If they did, the next turn + // would send a `tool` role message to the provider without a preceding + // `assistant` message carrying `tool_calls` → 400 error. + it("aborted step produces no tool messages and no tool-call chunks in result", async () => { + const ac = new AbortController(); + + // Tool that hangs forever + const tool = createFakeTool("hang", () => new Promise<ToolResult>(() => {})); + + const provider: ProviderContract = { + id: "fake", + stream() { + return (async function* () { + yield { type: "text-delta", delta: "Let me run that for you" } as ProviderEvent; + yield { + type: "tool-call", + toolCallId: "tc1", + toolName: "hang", + input: {}, + } as ProviderEvent; + ac.abort(); + await delay(10); + yield { type: "finish", reason: "tool-calls" } as ProviderEvent; + })(); + }, + }; + + const result = await runTurn({ + provider, + messages: [userMessage], + tools: [tool], + dispatch: { maxConcurrent: 1, eager: true }, + conversationId: "conv-1", + turnId: "turn-1", + emit: noopEmit, + signal: ac.signal, + }); + + expect(result.finishReason).toBe("aborted"); + + // No tool messages in the result + const toolMessages = result.messages.filter((m) => m.role === "tool"); + expect(toolMessages).toHaveLength(0); + + // The assistant message should preserve text but NOT tool-call chunks + const assistantMsg = result.messages.find((m) => m.role === "assistant"); + expect(assistantMsg).toBeDefined(); + if (assistantMsg !== undefined) { + const hasToolCall = 
assistantMsg.chunks.some((c) => c.type === "tool-call"); + expect(hasToolCall).toBe(false); + // Text content should be preserved + const hasText = assistantMsg.chunks.some((c) => c.type === "text"); + expect(hasText).toBe(true); + } + + // Simulate what the next turn would see: the result messages are the + // conversation history (minus the user message). If we feed these to + // a simple converter, there should be NO `tool` role messages. + const toolRoleCount = result.messages.filter((m) => m.role === "tool").length; + expect(toolRoleCount).toBe(0); + }); +}); diff --git a/packages/kernel/src/runtime/dispatch.ts b/packages/kernel/src/runtime/dispatch.ts index d1c46cb..e0be1b4 100644 --- a/packages/kernel/src/runtime/dispatch.ts +++ b/packages/kernel/src/runtime/dispatch.ts @@ -35,8 +35,24 @@ export async function executeToolCall( conversationId, ...(cwd !== undefined ? { cwd } : {}), }; + // Race the tool's execute promise against the abort signal so a tool + // that hangs (ignores ctx.signal, or blocks on something the signal + // can't interrupt) can't keep runTurn from returning. When the signal + // fires we RESOLVE (not reject) with an "Aborted" result so the step + // completes normally and the existing signal.aborted → finishReason = + // "aborted" path seals the turn cleanly (done event), letting the + // caller's finally clear active state and the FE clear its spinner. try { - return await tool.execute(call.input, ctx); + const toolPromise = tool.execute(call.input, ctx); + const abortPromise = new Promise<ToolResult>((resolve) => { + signal.addEventListener("abort", () => resolve({ content: "Aborted", isError: true }), { + once: true, + }); + }); + // Swallow late rejections from the orphaned tool promise: the tool + // may reject after the race already resolved with "Aborted". + void toolPromise.catch(() => {}); + return await Promise.race([toolPromise, abortPromise]); } catch (err) { const message = err instanceof Error ? 
err.message : String(err); return { content: `Tool execution error: ${message}`, isError: true }; diff --git a/packages/kernel/src/runtime/run-turn.ts b/packages/kernel/src/runtime/run-turn.ts index 4a12e28..f5d80d3 100644 --- a/packages/kernel/src/runtime/run-turn.ts +++ b/packages/kernel/src/runtime/run-turn.ts @@ -89,6 +89,19 @@ function appendThinkingDelta(chunks: Chunk[], delta: string): void { } } +/** + * Remove tool-call chunks from an assistant message, returning a new message + * with only the non-tool-call chunks (text, thinking, error). Returns + * `undefined` when all chunks were tool-calls (so the caller can omit the + * message entirely). Used when a step is aborted to avoid persisting + * incomplete tool calls whose placeholder "Aborted" results would create + * orphaned `tool` messages in the next turn's history. + */ +function stripToolCallChunks(msg: ChatMessage): ChatMessage | undefined { + const stripped = msg.chunks.filter((c) => c.type !== "tool-call"); + return stripped.length > 0 ? { role: msg.role, chunks: stripped } : undefined; +} + interface StepContext { readonly provider: ProviderContract; readonly messages: ChatMessage[]; @@ -516,12 +529,36 @@ export async function runTurn(input: RunTurnInput): Promise<RunTurnResult> { totalUsage = addUsage(totalUsage, stepResult.usage); lastStepUsage = stepResult.usage; - if (stepResult.assistantMessage !== undefined) { - messages.push(stepResult.assistantMessage); - resultMessages.push(stepResult.assistantMessage); + // When the signal is aborted mid-step, the tool results are + // placeholders ({ content: "Aborted", isError: true }). If these + // are persisted and included in the next turn's message history, + // the provider sees a `tool` role message without a preceding + // `assistant` message carrying `tool_calls` → 400 error. + // + // To prevent this, when the signal is aborted we: + // 1. 
Strip tool-call chunks from the assistant message (keep + // text/thinking/error chunks so the partial response is + // preserved). + // 2. Omit tool-result messages entirely (they are not persisted, + // not added to resultMessages, and not passed to onStepComplete). + // + // This keeps the conversation history clean: the assistant's + // partial text is preserved, but no incomplete tool calls are + // left dangling. The `done` event still carries + // `reason: "aborted"`, so the turn seals cleanly. + const stepAborted = signal.aborted; + const assistantMessage = + stepAborted && stepResult.assistantMessage !== undefined + ? stripToolCallChunks(stepResult.assistantMessage) + : stepResult.assistantMessage; + const toolMessages = stepAborted ? [] : stepResult.toolMessages; + + if (assistantMessage !== undefined) { + messages.push(assistantMessage); + resultMessages.push(assistantMessage); } - for (const msg of stepResult.toolMessages) { + for (const msg of toolMessages) { messages.push(msg); resultMessages.push(msg); } @@ -532,10 +569,10 @@ export async function runTurn(input: RunTurnInput): Promise<RunTurnResult> { // SAME objects in resultMessages — the caller must NOT double-persist. 
if (input.onStepComplete !== undefined) { const stepMessages: ChatMessage[] = []; - if (stepResult.assistantMessage !== undefined) { - stepMessages.push(stepResult.assistantMessage); + if (assistantMessage !== undefined) { + stepMessages.push(assistantMessage); } - for (const msg of stepResult.toolMessages) { + for (const msg of toolMessages) { stepMessages.push(msg); } if (stepMessages.length > 0) { @@ -543,7 +580,7 @@ export async function runTurn(input: RunTurnInput): Promise<RunTurnResult> { } } - if (signal.aborted) { + if (stepAborted) { finishReason = "aborted"; break; } diff --git a/packages/tool-shell/src/shell.test.ts b/packages/tool-shell/src/shell.test.ts index a70693b..07e0af4 100644 --- a/packages/tool-shell/src/shell.test.ts +++ b/packages/tool-shell/src/shell.test.ts @@ -22,8 +22,12 @@ function stubCtx(overrides?: Partial<ToolExecuteContext>): ToolExecuteContext { }; } -function fakeSpawn(result: { exitCode: number | null; timedOut: boolean }): SpawnShell { - return async () => result; +function fakeSpawn(result: { + exitCode: number | null; + timedOut: boolean; + aborted?: boolean; +}): SpawnShell { + return async () => ({ aborted: false, ...result }); } describe("validateArgs", () => { @@ -201,7 +205,7 @@ describe("createRunShellTool", () => { workdir: "/tmp", spawn: async (_params) => { _params.onOutput("hello\n", "stdout"); - return { exitCode: 0, timedOut: false }; + return { exitCode: 0, timedOut: false, aborted: false }; }, }); const result = await tool.execute({ command: "echo hello" }, stubCtx()); @@ -214,7 +218,7 @@ describe("createRunShellTool", () => { workdir: "/tmp", spawn: async (_params) => { _params.onOutput("error output\n", "stderr"); - return { exitCode: 1, timedOut: false }; + return { exitCode: 1, timedOut: false, aborted: false }; }, }); const result = await tool.execute({ command: "false" }, stubCtx()); @@ -227,7 +231,7 @@ describe("createRunShellTool", () => { workdir: "/tmp", spawn: async (_params) => { 
_params.onOutput("partial\n", "stdout"); - return { exitCode: null, timedOut: true }; + return { exitCode: null, timedOut: true, aborted: false }; }, }); const result = await tool.execute({ command: "sleep 999" }, stubCtx()); @@ -242,7 +246,7 @@ describe("createRunShellTool", () => { outputCap: cap, spawn: async (_params) => { _params.onOutput("a".repeat(200), "stdout"); - return { exitCode: 0, timedOut: false }; + return { exitCode: 0, timedOut: false, aborted: false }; }, }); const result = await tool.execute({ command: "gen" }, stubCtx()); @@ -258,7 +262,7 @@ describe("createRunShellTool", () => { params.onOutput("line1\n", "stdout"); params.onOutput("err1\n", "stderr"); params.onOutput("line2\n", "stdout"); - return { exitCode: 0, timedOut: false }; + return { exitCode: 0, timedOut: false, aborted: false }; }, }); await tool.execute( @@ -280,7 +284,7 @@ describe("createRunShellTool", () => { workdir: "/baked", spawn: async (params) => { receivedCwd = params.cwd; - return { exitCode: 0, timedOut: false }; + return { exitCode: 0, timedOut: false, aborted: false }; }, }); await tool.execute({ command: "pwd" }, stubCtx({ cwd: "/custom" })); @@ -293,7 +297,7 @@ describe("createRunShellTool", () => { workdir: "/baked", spawn: async (params) => { receivedCwd = params.cwd; - return { exitCode: 0, timedOut: false }; + return { exitCode: 0, timedOut: false, aborted: false }; }, }); await tool.execute({ command: "pwd" }, stubCtx()); @@ -317,7 +321,7 @@ describe("createRunShellTool", () => { controller.abort(); const tool = createRunShellTool({ workdir: "/tmp", - spawn: async () => ({ exitCode: 0, timedOut: false }), + spawn: async () => ({ exitCode: 0, timedOut: false, aborted: false }), }); const result = await tool.execute({ command: "test" }, stubCtx({ signal: controller.signal })); expect(result.isError).toBe(true); @@ -329,7 +333,7 @@ describe("createRunShellTool", () => { workdir: "/tmp", spawn: async (params) => { receivedTimeout = params.timeout; - return { 
exitCode: 0, timedOut: false }; + return { exitCode: 0, timedOut: false, aborted: false }; }, }); await tool.execute({ command: "test", timeout: 5000 }, stubCtx()); @@ -355,3 +359,133 @@ describe("createRunShellTool (integration)", () => { expect(streamed).toContain("hello-from-shell"); }); }); + +describe("realSpawn — process-group kill on abort/timeout", () => { + it("aborts a command with a grandchild holding the pipes and resolves immediately", async () => { + const { realSpawn } = await import("./spawn.js"); + const controller = new AbortController(); + + // "sleep 30 & wait" spawns a grandchild (sleep) that inherits the stdio + // pipes. Killing just the sh parent does NOT close the pipes → close never + // fires. With detached:true + process-group kill, the grandchild dies too. + const promise = realSpawn({ + command: "sleep 30 & wait", + cwd: "/tmp", + signal: controller.signal, + timeout: 60_000, + onOutput: () => {}, + }); + + // Give the shell time to actually spawn the grandchild. + await new Promise((r) => setTimeout(r, 500)); + + controller.abort(); + + // Must resolve promptly (not wait 30s for the grandchild's sleep). + const result = await promise; + expect(result.aborted).toBe(true); + expect(result.timedOut).toBe(false); + + // Give the OS a moment to reap the killed processes. + await new Promise((r) => setTimeout(r, 200)); + + // The grandchild sleep process should be gone. Check via pgrep. + const { execSync } = await import("node:child_process"); + let sleeping: string[]; + try { + sleeping = execSync("pgrep -f 'sleep 30'", { encoding: "utf-8" }).trim().split("\n"); + } catch { + // pgrep returns non-zero when no processes match → all gone. 
+ sleeping = []; + } + expect(sleeping.length).toBe(0); + }); + + it("times out a command with a grandchild holding the pipes and resolves promptly", async () => { + const { realSpawn } = await import("./spawn.js"); + const controller = new AbortController(); + + const promise = realSpawn({ + command: "sleep 30 & wait", + cwd: "/tmp", + signal: controller.signal, + timeout: 500, + onOutput: () => {}, + }); + + // Must resolve within a short window (not 30s). + const start = Date.now(); + const result = await promise; + const elapsed = Date.now() - start; + + expect(result.timedOut).toBe(true); + expect(result.aborted).toBe(false); + // Should resolve shortly after the 500ms timeout, well under 30s. + expect(elapsed).toBeLessThan(10_000); + + // Grandchild should be dead. + await new Promise((r) => setTimeout(r, 200)); + const { execSync } = await import("node:child_process"); + let sleeping: string[]; + try { + sleeping = execSync("pgrep -f 'sleep 30'", { encoding: "utf-8" }).trim().split("\n"); + } catch { + sleeping = []; + } + expect(sleeping.length).toBe(0); + }); + + it("captures stdout on normal completion (regression guard)", async () => { + const { realSpawn } = await import("./spawn.js"); + const controller = new AbortController(); + let output = ""; + + const result = await realSpawn({ + command: "echo hi", + cwd: "/tmp", + signal: controller.signal, + timeout: 5_000, + onOutput: (data) => { + output += data; + }, + }); + + expect(result.aborted).toBe(false); + expect(result.timedOut).toBe(false); + expect(result.exitCode).toBe(0); + expect(output).toContain("hi"); + }); + + it("aborts a simple single-process command and resolves with aborted: true", async () => { + const { realSpawn } = await import("./spawn.js"); + const controller = new AbortController(); + + const promise = realSpawn({ + command: "sleep 30", + cwd: "/tmp", + signal: controller.signal, + timeout: 60_000, + onOutput: () => {}, + }); + + // Let the sleep actually start. 
+ await new Promise((r) => setTimeout(r, 300)); + + controller.abort(); + + const result = await promise; + expect(result.aborted).toBe(true); + expect(result.timedOut).toBe(false); + + // The sleep process should be gone. + await new Promise((r) => setTimeout(r, 200)); + const { execSync } = await import("node:child_process"); + let sleeping: string[]; + try { + sleeping = execSync("pgrep -f 'sleep 30'", { encoding: "utf-8" }).trim().split("\n"); + } catch { + sleeping = []; + } + expect(sleeping.length).toBe(0); + }); +}); diff --git a/packages/tool-shell/src/shell.ts b/packages/tool-shell/src/shell.ts index d96d73e..cc76bca 100644 --- a/packages/tool-shell/src/shell.ts +++ b/packages/tool-shell/src/shell.ts @@ -12,6 +12,7 @@ export interface ValidatedArgs { export interface SpawnResult { readonly exitCode: number | null; readonly timedOut: boolean; + readonly aborted: boolean; } export type SpawnShell = (params: { @@ -139,7 +140,6 @@ export function createRunShellTool(deps: { }; let spawnResult: SpawnResult; - let aborted = false; try { spawnResult = await deps.spawn({ @@ -154,7 +154,6 @@ export function createRunShellTool(deps: { }); } catch (err: unknown) { if (ctx.signal.aborted) { - aborted = true; return buildResult({ exitCode: null, timedOut: false, @@ -172,7 +171,7 @@ export function createRunShellTool(deps: { return buildResult({ exitCode: spawnResult.exitCode, timedOut: spawnResult.timedOut, - aborted, + aborted: spawnResult.aborted, output, cap, }); diff --git a/packages/tool-shell/src/spawn.ts b/packages/tool-shell/src/spawn.ts index 9025c26..9b1d7e4 100644 --- a/packages/tool-shell/src/spawn.ts +++ b/packages/tool-shell/src/spawn.ts @@ -3,24 +3,66 @@ import type { SpawnResult, SpawnShell } from "./shell.js"; export const realSpawn: SpawnShell = (params): Promise<SpawnResult> => { return new Promise<SpawnResult>((resolve) => { + // detached: true puts the child in its own process group (pgid = child.pid). 
+ // This lets us kill the entire group (child + any grandchildren that inherit + // the pipes) via process.kill(-pgid, "SIGKILL") on abort/timeout, so a + // backgrounded grandchild can't keep the stdio pipes open and stall the + // promise on child.on("close"). const child = nodeSpawn("sh", ["-c", params.command], { cwd: params.cwd, stdio: ["ignore", "pipe", "pipe"], + detached: true, }); + let settled = false; let timedOut = false; - let killed = false; - const timer = setTimeout(() => { - timedOut = true; - child.kill("SIGKILL"); - }, params.timeout); + let timer: ReturnType<typeof setTimeout> | undefined; + + /** Kill the entire child process group (best-effort — group may be gone). */ + const killGroup = () => { + if (child.pid !== undefined) { + try { + process.kill(-child.pid, "SIGKILL"); + } catch { + // Process group may already be gone — ignore. + } + } + }; + + /** Remove the abort listener and clear the timeout timer (no leaks). */ + const cleanup = () => { + if (timer !== undefined) { + clearTimeout(timer); + timer = undefined; + } + params.signal.removeEventListener("abort", onAbort); + }; + + /** Resolve once, then clean up so listeners/timers never leak. */ + const settle = (result: SpawnResult) => { + if (settled) return; + settled = true; + cleanup(); + resolve(result); + }; const onAbort = () => { - killed = true; - child.kill("SIGKILL"); + if (settled) return; + killGroup(); + // Resolve immediately — do NOT wait for child.on("close"), which may + // never fire if a grandchild holds the pipes open. + settle({ exitCode: null, timedOut: false, aborted: true }); }; params.signal.addEventListener("abort", onAbort, { once: true }); + timer = setTimeout(() => { + if (settled) return; + timedOut = true; + killGroup(); + // Resolve immediately — same reasoning as abort. 
+ settle({ exitCode: null, timedOut: true, aborted: false }); + }, params.timeout); + child.stdout.on("data", (chunk: Buffer) => { params.onOutput(chunk.toString(), "stdout"); }); @@ -29,18 +71,17 @@ export const realSpawn: SpawnShell = (params): Promise<SpawnResult> => { params.onOutput(chunk.toString(), "stderr"); }); + // Normal-completion path: wait for "close" so all stdout/stderr is captured. + // If abort/timeout already settled, this is a no-op (settled === true). child.on("close", (code) => { - clearTimeout(timer); - params.signal.removeEventListener("abort", onAbort); - resolve({ exitCode: code, timedOut }); + settle({ exitCode: code, timedOut, aborted: false }); }); + // Spawn error (e.g. bad cwd, sh not found). Kill the group just in case + // and resolve — never leave the promise pending. child.on("error", () => { - clearTimeout(timer); - params.signal.removeEventListener("abort", onAbort); - if (!killed && !timedOut) { - resolve({ exitCode: 1, timedOut: false }); - } + killGroup(); + settle({ exitCode: 1, timedOut: false, aborted: false }); }); }); }; |
