summary | refs | log | tree | commit | diff | homepage
path: root/packages
diff options
context:
space:
mode:
author: Adam Malczewski <[email protected]> 2026-06-24 14:10:03 +0900
committer: Adam Malczewski <[email protected]> 2026-06-24 14:10:03 +0900
commit: dabcbc79831052effc6ce990021feee07d661f7e (patch)
tree: 3e74e16f36d6a675abe676f0d04ca169f65f0a71 /packages
parent: b58fb8373a1f7311cead23aa9a4d1fcd6927634f (diff)
download: dispatch-dabcbc79831052effc6ce990021feee07d661f7e.tar.gz
          dispatch-dabcbc79831052effc6ce990021feee07d661f7e.zip
fix(kernel+tool-shell): abort hanging tool calls without bricking the conversation
kernel: executeToolCall now races tool.execute against the abort signal via Promise.race. On abort it resolves (not rejects) with an "Aborted" result, so the step completes normally → finishReason "aborted" → the turn seals cleanly (done event) → finally clears activeTurns → the conversation is freed and the next message is accepted. On abort, run-turn strips tool-call chunks from the assistant message (keeping text/thinking) and omits tool-result messages, so that no dangling tool calls are persisted that would make the provider return a 400 on the next turn.

tool-shell: realSpawn spawns the shell detached (in its own process group). On abort AND on timeout it kills the entire group (process.kill(-pgid, SIGKILL)) and resolves immediately — there is no dependency on child.on("close"), so a grandchild holding the pipes can't stall the spawn promise or leak.

Also: ORCHESTRATOR.md migrated to the dispatch CLI summon mechanism; .skills summary; bin/sync-env PATH injection; frontend handoff docs.

1453 vitest pass · tsc -b EXIT 0 · biome clean.
Diffstat (limited to 'packages')
-rw-r--r-- packages/kernel/src/runtime/dispatch.test.ts 535
-rw-r--r-- packages/kernel/src/runtime/dispatch.ts 18
-rw-r--r-- packages/kernel/src/runtime/run-turn.ts 53
-rw-r--r-- packages/tool-shell/src/shell.test.ts 156
-rw-r--r-- packages/tool-shell/src/shell.ts 5
-rw-r--r-- packages/tool-shell/src/spawn.ts 71
6 files changed, 800 insertions, 38 deletions
diff --git a/packages/kernel/src/runtime/dispatch.test.ts b/packages/kernel/src/runtime/dispatch.test.ts
new file mode 100644
index 0000000..afbfb39
--- /dev/null
+++ b/packages/kernel/src/runtime/dispatch.test.ts
@@ -0,0 +1,535 @@
+import { describe, expect, it } from "vitest";
+import type { ChatMessage } from "../contracts/conversation.js";
+import type { AgentEvent } from "../contracts/events.js";
+import type { ProviderContract, ProviderEvent } from "../contracts/provider.js";
+import type { ToolContract, ToolExecuteContext, ToolResult } from "../contracts/tool.js";
+import { executeToolCall } from "./dispatch.js";
+import { runTurn } from "./run-turn.js";
+
+// ---------------------------------------------------------------------------
+// Helpers (no internal mocks — kernel standard; fakes only)
+// ---------------------------------------------------------------------------
+
+function delay(ms: number): Promise<void> {
+ return new Promise((resolve) => {
+ setTimeout(resolve, ms);
+ });
+}
+
+function createFakeProvider(script: ProviderEvent[][]): ProviderContract {
+ let callIndex = 0;
+ return {
+ id: "fake",
+ stream() {
+ const events = script[callIndex] ?? [];
+ callIndex++;
+ return (async function* () {
+ for (const event of events) {
+ yield event;
+ }
+ })();
+ },
+ };
+}
+
+function createFakeTool(
+ name: string,
+ handler?: (input: unknown, ctx: ToolExecuteContext) => Promise<ToolResult>,
+ opts?: { concurrencySafe?: boolean },
+): ToolContract {
+ return {
+ name,
+ description: `Fake tool: ${name}`,
+ parameters: { type: "object" },
+ ...(opts?.concurrencySafe !== undefined ? { concurrencySafe: opts.concurrencySafe } : {}),
+ execute: handler ?? (async (input) => ({ content: `${name}: ${JSON.stringify(input)}` })),
+ };
+}
+
+function createCollectingEmit(): { events: AgentEvent[]; emit: (event: AgentEvent) => void } {
+ const events: AgentEvent[] = [];
+ return { events, emit: (event) => events.push(event) };
+}
+
+const noopEmit = () => {};
+
+const userMessage: ChatMessage = {
+ role: "user",
+ chunks: [{ type: "text", text: "hello" }],
+};
+
+const ABORTED_RESULT: ToolResult = { content: "Aborted", isError: true };
+
+// ===========================================================================
+// executeToolCall — direct unit tests for the abort-signal race
+// ===========================================================================
+
+describe("executeToolCall", () => {
+ it("returns the tool's result when the tool resolves before abort", async () => {
+ const ac = new AbortController();
+ const tool = createFakeTool("echo", async (input) => ({
+ content: `echo: ${JSON.stringify(input)}`,
+ }));
+
+ const result = await executeToolCall(
+ { id: "tc1", name: "echo", input: { x: 1 } },
+ tool,
+ ac.signal,
+ noopEmit,
+ "conv-1",
+ "turn-1",
+ );
+
+ expect(result).toEqual({ content: 'echo: {"x":1}' });
+ });
+
+ it("returns Aborted immediately when signal is already aborted at call time", async () => {
+ const ac = new AbortController();
+ ac.abort();
+ const tool = createFakeTool("echo", async () => ({ content: "should not run" }));
+
+ const result = await executeToolCall(
+ { id: "tc1", name: "echo", input: {} },
+ tool,
+ ac.signal,
+ noopEmit,
+ "conv-1",
+ "turn-1",
+ );
+
+ expect(result).toEqual(ABORTED_RESULT);
+ });
+
+ it("returns Aborted when a hanging tool is raced against an abort signal", async () => {
+ const ac = new AbortController();
+ // A tool that never resolves and ignores ctx.signal
+ const tool = createFakeTool("hang", () => new Promise<ToolResult>(() => {}));
+
+ const promise = executeToolCall(
+ { id: "tc1", name: "hang", input: {} },
+ tool,
+ ac.signal,
+ noopEmit,
+ "conv-1",
+ "turn-1",
+ );
+
+ // Abort after the tool has started
+ await delay(10);
+ ac.abort();
+
+ const result = await promise;
+ expect(result).toEqual(ABORTED_RESULT);
+ });
+
+ it("returns the tool's own result when a signal-aware tool resolves on abort", async () => {
+ const ac = new AbortController();
+ const toolResult: ToolResult = { content: "aborted by tool", isError: true };
+ const tool = createFakeTool("aware", (_input, ctx) => {
+ return new Promise<ToolResult>((resolve) => {
+ ctx.signal.addEventListener("abort", () => resolve(toolResult), { once: true });
+ });
+ });
+
+ const promise = executeToolCall(
+ { id: "tc1", name: "aware", input: {} },
+ tool,
+ ac.signal,
+ noopEmit,
+ "conv-1",
+ "turn-1",
+ );
+
+ await delay(10);
+ ac.abort();
+
+ const result = await promise;
+ // The tool's abort listener is registered (inside tool.execute) before
+ // the race's, so its own result is expected to settle first and win;
+ // either outcome is isError and seals the turn with finishReason "aborted".
+ expect(result.isError).toBe(true);
+ expect(result.content).toBe("aborted by tool");
+ });
+
+ it("swallows a late rejection from the orphaned tool promise after abort wins the race", async () => {
+ const ac = new AbortController();
+ let rejectTool: ((err: Error) => void) | undefined;
+ const tool = createFakeTool("late-reject", () => {
+ return new Promise<ToolResult>((_resolve, reject) => {
+ rejectTool = reject;
+ });
+ });
+
+ const promise = executeToolCall(
+ { id: "tc1", name: "late-reject", input: {} },
+ tool,
+ ac.signal,
+ noopEmit,
+ "conv-1",
+ "turn-1",
+ );
+
+ await delay(10);
+ ac.abort();
+
+ const result = await promise;
+ expect(result).toEqual(ABORTED_RESULT);
+
+ // The tool rejects AFTER the race already resolved with "Aborted".
+ // The no-op catch must swallow this — no unhandled rejection.
+ rejectTool?.(new Error("late boom"));
+ // Give the microtask queue a tick to flush
+ await delay(5);
+ // If we reach here without an unhandledRejection crashing the process,
+ // the test passes. (vitest surfaces unhandled rejections as failures.)
+ });
+
+ it("returns an error result when the tool rejects before abort", async () => {
+ const ac = new AbortController();
+ const tool = createFakeTool("boom", async () => {
+ throw new Error("tool exploded");
+ });
+
+ const result = await executeToolCall(
+ { id: "tc1", name: "boom", input: {} },
+ tool,
+ ac.signal,
+ noopEmit,
+ "conv-1",
+ "turn-1",
+ );
+
+ expect(result.isError).toBe(true);
+ expect(result.content).toContain("tool exploded");
+ });
+
+ it("returns Unknown tool when the tool is undefined", async () => {
+ const ac = new AbortController();
+ const result = await executeToolCall(
+ { id: "tc1", name: "nonexistent", input: {} },
+ undefined,
+ ac.signal,
+ noopEmit,
+ "conv-1",
+ "turn-1",
+ );
+
+ expect(result.isError).toBe(true);
+ expect(result.content).toContain("Unknown tool");
+ });
+});
+
+// ===========================================================================
+// runTurn — integration tests for the abort-signal race (durability)
+// ===========================================================================
+
+describe("runTurn abort-race durability", () => {
+ // Required test 1: A hanging tool (never resolves, ignores ctx.signal)
+ // must not keep runTurn from returning when the signal aborts.
+ it("hanging tool + abort → runTurn returns with finishReason aborted and emits done", async () => {
+ const ac = new AbortController();
+
+ // A tool whose execute returns a promise that NEVER resolves and
+ // ignores ctx.signal entirely.
+ const tool = createFakeTool("hang", () => new Promise<ToolResult>(() => {}));
+
+ // Use eager: true so the tool starts BEFORE the signal aborts.
+ // This exercises the race (not the early signal.aborted return).
+ const provider: ProviderContract = {
+ id: "fake",
+ stream() {
+ return (async function* () {
+ yield {
+ type: "tool-call",
+ toolCallId: "tc1",
+ toolName: "hang",
+ input: {},
+ } as ProviderEvent;
+ ac.abort();
+ await delay(10);
+ yield { type: "finish", reason: "tool-calls" } as ProviderEvent;
+ })();
+ },
+ };
+
+ const { events, emit } = createCollectingEmit();
+
+ const result = await runTurn({
+ provider,
+ messages: [userMessage],
+ tools: [tool],
+ dispatch: { maxConcurrent: 1, eager: true },
+ conversationId: "conv-1",
+ turnId: "turn-1",
+ emit,
+ signal: ac.signal,
+ });
+
+ // runTurn returned (didn't hang) → the race worked.
+ expect(result.finishReason).toBe("aborted");
+
+ // A done event was emitted with reason "aborted".
+ const doneEvents = events.filter((e) => e.type === "done");
+ expect(doneEvents).toHaveLength(1);
+ if (doneEvents[0]?.type === "done") {
+ expect(doneEvents[0].reason).toBe("aborted");
+ }
+ });
+
+ // Required test 2: A signal-aware tool that resolves its own result on
+ // abort must also let runTurn return with finishReason "aborted".
+ it("signal-aware tool + abort → runTurn returns with finishReason aborted", async () => {
+ const ac = new AbortController();
+
+ const tool = createFakeTool("aware", (_input, ctx) => {
+ return new Promise<ToolResult>((resolve) => {
+ ctx.signal.addEventListener(
+ "abort",
+ () => resolve({ content: "aborted by tool", isError: true }),
+ { once: true },
+ );
+ });
+ });
+
+ const provider: ProviderContract = {
+ id: "fake",
+ stream() {
+ return (async function* () {
+ yield {
+ type: "tool-call",
+ toolCallId: "tc1",
+ toolName: "aware",
+ input: {},
+ } as ProviderEvent;
+ ac.abort();
+ await delay(10);
+ yield { type: "finish", reason: "tool-calls" } as ProviderEvent;
+ })();
+ },
+ };
+
+ const { events, emit } = createCollectingEmit();
+
+ const result = await runTurn({
+ provider,
+ messages: [userMessage],
+ tools: [tool],
+ dispatch: { maxConcurrent: 1, eager: true },
+ conversationId: "conv-1",
+ turnId: "turn-1",
+ emit,
+ signal: ac.signal,
+ });
+
+ expect(result.finishReason).toBe("aborted");
+
+ const doneEvents = events.filter((e) => e.type === "done");
+ expect(doneEvents).toHaveLength(1);
+ if (doneEvents[0]?.type === "done") {
+ expect(doneEvents[0].reason).toBe("aborted");
+ }
+
+ // When the step is aborted, tool-result MESSAGES are omitted from the
+ // result (the tool-result EVENT is still emitted by executeStep for
+ // live UI updates, but the message is not persisted). This prevents
+ // orphaned `tool` messages from breaking the next turn's provider
+ // request. The assistant message has its tool-call chunks stripped.
+ const toolResultMsg = result.messages.find((m) => m.role === "tool");
+ expect(toolResultMsg).toBeUndefined();
+
+ // The assistant message should NOT contain tool-call chunks.
+ const assistantMsg = result.messages.find(
+ (m) => m.role === "assistant" && m.chunks.some((c) => c.type === "tool-call"),
+ );
+ expect(assistantMsg).toBeUndefined();
+ });
+
+ // Required test 3 (regression guard): Without abort, a normal tool runs
+ // and its result is used; finishReason reflects the model.
+ it("no abort → tool runs normally and its result is used (regression)", async () => {
+ const tool = createFakeTool("normal", async (input) => ({
+ content: `result: ${JSON.stringify(input)}`,
+ }));
+
+ const provider = createFakeProvider([
+ [
+ { type: "tool-call", toolCallId: "tc1", toolName: "normal", input: { x: 1 } },
+ { type: "finish", reason: "tool-calls" },
+ ],
+ [
+ { type: "text-delta", delta: "done" },
+ { type: "finish", reason: "stop" },
+ ],
+ ]);
+
+ const { events, emit } = createCollectingEmit();
+
+ const result = await runTurn({
+ provider,
+ messages: [userMessage],
+ tools: [tool],
+ dispatch: { maxConcurrent: 1, eager: true },
+ conversationId: "conv-1",
+ turnId: "turn-1",
+ emit,
+ });
+
+ // finishReason reflects the model (second step's "stop").
+ expect(result.finishReason).toBe("stop");
+
+ // The tool's result was used (fed back, not "Aborted").
+ const toolResultMsg = result.messages.find((m) => m.role === "tool");
+ expect(toolResultMsg).toBeDefined();
+ const trChunk = toolResultMsg?.chunks[0];
+ expect(trChunk?.type).toBe("tool-result");
+ if (trChunk?.type === "tool-result") {
+ expect(trChunk.content).toBe('result: {"x":1}');
+ expect(trChunk.isError).toBe(false);
+ }
+
+ // done event emitted with reason "stop".
+ const doneEvents = events.filter((e) => e.type === "done");
+ expect(doneEvents).toHaveLength(1);
+ if (doneEvents[0]?.type === "done") {
+ expect(doneEvents[0].reason).toBe("stop");
+ }
+ });
+
+ // Bonus: multiple hanging tools + abort → all resolve via the race,
+ // drain() doesn't deadlock, and runTurn returns. Tool-result messages
+ // are omitted from the result (aborted step); the turn seals cleanly.
+ it("multiple hanging tools + abort → drain completes and runTurn returns", async () => {
+ const ac = new AbortController();
+
+ // Two tools that never resolve and ignore ctx.signal.
+ const toolA = createFakeTool("hangA", () => new Promise<ToolResult>(() => {}));
+ const toolB = createFakeTool("hangB", () => new Promise<ToolResult>(() => {}));
+
+ const provider: ProviderContract = {
+ id: "fake",
+ stream() {
+ return (async function* () {
+ yield {
+ type: "tool-call",
+ toolCallId: "tc1",
+ toolName: "hangA",
+ input: {},
+ } as ProviderEvent;
+ yield {
+ type: "tool-call",
+ toolCallId: "tc2",
+ toolName: "hangB",
+ input: {},
+ } as ProviderEvent;
+ ac.abort();
+ await delay(10);
+ yield { type: "finish", reason: "tool-calls" } as ProviderEvent;
+ })();
+ },
+ };
+
+ const { events, emit } = createCollectingEmit();
+
+ const result = await runTurn({
+ provider,
+ messages: [userMessage],
+ tools: [toolA, toolB],
+ dispatch: { maxConcurrent: 2, eager: true },
+ conversationId: "conv-1",
+ turnId: "turn-1",
+ emit,
+ signal: ac.signal,
+ });
+
+ expect(result.finishReason).toBe("aborted");
+
+ // tool-result EVENTS are still emitted by executeStep (for live UI),
+ // but tool-result MESSAGES are omitted from the result (not persisted).
+ const toolResultEvents = events.filter((e) => e.type === "tool-result");
+ expect(toolResultEvents).toHaveLength(2);
+ for (const tr of toolResultEvents) {
+ if (tr.type === "tool-result") {
+ expect(tr.isError).toBe(true);
+ }
+ }
+
+ // No tool messages in the result (they would orphan on the next turn).
+ const toolMessages = result.messages.filter((m) => m.role === "tool");
+ expect(toolMessages).toHaveLength(0);
+
+ // Assistant message has no tool-call chunks.
+ const assistantMsgs = result.messages.filter((m) => m.role === "assistant");
+ for (const msg of assistantMsgs) {
+ expect(msg.chunks.some((c) => c.type === "tool-call")).toBe(false);
+ }
+
+ const doneEvents = events.filter((e) => e.type === "done");
+ expect(doneEvents).toHaveLength(1);
+ if (doneEvents[0]?.type === "done") {
+ expect(doneEvents[0].reason).toBe("aborted");
+ }
+ });
+
+ // Critical regression: after an aborted tool call, the result messages
+ // must NOT contain orphaned tool messages. If they did, the next turn
+ // would send a `tool` role message to the provider without a preceding
+ // `assistant` message carrying `tool_calls` → 400 error.
+ it("aborted step produces no tool messages and no tool-call chunks in result", async () => {
+ const ac = new AbortController();
+
+ // Tool that hangs forever
+ const tool = createFakeTool("hang", () => new Promise<ToolResult>(() => {}));
+
+ const provider: ProviderContract = {
+ id: "fake",
+ stream() {
+ return (async function* () {
+ yield { type: "text-delta", delta: "Let me run that for you" } as ProviderEvent;
+ yield {
+ type: "tool-call",
+ toolCallId: "tc1",
+ toolName: "hang",
+ input: {},
+ } as ProviderEvent;
+ ac.abort();
+ await delay(10);
+ yield { type: "finish", reason: "tool-calls" } as ProviderEvent;
+ })();
+ },
+ };
+
+ const result = await runTurn({
+ provider,
+ messages: [userMessage],
+ tools: [tool],
+ dispatch: { maxConcurrent: 1, eager: true },
+ conversationId: "conv-1",
+ turnId: "turn-1",
+ emit: noopEmit,
+ signal: ac.signal,
+ });
+
+ expect(result.finishReason).toBe("aborted");
+
+ // No tool messages in the result
+ const toolMessages = result.messages.filter((m) => m.role === "tool");
+ expect(toolMessages).toHaveLength(0);
+
+ // The assistant message should preserve text but NOT tool-call chunks
+ const assistantMsg = result.messages.find((m) => m.role === "assistant");
+ expect(assistantMsg).toBeDefined();
+ if (assistantMsg !== undefined) {
+ const hasToolCall = assistantMsg.chunks.some((c) => c.type === "tool-call");
+ expect(hasToolCall).toBe(false);
+ // Text content should be preserved
+ const hasText = assistantMsg.chunks.some((c) => c.type === "text");
+ expect(hasText).toBe(true);
+ }
+
+ // Simulate what the next turn would see: the result messages are the
+ // conversation history (minus the user message). If we feed these to
+ // a simple converter, there should be NO `tool` role messages.
+ const toolRoleCount = result.messages.filter((m) => m.role === "tool").length;
+ expect(toolRoleCount).toBe(0);
+ });
+});
diff --git a/packages/kernel/src/runtime/dispatch.ts b/packages/kernel/src/runtime/dispatch.ts
index d1c46cb..e0be1b4 100644
--- a/packages/kernel/src/runtime/dispatch.ts
+++ b/packages/kernel/src/runtime/dispatch.ts
@@ -35,8 +35,24 @@ export async function executeToolCall(
conversationId,
...(cwd !== undefined ? { cwd } : {}),
};
+ // Race the tool's execute promise against the abort signal so a tool
+ // that hangs (ignores ctx.signal, or blocks on something the signal
+ // can't interrupt) can't keep runTurn from returning. When the signal
+ // fires we RESOLVE (not reject) with an "Aborted" result so the step
+ // completes normally and the existing signal.aborted → finishReason =
+ // "aborted" path seals the turn cleanly (done event), letting the
+ // caller's finally clear active state and the FE clear its spinner.
try {
- return await tool.execute(call.input, ctx);
+ const toolPromise = tool.execute(call.input, ctx);
+ const abortPromise = new Promise<ToolResult>((resolve) => {
+ signal.addEventListener("abort", () => resolve({ content: "Aborted", isError: true }), {
+ once: true,
+ });
+ });
+ // Swallow late rejections from the orphaned tool promise: the tool
+ // may reject after the race already resolved with "Aborted".
+ void toolPromise.catch(() => {});
+ return await Promise.race([toolPromise, abortPromise]);
} catch (err) {
const message = err instanceof Error ? err.message : String(err);
return { content: `Tool execution error: ${message}`, isError: true };
diff --git a/packages/kernel/src/runtime/run-turn.ts b/packages/kernel/src/runtime/run-turn.ts
index 4a12e28..f5d80d3 100644
--- a/packages/kernel/src/runtime/run-turn.ts
+++ b/packages/kernel/src/runtime/run-turn.ts
@@ -89,6 +89,19 @@ function appendThinkingDelta(chunks: Chunk[], delta: string): void {
}
}
+/**
+ * Remove tool-call chunks from an assistant message, returning a new message
+ * with only the non-tool-call chunks (text, thinking, error). Returns
+ * `undefined` when all chunks were tool-calls (so the caller can omit the
+ * message entirely). Used when a step is aborted to avoid persisting
+ * incomplete tool calls whose placeholder "Aborted" results would create
+ * orphaned `tool` messages in the next turn's history.
+ */
+function stripToolCallChunks(msg: ChatMessage): ChatMessage | undefined {
+ const stripped = msg.chunks.filter((c) => c.type !== "tool-call");
+ return stripped.length > 0 ? { role: msg.role, chunks: stripped } : undefined;
+}
+
interface StepContext {
readonly provider: ProviderContract;
readonly messages: ChatMessage[];
@@ -516,12 +529,36 @@ export async function runTurn(input: RunTurnInput): Promise<RunTurnResult> {
totalUsage = addUsage(totalUsage, stepResult.usage);
lastStepUsage = stepResult.usage;
- if (stepResult.assistantMessage !== undefined) {
- messages.push(stepResult.assistantMessage);
- resultMessages.push(stepResult.assistantMessage);
+ // When the signal is aborted mid-step, the tool results are
+ // placeholders ({ content: "Aborted", isError: true }). If these
+ // are persisted and included in the next turn's message history,
+ // the provider sees a `tool` role message without a preceding
+ // `assistant` message carrying `tool_calls` → 400 error.
+ //
+ // To prevent this, when the signal is aborted we:
+ // 1. Strip tool-call chunks from the assistant message (keep
+ // text/thinking/error chunks so the partial response is
+ // preserved).
+ // 2. Omit tool-result messages entirely (they are not persisted,
+ // not added to resultMessages, and not passed to onStepComplete).
+ //
+ // This keeps the conversation history clean: the assistant's
+ // partial text is preserved, but no incomplete tool calls are
+ // left dangling. The `done` event still carries
+ // `reason: "aborted"`, so the turn seals cleanly.
+ const stepAborted = signal.aborted;
+ const assistantMessage =
+ stepAborted && stepResult.assistantMessage !== undefined
+ ? stripToolCallChunks(stepResult.assistantMessage)
+ : stepResult.assistantMessage;
+ const toolMessages = stepAborted ? [] : stepResult.toolMessages;
+
+ if (assistantMessage !== undefined) {
+ messages.push(assistantMessage);
+ resultMessages.push(assistantMessage);
}
- for (const msg of stepResult.toolMessages) {
+ for (const msg of toolMessages) {
messages.push(msg);
resultMessages.push(msg);
}
@@ -532,10 +569,10 @@ export async function runTurn(input: RunTurnInput): Promise<RunTurnResult> {
// SAME objects in resultMessages — the caller must NOT double-persist.
if (input.onStepComplete !== undefined) {
const stepMessages: ChatMessage[] = [];
- if (stepResult.assistantMessage !== undefined) {
- stepMessages.push(stepResult.assistantMessage);
+ if (assistantMessage !== undefined) {
+ stepMessages.push(assistantMessage);
}
- for (const msg of stepResult.toolMessages) {
+ for (const msg of toolMessages) {
stepMessages.push(msg);
}
if (stepMessages.length > 0) {
@@ -543,7 +580,7 @@ export async function runTurn(input: RunTurnInput): Promise<RunTurnResult> {
}
}
- if (signal.aborted) {
+ if (stepAborted) {
finishReason = "aborted";
break;
}
diff --git a/packages/tool-shell/src/shell.test.ts b/packages/tool-shell/src/shell.test.ts
index a70693b..07e0af4 100644
--- a/packages/tool-shell/src/shell.test.ts
+++ b/packages/tool-shell/src/shell.test.ts
@@ -22,8 +22,12 @@ function stubCtx(overrides?: Partial<ToolExecuteContext>): ToolExecuteContext {
};
}
-function fakeSpawn(result: { exitCode: number | null; timedOut: boolean }): SpawnShell {
- return async () => result;
+function fakeSpawn(result: {
+ exitCode: number | null;
+ timedOut: boolean;
+ aborted?: boolean;
+}): SpawnShell {
+ return async () => ({ aborted: false, ...result });
}
describe("validateArgs", () => {
@@ -201,7 +205,7 @@ describe("createRunShellTool", () => {
workdir: "/tmp",
spawn: async (_params) => {
_params.onOutput("hello\n", "stdout");
- return { exitCode: 0, timedOut: false };
+ return { exitCode: 0, timedOut: false, aborted: false };
},
});
const result = await tool.execute({ command: "echo hello" }, stubCtx());
@@ -214,7 +218,7 @@ describe("createRunShellTool", () => {
workdir: "/tmp",
spawn: async (_params) => {
_params.onOutput("error output\n", "stderr");
- return { exitCode: 1, timedOut: false };
+ return { exitCode: 1, timedOut: false, aborted: false };
},
});
const result = await tool.execute({ command: "false" }, stubCtx());
@@ -227,7 +231,7 @@ describe("createRunShellTool", () => {
workdir: "/tmp",
spawn: async (_params) => {
_params.onOutput("partial\n", "stdout");
- return { exitCode: null, timedOut: true };
+ return { exitCode: null, timedOut: true, aborted: false };
},
});
const result = await tool.execute({ command: "sleep 999" }, stubCtx());
@@ -242,7 +246,7 @@ describe("createRunShellTool", () => {
outputCap: cap,
spawn: async (_params) => {
_params.onOutput("a".repeat(200), "stdout");
- return { exitCode: 0, timedOut: false };
+ return { exitCode: 0, timedOut: false, aborted: false };
},
});
const result = await tool.execute({ command: "gen" }, stubCtx());
@@ -258,7 +262,7 @@ describe("createRunShellTool", () => {
params.onOutput("line1\n", "stdout");
params.onOutput("err1\n", "stderr");
params.onOutput("line2\n", "stdout");
- return { exitCode: 0, timedOut: false };
+ return { exitCode: 0, timedOut: false, aborted: false };
},
});
await tool.execute(
@@ -280,7 +284,7 @@ describe("createRunShellTool", () => {
workdir: "/baked",
spawn: async (params) => {
receivedCwd = params.cwd;
- return { exitCode: 0, timedOut: false };
+ return { exitCode: 0, timedOut: false, aborted: false };
},
});
await tool.execute({ command: "pwd" }, stubCtx({ cwd: "/custom" }));
@@ -293,7 +297,7 @@ describe("createRunShellTool", () => {
workdir: "/baked",
spawn: async (params) => {
receivedCwd = params.cwd;
- return { exitCode: 0, timedOut: false };
+ return { exitCode: 0, timedOut: false, aborted: false };
},
});
await tool.execute({ command: "pwd" }, stubCtx());
@@ -317,7 +321,7 @@ describe("createRunShellTool", () => {
controller.abort();
const tool = createRunShellTool({
workdir: "/tmp",
- spawn: async () => ({ exitCode: 0, timedOut: false }),
+ spawn: async () => ({ exitCode: 0, timedOut: false, aborted: false }),
});
const result = await tool.execute({ command: "test" }, stubCtx({ signal: controller.signal }));
expect(result.isError).toBe(true);
@@ -329,7 +333,7 @@ describe("createRunShellTool", () => {
workdir: "/tmp",
spawn: async (params) => {
receivedTimeout = params.timeout;
- return { exitCode: 0, timedOut: false };
+ return { exitCode: 0, timedOut: false, aborted: false };
},
});
await tool.execute({ command: "test", timeout: 5000 }, stubCtx());
@@ -355,3 +359,133 @@ describe("createRunShellTool (integration)", () => {
expect(streamed).toContain("hello-from-shell");
});
});
+
+describe("realSpawn — process-group kill on abort/timeout", () => {
+ it("aborts a command with a grandchild holding the pipes and resolves immediately", async () => {
+ const { realSpawn } = await import("./spawn.js");
+ const controller = new AbortController();
+
+ // "sleep 30 & wait" spawns a grandchild (sleep) that inherits the stdio
+ // pipes. Killing just the sh parent does NOT close the pipes → close never
+ // fires. With detached:true + process-group kill, the grandchild dies too.
+ const promise = realSpawn({
+ command: "sleep 30 & wait",
+ cwd: "/tmp",
+ signal: controller.signal,
+ timeout: 60_000,
+ onOutput: () => {},
+ });
+
+ // Give the shell time to actually spawn the grandchild.
+ await new Promise((r) => setTimeout(r, 500));
+
+ controller.abort();
+
+ // Must resolve promptly (not wait 30s for the grandchild's sleep).
+ const result = await promise;
+ expect(result.aborted).toBe(true);
+ expect(result.timedOut).toBe(false);
+
+ // Give the OS a moment to reap the killed processes.
+ await new Promise((r) => setTimeout(r, 200));
+
+ // The grandchild sleep process should be gone. Check via pgrep.
+ const { execSync } = await import("node:child_process");
+ let sleeping: string[];
+ try {
+ sleeping = execSync("pgrep -f 'sleep 30'", { encoding: "utf-8" }).trim().split("\n");
+ } catch {
+ // pgrep returns non-zero when no processes match → all gone.
+ sleeping = [];
+ }
+ expect(sleeping.length).toBe(0);
+ });
+
+ it("times out a command with a grandchild holding the pipes and resolves promptly", async () => {
+ const { realSpawn } = await import("./spawn.js");
+ const controller = new AbortController();
+
+ const promise = realSpawn({
+ command: "sleep 30 & wait",
+ cwd: "/tmp",
+ signal: controller.signal,
+ timeout: 500,
+ onOutput: () => {},
+ });
+
+ // Must resolve within a short window (not 30s).
+ const start = Date.now();
+ const result = await promise;
+ const elapsed = Date.now() - start;
+
+ expect(result.timedOut).toBe(true);
+ expect(result.aborted).toBe(false);
+ // Should resolve shortly after the 500ms timeout, well under 30s.
+ expect(elapsed).toBeLessThan(10_000);
+
+ // Grandchild should be dead.
+ await new Promise((r) => setTimeout(r, 200));
+ const { execSync } = await import("node:child_process");
+ let sleeping: string[];
+ try {
+ sleeping = execSync("pgrep -f 'sleep 30'", { encoding: "utf-8" }).trim().split("\n");
+ } catch {
+ sleeping = [];
+ }
+ expect(sleeping.length).toBe(0);
+ });
+
+ it("captures stdout on normal completion (regression guard)", async () => {
+ const { realSpawn } = await import("./spawn.js");
+ const controller = new AbortController();
+ let output = "";
+
+ const result = await realSpawn({
+ command: "echo hi",
+ cwd: "/tmp",
+ signal: controller.signal,
+ timeout: 5_000,
+ onOutput: (data) => {
+ output += data;
+ },
+ });
+
+ expect(result.aborted).toBe(false);
+ expect(result.timedOut).toBe(false);
+ expect(result.exitCode).toBe(0);
+ expect(output).toContain("hi");
+ });
+
+ it("aborts a simple single-process command and resolves with aborted: true", async () => {
+ const { realSpawn } = await import("./spawn.js");
+ const controller = new AbortController();
+
+ const promise = realSpawn({
+ command: "sleep 30",
+ cwd: "/tmp",
+ signal: controller.signal,
+ timeout: 60_000,
+ onOutput: () => {},
+ });
+
+ // Let the sleep actually start.
+ await new Promise((r) => setTimeout(r, 300));
+
+ controller.abort();
+
+ const result = await promise;
+ expect(result.aborted).toBe(true);
+ expect(result.timedOut).toBe(false);
+
+ // The sleep process should be gone.
+ await new Promise((r) => setTimeout(r, 200));
+ const { execSync } = await import("node:child_process");
+ let sleeping: string[];
+ try {
+ sleeping = execSync("pgrep -f 'sleep 30'", { encoding: "utf-8" }).trim().split("\n");
+ } catch {
+ sleeping = [];
+ }
+ expect(sleeping.length).toBe(0);
+ });
+});
diff --git a/packages/tool-shell/src/shell.ts b/packages/tool-shell/src/shell.ts
index d96d73e..cc76bca 100644
--- a/packages/tool-shell/src/shell.ts
+++ b/packages/tool-shell/src/shell.ts
@@ -12,6 +12,7 @@ export interface ValidatedArgs {
export interface SpawnResult {
readonly exitCode: number | null;
readonly timedOut: boolean;
+ readonly aborted: boolean;
}
export type SpawnShell = (params: {
@@ -139,7 +140,6 @@ export function createRunShellTool(deps: {
};
let spawnResult: SpawnResult;
- let aborted = false;
try {
spawnResult = await deps.spawn({
@@ -154,7 +154,6 @@ export function createRunShellTool(deps: {
});
} catch (err: unknown) {
if (ctx.signal.aborted) {
- aborted = true;
return buildResult({
exitCode: null,
timedOut: false,
@@ -172,7 +171,7 @@ export function createRunShellTool(deps: {
return buildResult({
exitCode: spawnResult.exitCode,
timedOut: spawnResult.timedOut,
- aborted,
+ aborted: spawnResult.aborted,
output,
cap,
});
diff --git a/packages/tool-shell/src/spawn.ts b/packages/tool-shell/src/spawn.ts
index 9025c26..9b1d7e4 100644
--- a/packages/tool-shell/src/spawn.ts
+++ b/packages/tool-shell/src/spawn.ts
@@ -3,24 +3,66 @@ import type { SpawnResult, SpawnShell } from "./shell.js";
export const realSpawn: SpawnShell = (params): Promise<SpawnResult> => {
return new Promise<SpawnResult>((resolve) => {
+ // detached: true puts the child in its own process group (pgid = child.pid).
+ // This lets us kill the entire group (child + any grandchildren that inherit
+ // the pipes) via process.kill(-pgid, "SIGKILL") on abort/timeout, so a
+ // backgrounded grandchild can't keep the stdio pipes open and stall the
+ // promise on child.on("close").
const child = nodeSpawn("sh", ["-c", params.command], {
cwd: params.cwd,
stdio: ["ignore", "pipe", "pipe"],
+ detached: true,
});
+ let settled = false;
let timedOut = false;
- let killed = false;
- const timer = setTimeout(() => {
- timedOut = true;
- child.kill("SIGKILL");
- }, params.timeout);
+ let timer: ReturnType<typeof setTimeout> | undefined;
+
+ /** Kill the entire child process group (best-effort — group may be gone). */
+ const killGroup = () => {
+ if (child.pid !== undefined) {
+ try {
+ process.kill(-child.pid, "SIGKILL");
+ } catch {
+ // Process group may already be gone — ignore.
+ }
+ }
+ };
+
+ /** Remove the abort listener and clear the timeout timer (no leaks). */
+ const cleanup = () => {
+ if (timer !== undefined) {
+ clearTimeout(timer);
+ timer = undefined;
+ }
+ params.signal.removeEventListener("abort", onAbort);
+ };
+
+ /** Resolve once, then clean up so listeners/timers never leak. */
+ const settle = (result: SpawnResult) => {
+ if (settled) return;
+ settled = true;
+ cleanup();
+ resolve(result);
+ };
const onAbort = () => {
- killed = true;
- child.kill("SIGKILL");
+ if (settled) return;
+ killGroup();
+ // Resolve immediately — do NOT wait for child.on("close"), which may
+ // never fire if a grandchild holds the pipes open.
+ settle({ exitCode: null, timedOut: false, aborted: true });
};
params.signal.addEventListener("abort", onAbort, { once: true });
+ timer = setTimeout(() => {
+ if (settled) return;
+ timedOut = true;
+ killGroup();
+ // Resolve immediately — same reasoning as abort.
+ settle({ exitCode: null, timedOut: true, aborted: false });
+ }, params.timeout);
+
child.stdout.on("data", (chunk: Buffer) => {
params.onOutput(chunk.toString(), "stdout");
});
@@ -29,18 +71,17 @@ export const realSpawn: SpawnShell = (params): Promise<SpawnResult> => {
params.onOutput(chunk.toString(), "stderr");
});
+ // Normal-completion path: wait for "close" so all stdout/stderr is captured.
+ // If abort/timeout already settled, this is a no-op (settled === true).
child.on("close", (code) => {
- clearTimeout(timer);
- params.signal.removeEventListener("abort", onAbort);
- resolve({ exitCode: code, timedOut });
+ settle({ exitCode: code, timedOut, aborted: false });
});
+ // Spawn error (e.g. bad cwd, sh not found). Kill the group just in case
+ // and resolve — never leave the promise pending.
child.on("error", () => {
- clearTimeout(timer);
- params.signal.removeEventListener("abort", onAbort);
- if (!killed && !timedOut) {
- resolve({ exitCode: 1, timedOut: false });
- }
+ killGroup();
+ settle({ exitCode: 1, timedOut: false, aborted: false });
});
});
};