diff options
| author | Adam Malczewski <[email protected]> | 2026-06-26 22:03:19 +0900 |
|---|---|---|
| committer | Adam Malczewski <[email protected]> | 2026-06-26 22:23:39 +0900 |
| commit | 727c98c9dae516a2070eb950410314380a20c974 (patch) | |
| tree | 52aa1022c54f11770be7e4e2a324f0a8b8b8deec /packages/kernel/src/runtime | |
| parent | e59dc11f63b1df51142259bb2c406af8c9c8c2bb (diff) | |
| download | dispatch-727c98c9dae516a2070eb950410314380a20c974.tar.gz dispatch-727c98c9dae516a2070eb950410314380a20c974.zip | |
style: switch from tabs to 2-space indentation
Diffstat (limited to 'packages/kernel/src/runtime')
| -rw-r--r-- | packages/kernel/src/runtime/dispatch.test.ts | 980 | ||||
| -rw-r--r-- | packages/kernel/src/runtime/dispatch.ts | 308 | ||||
| -rw-r--r-- | packages/kernel/src/runtime/events.ts | 270 | ||||
| -rw-r--r-- | packages/kernel/src/runtime/index.ts | 16 | ||||
| -rw-r--r-- | packages/kernel/src/runtime/run-turn.test.ts | 6810 | ||||
| -rw-r--r-- | packages/kernel/src/runtime/run-turn.ts | 1482 |
6 files changed, 4933 insertions, 4933 deletions
diff --git a/packages/kernel/src/runtime/dispatch.test.ts b/packages/kernel/src/runtime/dispatch.test.ts index afbfb39..dfe2ac7 100644 --- a/packages/kernel/src/runtime/dispatch.test.ts +++ b/packages/kernel/src/runtime/dispatch.test.ts @@ -11,51 +11,51 @@ import { runTurn } from "./run-turn.js"; // --------------------------------------------------------------------------- function delay(ms: number): Promise<void> { - return new Promise((resolve) => { - setTimeout(resolve, ms); - }); + return new Promise((resolve) => { + setTimeout(resolve, ms); + }); } function createFakeProvider(script: ProviderEvent[][]): ProviderContract { - let callIndex = 0; - return { - id: "fake", - stream() { - const events = script[callIndex] ?? []; - callIndex++; - return (async function* () { - for (const event of events) { - yield event; - } - })(); - }, - }; + let callIndex = 0; + return { + id: "fake", + stream() { + const events = script[callIndex] ?? []; + callIndex++; + return (async function* () { + for (const event of events) { + yield event; + } + })(); + }, + }; } function createFakeTool( - name: string, - handler?: (input: unknown, ctx: ToolExecuteContext) => Promise<ToolResult>, - opts?: { concurrencySafe?: boolean }, + name: string, + handler?: (input: unknown, ctx: ToolExecuteContext) => Promise<ToolResult>, + opts?: { concurrencySafe?: boolean }, ): ToolContract { - return { - name, - description: `Fake tool: ${name}`, - parameters: { type: "object" }, - ...(opts?.concurrencySafe !== undefined ? { concurrencySafe: opts.concurrencySafe } : {}), - execute: handler ?? (async (input) => ({ content: `${name}: ${JSON.stringify(input)}` })), - }; + return { + name, + description: `Fake tool: ${name}`, + parameters: { type: "object" }, + ...(opts?.concurrencySafe !== undefined ? { concurrencySafe: opts.concurrencySafe } : {}), + execute: handler ?? (async (input) => ({ content: `${name}: ${JSON.stringify(input)}` })), + }; } function createCollectingEmit(): { events: AgentEvent[]; emit: (event: AgentEvent) => void } { - const events: AgentEvent[] = []; - return { events, emit: (event) => events.push(event) }; + const events: AgentEvent[] = []; + return { events, emit: (event) => events.push(event) }; } const noopEmit = () => {}; const userMessage: ChatMessage = { - role: "user", - chunks: [{ type: "text", text: "hello" }], + role: "user", + chunks: [{ type: "text", text: "hello" }], }; const ABORTED_RESULT: ToolResult = { content: "Aborted", isError: true }; @@ -65,158 +65,158 @@ const ABORTED_RESULT: ToolResult = { content: "Aborted", isError: true }; // =========================================================================== describe("executeToolCall", () => { - it("returns the tool's result when the tool resolves before abort", async () => { - const ac = new AbortController(); - const tool = createFakeTool("echo", async (input) => ({ - content: `echo: ${JSON.stringify(input)}`, - })); - - const result = await executeToolCall( - { id: "tc1", name: "echo", input: { x: 1 } }, - tool, - ac.signal, - noopEmit, - "conv-1", - "turn-1", - ); - - expect(result).toEqual({ content: 'echo: {"x":1}' }); - }); - - it("returns Aborted immediately when signal is already aborted at call time", async () => { - const ac = new AbortController(); - ac.abort(); - const tool = createFakeTool("echo", async () => ({ content: "should not run" })); - - const result = await executeToolCall( - { id: "tc1", name: "echo", input: {} }, - tool, - ac.signal, - noopEmit, - "conv-1", - "turn-1", - ); - - expect(result).toEqual(ABORTED_RESULT); - }); - - it("returns Aborted when a hanging tool is raced against an abort signal", async () => { - const ac = new AbortController(); - // A tool that never resolves and ignores ctx.signal - const tool = createFakeTool("hang", () => new Promise<ToolResult>(() => {})); - - const promise = executeToolCall( - { id: "tc1", name: "hang", input: {} }, - tool, - ac.signal, - noopEmit, - "conv-1", - "turn-1", - ); - - // Abort after the tool has started - await delay(10); - ac.abort(); - - const result = await promise; - expect(result).toEqual(ABORTED_RESULT); - }); - - it("returns the tool's own result when a signal-aware tool resolves on abort", async () => { - const ac = new AbortController(); - const toolResult: ToolResult = { content: "aborted by tool", isError: true }; - const tool = createFakeTool("aware", (_input, ctx) => { - return new Promise<ToolResult>((resolve) => { - ctx.signal.addEventListener("abort", () => resolve(toolResult), { once: true }); - }); - }); - - const promise = executeToolCall( - { id: "tc1", name: "aware", input: {} }, - tool, - ac.signal, - noopEmit, - "conv-1", - "turn-1", - ); - - await delay(10); - ac.abort(); - - const result = await promise; - // The tool listens to the signal and resolves its own result. Whether - // the tool's result or the race's "Aborted" wins is timing-dependent; - // both are isError and let the turn seal with finishReason "aborted". - expect(result.isError).toBe(true); - expect(result.content).toBe("aborted by tool"); - }); - - it("swallows a late rejection from the orphaned tool promise after abort wins the race", async () => { - const ac = new AbortController(); - let rejectTool: ((err: Error) => void) | undefined; - const tool = createFakeTool("late-reject", () => { - return new Promise<ToolResult>((_resolve, reject) => { - rejectTool = reject; - }); - }); - - const promise = executeToolCall( - { id: "tc1", name: "late-reject", input: {} }, - tool, - ac.signal, - noopEmit, - "conv-1", - "turn-1", - ); - - await delay(10); - ac.abort(); - - const result = await promise; - expect(result).toEqual(ABORTED_RESULT); - - // The tool rejects AFTER the race already resolved with "Aborted". - // The no-op catch must swallow this — no unhandled rejection. - rejectTool?.(new Error("late boom")); - // Give the microtask queue a tick to flush - await delay(5); - // If we reach here without an unhandledRejection crashing the process, - // the test passes. (vitest surfaces unhandled rejections as failures.) - }); - - it("returns an error result when the tool rejects before abort", async () => { - const ac = new AbortController(); - const tool = createFakeTool("boom", async () => { - throw new Error("tool exploded"); - }); - - const result = await executeToolCall( - { id: "tc1", name: "boom", input: {} }, - tool, - ac.signal, - noopEmit, - "conv-1", - "turn-1", - ); - - expect(result.isError).toBe(true); - expect(result.content).toContain("tool exploded"); - }); - - it("returns Unknown tool when the tool is undefined", async () => { - const ac = new AbortController(); - const result = await executeToolCall( - { id: "tc1", name: "nonexistent", input: {} }, - undefined, - ac.signal, - noopEmit, - "conv-1", - "turn-1", - ); - - expect(result.isError).toBe(true); - expect(result.content).toContain("Unknown tool"); - }); + it("returns the tool's result when the tool resolves before abort", async () => { + const ac = new AbortController(); + const tool = createFakeTool("echo", async (input) => ({ + content: `echo: ${JSON.stringify(input)}`, + })); + + const result = await executeToolCall( + { id: "tc1", name: "echo", input: { x: 1 } }, + tool, + ac.signal, + noopEmit, + "conv-1", + "turn-1", + ); + + expect(result).toEqual({ content: 'echo: {"x":1}' }); + }); + + it("returns Aborted immediately when signal is already aborted at call time", async () => { + const ac = new AbortController(); + ac.abort(); + const tool = createFakeTool("echo", async () => ({ content: "should not run" })); + + const result = await executeToolCall( + { id: "tc1", name: "echo", input: {} }, + tool, + ac.signal, + noopEmit, + "conv-1", + "turn-1", + ); + + expect(result).toEqual(ABORTED_RESULT); + }); + + it("returns Aborted when a hanging tool is raced against an abort signal", async () => { + const ac = new AbortController(); + // A tool that never resolves and ignores ctx.signal + const tool = createFakeTool("hang", () => new Promise<ToolResult>(() => {})); + + const promise = executeToolCall( + { id: "tc1", name: "hang", input: {} }, + tool, + ac.signal, + noopEmit, + "conv-1", + "turn-1", + ); + + // Abort after the tool has started + await delay(10); + ac.abort(); + + const result = await promise; + expect(result).toEqual(ABORTED_RESULT); + }); + + it("returns the tool's own result when a signal-aware tool resolves on abort", async () => { + const ac = new AbortController(); + const toolResult: ToolResult = { content: "aborted by tool", isError: true }; + const tool = createFakeTool("aware", (_input, ctx) => { + return new Promise<ToolResult>((resolve) => { + ctx.signal.addEventListener("abort", () => resolve(toolResult), { once: true }); + }); + }); + + const promise = executeToolCall( + { id: "tc1", name: "aware", input: {} }, + tool, + ac.signal, + noopEmit, + "conv-1", + "turn-1", + ); + + await delay(10); + ac.abort(); + + const result = await promise; + // The tool listens to the signal and resolves its own result. Whether + // the tool's result or the race's "Aborted" wins is timing-dependent; + // both are isError and let the turn seal with finishReason "aborted". + expect(result.isError).toBe(true); + expect(result.content).toBe("aborted by tool"); + }); + + it("swallows a late rejection from the orphaned tool promise after abort wins the race", async () => { + const ac = new AbortController(); + let rejectTool: ((err: Error) => void) | undefined; + const tool = createFakeTool("late-reject", () => { + return new Promise<ToolResult>((_resolve, reject) => { + rejectTool = reject; + }); + }); + + const promise = executeToolCall( + { id: "tc1", name: "late-reject", input: {} }, + tool, + ac.signal, + noopEmit, + "conv-1", + "turn-1", + ); + + await delay(10); + ac.abort(); + + const result = await promise; + expect(result).toEqual(ABORTED_RESULT); + + // The tool rejects AFTER the race already resolved with "Aborted". + // The no-op catch must swallow this — no unhandled rejection. + rejectTool?.(new Error("late boom")); + // Give the microtask queue a tick to flush + await delay(5); + // If we reach here without an unhandledRejection crashing the process, + // the test passes. (vitest surfaces unhandled rejections as failures.) + }); + + it("returns an error result when the tool rejects before abort", async () => { + const ac = new AbortController(); + const tool = createFakeTool("boom", async () => { + throw new Error("tool exploded"); + }); + + const result = await executeToolCall( + { id: "tc1", name: "boom", input: {} }, + tool, + ac.signal, + noopEmit, + "conv-1", + "turn-1", + ); + + expect(result.isError).toBe(true); + expect(result.content).toContain("tool exploded"); + }); + + it("returns Unknown tool when the tool is undefined", async () => { + const ac = new AbortController(); + const result = await executeToolCall( + { id: "tc1", name: "nonexistent", input: {} }, + undefined, + ac.signal, + noopEmit, + "conv-1", + "turn-1", + ); + + expect(result.isError).toBe(true); + expect(result.content).toContain("Unknown tool"); + }); }); // =========================================================================== @@ -224,312 +224,312 @@ describe("executeToolCall", () => { // =========================================================================== describe("runTurn abort-race durability", () => { - // Required test 1: A hanging tool (never resolves, ignores ctx.signal) - // must not keep runTurn from returning when the signal aborts. - it("hanging tool + abort → runTurn returns with finishReason aborted and emits done", async () => { - const ac = new AbortController(); - - // A tool whose execute returns a promise that NEVER resolves and - // ignores ctx.signal entirely. - const tool = createFakeTool("hang", () => new Promise<ToolResult>(() => {})); - - // Use eager: true so the tool starts BEFORE the signal aborts. - // This exercises the race (not the early signal.aborted return). - const provider: ProviderContract = { - id: "fake", - stream() { - return (async function* () { - yield { - type: "tool-call", - toolCallId: "tc1", - toolName: "hang", - input: {}, - } as ProviderEvent; - ac.abort(); - await delay(10); - yield { type: "finish", reason: "tool-calls" } as ProviderEvent; - })(); - }, - }; - - const { events, emit } = createCollectingEmit(); - - const result = await runTurn({ - provider, - messages: [userMessage], - tools: [tool], - dispatch: { maxConcurrent: 1, eager: true }, - conversationId: "conv-1", - turnId: "turn-1", - emit, - signal: ac.signal, - }); - - // runTurn returned (didn't hang) → the race worked. - expect(result.finishReason).toBe("aborted"); - - // A done event was emitted with reason "aborted". - const doneEvents = events.filter((e) => e.type === "done"); - expect(doneEvents).toHaveLength(1); - if (doneEvents[0]?.type === "done") { - expect(doneEvents[0].reason).toBe("aborted"); - } - }); - - // Required test 2: A signal-aware tool that resolves its own result on - // abort must also let runTurn return with finishReason "aborted". - it("signal-aware tool + abort → runTurn returns with finishReason aborted", async () => { - const ac = new AbortController(); - - const tool = createFakeTool("aware", (_input, ctx) => { - return new Promise<ToolResult>((resolve) => { - ctx.signal.addEventListener( - "abort", - () => resolve({ content: "aborted by tool", isError: true }), - { once: true }, - ); - }); - }); - - const provider: ProviderContract = { - id: "fake", - stream() { - return (async function* () { - yield { - type: "tool-call", - toolCallId: "tc1", - toolName: "aware", - input: {}, - } as ProviderEvent; - ac.abort(); - await delay(10); - yield { type: "finish", reason: "tool-calls" } as ProviderEvent; - })(); - }, - }; - - const { events, emit } = createCollectingEmit(); - - const result = await runTurn({ - provider, - messages: [userMessage], - tools: [tool], - dispatch: { maxConcurrent: 1, eager: true }, - conversationId: "conv-1", - turnId: "turn-1", - emit, - signal: ac.signal, - }); - - expect(result.finishReason).toBe("aborted"); - - const doneEvents = events.filter((e) => e.type === "done"); - expect(doneEvents).toHaveLength(1); - if (doneEvents[0]?.type === "done") { - expect(doneEvents[0].reason).toBe("aborted"); - } - - // When the step is aborted, tool-result MESSAGES are omitted from the - // result (the tool-result EVENT is still emitted by executeStep for - // live UI updates, but the message is not persisted). This prevents - // orphaned `tool` messages from breaking the next turn's provider - // request. The assistant message has its tool-call chunks stripped. - const toolResultMsg = result.messages.find((m) => m.role === "tool"); - expect(toolResultMsg).toBeUndefined(); - - // The assistant message should NOT contain tool-call chunks. - const assistantMsg = result.messages.find( - (m) => m.role === "assistant" && m.chunks.some((c) => c.type === "tool-call"), - ); - expect(assistantMsg).toBeUndefined(); - }); - - // Required test 3 (regression guard): Without abort, a normal tool runs - // and its result is used; finishReason reflects the model. - it("no abort → tool runs normally and its result is used (regression)", async () => { - const tool = createFakeTool("normal", async (input) => ({ - content: `result: ${JSON.stringify(input)}`, - })); - - const provider = createFakeProvider([ - [ - { type: "tool-call", toolCallId: "tc1", toolName: "normal", input: { x: 1 } }, - { type: "finish", reason: "tool-calls" }, - ], - [ - { type: "text-delta", delta: "done" }, - { type: "finish", reason: "stop" }, - ], - ]); - - const { events, emit } = createCollectingEmit(); - - const result = await runTurn({ - provider, - messages: [userMessage], - tools: [tool], - dispatch: { maxConcurrent: 1, eager: true }, - conversationId: "conv-1", - turnId: "turn-1", - emit, - }); - - // finishReason reflects the model (second step's "stop"). - expect(result.finishReason).toBe("stop"); - - // The tool's result was used (fed back, not "Aborted"). - const toolResultMsg = result.messages.find((m) => m.role === "tool"); - expect(toolResultMsg).toBeDefined(); - const trChunk = toolResultMsg?.chunks[0]; - expect(trChunk?.type).toBe("tool-result"); - if (trChunk?.type === "tool-result") { - expect(trChunk.content).toBe('result: {"x":1}'); - expect(trChunk.isError).toBe(false); - } - - // done event emitted with reason "stop". - const doneEvents = events.filter((e) => e.type === "done"); - expect(doneEvents).toHaveLength(1); - if (doneEvents[0]?.type === "done") { - expect(doneEvents[0].reason).toBe("stop"); - } - }); - - // Bonus: multiple hanging tools + abort → all resolve via the race, - // drain() doesn't deadlock, and runTurn returns. Tool-result messages - // are omitted from the result (aborted step); the turn seals cleanly. - it("multiple hanging tools + abort → drain completes and runTurn returns", async () => { - const ac = new AbortController(); - - // Two tools that never resolve and ignore ctx.signal. - const toolA = createFakeTool("hangA", () => new Promise<ToolResult>(() => {})); - const toolB = createFakeTool("hangB", () => new Promise<ToolResult>(() => {})); - - const provider: ProviderContract = { - id: "fake", - stream() { - return (async function* () { - yield { - type: "tool-call", - toolCallId: "tc1", - toolName: "hangA", - input: {}, - } as ProviderEvent; - yield { - type: "tool-call", - toolCallId: "tc2", - toolName: "hangB", - input: {}, - } as ProviderEvent; - ac.abort(); - await delay(10); - yield { type: "finish", reason: "tool-calls" } as ProviderEvent; - })(); - }, - }; - - const { events, emit } = createCollectingEmit(); - - const result = await runTurn({ - provider, - messages: [userMessage], - tools: [toolA, toolB], - dispatch: { maxConcurrent: 2, eager: true }, - conversationId: "conv-1", - turnId: "turn-1", - emit, - signal: ac.signal, - }); - - expect(result.finishReason).toBe("aborted"); - - // tool-result EVENTS are still emitted by executeStep (for live UI), - // but tool-result MESSAGES are omitted from the result (not persisted). - const toolResultEvents = events.filter((e) => e.type === "tool-result"); - expect(toolResultEvents).toHaveLength(2); - for (const tr of toolResultEvents) { - if (tr.type === "tool-result") { - expect(tr.isError).toBe(true); - } - } - - // No tool messages in the result (they would orphan on the next turn). - const toolMessages = result.messages.filter((m) => m.role === "tool"); - expect(toolMessages).toHaveLength(0); - - // Assistant message has no tool-call chunks. - const assistantMsgs = result.messages.filter((m) => m.role === "assistant"); - for (const msg of assistantMsgs) { - expect(msg.chunks.some((c) => c.type === "tool-call")).toBe(false); - } - - const doneEvents = events.filter((e) => e.type === "done"); - expect(doneEvents).toHaveLength(1); - if (doneEvents[0]?.type === "done") { - expect(doneEvents[0].reason).toBe("aborted"); - } - }); - - // Critical regression: after an aborted tool call, the result messages - // must NOT contain orphaned tool messages. If they did, the next turn - // would send a `tool` role message to the provider without a preceding - // `assistant` message carrying `tool_calls` → 400 error. - it("aborted step produces no tool messages and no tool-call chunks in result", async () => { - const ac = new AbortController(); - - // Tool that hangs forever - const tool = createFakeTool("hang", () => new Promise<ToolResult>(() => {})); - - const provider: ProviderContract = { - id: "fake", - stream() { - return (async function* () { - yield { type: "text-delta", delta: "Let me run that for you" } as ProviderEvent; - yield { - type: "tool-call", - toolCallId: "tc1", - toolName: "hang", - input: {}, - } as ProviderEvent; - ac.abort(); - await delay(10); - yield { type: "finish", reason: "tool-calls" } as ProviderEvent; - })(); - }, - }; - - const result = await runTurn({ - provider, - messages: [userMessage], - tools: [tool], - dispatch: { maxConcurrent: 1, eager: true }, - conversationId: "conv-1", - turnId: "turn-1", - emit: noopEmit, - signal: ac.signal, - }); - - expect(result.finishReason).toBe("aborted"); - - // No tool messages in the result - const toolMessages = result.messages.filter((m) => m.role === "tool"); - expect(toolMessages).toHaveLength(0); - - // The assistant message should preserve text but NOT tool-call chunks - const assistantMsg = result.messages.find((m) => m.role === "assistant"); - expect(assistantMsg).toBeDefined(); - if (assistantMsg !== undefined) { - const hasToolCall = assistantMsg.chunks.some((c) => c.type === "tool-call"); - expect(hasToolCall).toBe(false); - // Text content should be preserved - const hasText = assistantMsg.chunks.some((c) => c.type === "text"); - expect(hasText).toBe(true); - } - - // Simulate what the next turn would see: the result messages are the - // conversation history (minus the user message). If we feed these to - // a simple converter, there should be NO `tool` role messages. - const toolRoleCount = result.messages.filter((m) => m.role === "tool").length; - expect(toolRoleCount).toBe(0); - }); + // Required test 1: A hanging tool (never resolves, ignores ctx.signal) + // must not keep runTurn from returning when the signal aborts. + it("hanging tool + abort → runTurn returns with finishReason aborted and emits done", async () => { + const ac = new AbortController(); + + // A tool whose execute returns a promise that NEVER resolves and + // ignores ctx.signal entirely. + const tool = createFakeTool("hang", () => new Promise<ToolResult>(() => {})); + + // Use eager: true so the tool starts BEFORE the signal aborts. + // This exercises the race (not the early signal.aborted return). + const provider: ProviderContract = { + id: "fake", + stream() { + return (async function* () { + yield { + type: "tool-call", + toolCallId: "tc1", + toolName: "hang", + input: {}, + } as ProviderEvent; + ac.abort(); + await delay(10); + yield { type: "finish", reason: "tool-calls" } as ProviderEvent; + })(); + }, + }; + + const { events, emit } = createCollectingEmit(); + + const result = await runTurn({ + provider, + messages: [userMessage], + tools: [tool], + dispatch: { maxConcurrent: 1, eager: true }, + conversationId: "conv-1", + turnId: "turn-1", + emit, + signal: ac.signal, + }); + + // runTurn returned (didn't hang) → the race worked. + expect(result.finishReason).toBe("aborted"); + + // A done event was emitted with reason "aborted". + const doneEvents = events.filter((e) => e.type === "done"); + expect(doneEvents).toHaveLength(1); + if (doneEvents[0]?.type === "done") { + expect(doneEvents[0].reason).toBe("aborted"); + } + }); + + // Required test 2: A signal-aware tool that resolves its own result on + // abort must also let runTurn return with finishReason "aborted". + it("signal-aware tool + abort → runTurn returns with finishReason aborted", async () => { + const ac = new AbortController(); + + const tool = createFakeTool("aware", (_input, ctx) => { + return new Promise<ToolResult>((resolve) => { + ctx.signal.addEventListener( + "abort", + () => resolve({ content: "aborted by tool", isError: true }), + { once: true }, + ); + }); + }); + + const provider: ProviderContract = { + id: "fake", + stream() { + return (async function* () { + yield { + type: "tool-call", + toolCallId: "tc1", + toolName: "aware", + input: {}, + } as ProviderEvent; + ac.abort(); + await delay(10); + yield { type: "finish", reason: "tool-calls" } as ProviderEvent; + })(); + }, + }; + + const { events, emit } = createCollectingEmit(); + + const result = await runTurn({ + provider, + messages: [userMessage], + tools: [tool], + dispatch: { maxConcurrent: 1, eager: true }, + conversationId: "conv-1", + turnId: "turn-1", + emit, + signal: ac.signal, + }); + + expect(result.finishReason).toBe("aborted"); + + const doneEvents = events.filter((e) => e.type === "done"); + expect(doneEvents).toHaveLength(1); + if (doneEvents[0]?.type === "done") { + expect(doneEvents[0].reason).toBe("aborted"); + } + + // When the step is aborted, tool-result MESSAGES are omitted from the + // result (the tool-result EVENT is still emitted by executeStep for + // live UI updates, but the message is not persisted). This prevents + // orphaned `tool` messages from breaking the next turn's provider + // request. The assistant message has its tool-call chunks stripped. + const toolResultMsg = result.messages.find((m) => m.role === "tool"); + expect(toolResultMsg).toBeUndefined(); + + // The assistant message should NOT contain tool-call chunks. + const assistantMsg = result.messages.find( + (m) => m.role === "assistant" && m.chunks.some((c) => c.type === "tool-call"), + ); + expect(assistantMsg).toBeUndefined(); + }); + + // Required test 3 (regression guard): Without abort, a normal tool runs + // and its result is used; finishReason reflects the model. + it("no abort → tool runs normally and its result is used (regression)", async () => { + const tool = createFakeTool("normal", async (input) => ({ + content: `result: ${JSON.stringify(input)}`, + })); + + const provider = createFakeProvider([ + [ + { type: "tool-call", toolCallId: "tc1", toolName: "normal", input: { x: 1 } }, + { type: "finish", reason: "tool-calls" }, + ], + [ + { type: "text-delta", delta: "done" }, + { type: "finish", reason: "stop" }, + ], + ]); + + const { events, emit } = createCollectingEmit(); + + const result = await runTurn({ + provider, + messages: [userMessage], + tools: [tool], + dispatch: { maxConcurrent: 1, eager: true }, + conversationId: "conv-1", + turnId: "turn-1", + emit, + }); + + // finishReason reflects the model (second step's "stop"). + expect(result.finishReason).toBe("stop"); + + // The tool's result was used (fed back, not "Aborted"). + const toolResultMsg = result.messages.find((m) => m.role === "tool"); + expect(toolResultMsg).toBeDefined(); + const trChunk = toolResultMsg?.chunks[0]; + expect(trChunk?.type).toBe("tool-result"); + if (trChunk?.type === "tool-result") { + expect(trChunk.content).toBe('result: {"x":1}'); + expect(trChunk.isError).toBe(false); + } + + // done event emitted with reason "stop". + const doneEvents = events.filter((e) => e.type === "done"); + expect(doneEvents).toHaveLength(1); + if (doneEvents[0]?.type === "done") { + expect(doneEvents[0].reason).toBe("stop"); + } + }); + + // Bonus: multiple hanging tools + abort → all resolve via the race, + // drain() doesn't deadlock, and runTurn returns. Tool-result messages + // are omitted from the result (aborted step); the turn seals cleanly. + it("multiple hanging tools + abort → drain completes and runTurn returns", async () => { + const ac = new AbortController(); + + // Two tools that never resolve and ignore ctx.signal. + const toolA = createFakeTool("hangA", () => new Promise<ToolResult>(() => {})); + const toolB = createFakeTool("hangB", () => new Promise<ToolResult>(() => {})); + + const provider: ProviderContract = { + id: "fake", + stream() { + return (async function* () { + yield { + type: "tool-call", + toolCallId: "tc1", + toolName: "hangA", + input: {}, + } as ProviderEvent; + yield { + type: "tool-call", + toolCallId: "tc2", + toolName: "hangB", + input: {}, + } as ProviderEvent; + ac.abort(); + await delay(10); + yield { type: "finish", reason: "tool-calls" } as ProviderEvent; + })(); + }, + }; + + const { events, emit } = createCollectingEmit(); + + const result = await runTurn({ + provider, + messages: [userMessage], + tools: [toolA, toolB], + dispatch: { maxConcurrent: 2, eager: true }, + conversationId: "conv-1", + turnId: "turn-1", + emit, + signal: ac.signal, + }); + + expect(result.finishReason).toBe("aborted"); + + // tool-result EVENTS are still emitted by executeStep (for live UI), + // but tool-result MESSAGES are omitted from the result (not persisted). + const toolResultEvents = events.filter((e) => e.type === "tool-result"); + expect(toolResultEvents).toHaveLength(2); + for (const tr of toolResultEvents) { + if (tr.type === "tool-result") { + expect(tr.isError).toBe(true); + } + } + + // No tool messages in the result (they would orphan on the next turn). + const toolMessages = result.messages.filter((m) => m.role === "tool"); + expect(toolMessages).toHaveLength(0); + + // Assistant message has no tool-call chunks. + const assistantMsgs = result.messages.filter((m) => m.role === "assistant"); + for (const msg of assistantMsgs) { + expect(msg.chunks.some((c) => c.type === "tool-call")).toBe(false); + } + + const doneEvents = events.filter((e) => e.type === "done"); + expect(doneEvents).toHaveLength(1); + if (doneEvents[0]?.type === "done") { + expect(doneEvents[0].reason).toBe("aborted"); + } + }); + + // Critical regression: after an aborted tool call, the result messages + // must NOT contain orphaned tool messages. If they did, the next turn + // would send a `tool` role message to the provider without a preceding + // `assistant` message carrying `tool_calls` → 400 error. + it("aborted step produces no tool messages and no tool-call chunks in result", async () => { + const ac = new AbortController(); + + // Tool that hangs forever + const tool = createFakeTool("hang", () => new Promise<ToolResult>(() => {})); + + const provider: ProviderContract = { + id: "fake", + stream() { + return (async function* () { + yield { type: "text-delta", delta: "Let me run that for you" } as ProviderEvent; + yield { + type: "tool-call", + toolCallId: "tc1", + toolName: "hang", + input: {}, + } as ProviderEvent; + ac.abort(); + await delay(10); + yield { type: "finish", reason: "tool-calls" } as ProviderEvent; + })(); + }, + }; + + const result = await runTurn({ + provider, + messages: [userMessage], + tools: [tool], + dispatch: { maxConcurrent: 1, eager: true }, + conversationId: "conv-1", + turnId: "turn-1", + emit: noopEmit, + signal: ac.signal, + }); + + expect(result.finishReason).toBe("aborted"); + + // No tool messages in the result + const toolMessages = result.messages.filter((m) => m.role === "tool"); + expect(toolMessages).toHaveLength(0); + + // The assistant message should preserve text but NOT tool-call chunks + const assistantMsg = result.messages.find((m) => m.role === "assistant"); + expect(assistantMsg).toBeDefined(); + if (assistantMsg !== undefined) { + const hasToolCall = assistantMsg.chunks.some((c) => c.type === "tool-call"); + expect(hasToolCall).toBe(false); + // Text content should be preserved + const hasText = assistantMsg.chunks.some((c) => c.type === "text"); + expect(hasText).toBe(true); + } + + // Simulate what the next turn would see: the result messages are the + // conversation history (minus the user message). If we feed these to + // a simple converter, there should be NO `tool` role messages. + const toolRoleCount = result.messages.filter((m) => m.role === "tool").length; + expect(toolRoleCount).toBe(0); + }); }); diff --git a/packages/kernel/src/runtime/dispatch.ts b/packages/kernel/src/runtime/dispatch.ts index 01f0043..d09db3b 100644 --- a/packages/kernel/src/runtime/dispatch.ts +++ b/packages/kernel/src/runtime/dispatch.ts @@ -5,182 +5,182 @@ import type { ToolCall, ToolContract, ToolExecuteContext, ToolResult } from "../ import { toolOutputEvent } from "./events.js"; export interface StepDispatcher { - submit(call: ToolCall): void; - drain(): Promise<Map<string, ToolResult>>; + submit(call: ToolCall): void; + drain(): Promise<Map<string, ToolResult>>; } export async function executeToolCall( - call: ToolCall, - tool: ToolContract | undefined, - signal: AbortSignal, - emit: EventEmitter, - conversationId: string, - turnId: string, - toolSpan?: Span, - cwd?: string, - computerId?: string, + call: ToolCall, + tool: ToolContract | undefined, + signal: AbortSignal, + emit: EventEmitter, + conversationId: string, + turnId: string, + toolSpan?: Span, + cwd?: string, + computerId?: string, ): Promise<ToolResult> { - if (tool === undefined) { - return { content: `Unknown tool: ${call.name}`, isError: true }; - } - if (signal.aborted) { - return { content: "Aborted", isError: true }; - } - const ctx: ToolExecuteContext = { - toolCallId: call.id, - signal, - onOutput: (data, stream) => { - emit(toolOutputEvent(conversationId, turnId, call.id, data, stream)); - }, - log: toolSpan?.log ?? createNoopLogger(), - conversationId, - ...(cwd !== undefined ? { cwd } : {}), - ...(computerId !== undefined ? { computerId } : {}), - }; - // Race the tool's execute promise against the abort signal so a tool - // that hangs (ignores ctx.signal, or blocks on something the signal - // can't interrupt) can't keep runTurn from returning. When the signal - // fires we RESOLVE (not reject) with an "Aborted" result so the step - // completes normally and the existing signal.aborted → finishReason = - // "aborted" path seals the turn cleanly (done event), letting the - // caller's finally clear active state and the FE clear its spinner. - try { - const toolPromise = tool.execute(call.input, ctx); - const abortPromise = new Promise<ToolResult>((resolve) => { - signal.addEventListener("abort", () => resolve({ content: "Aborted", isError: true }), { - once: true, - }); - }); - // Swallow late rejections from the orphaned tool promise: the tool - // may reject after the race already resolved with "Aborted". - void toolPromise.catch(() => {}); - return await Promise.race([toolPromise, abortPromise]); - } catch (err) { - const message = err instanceof Error ? err.message : String(err); - return { content: `Tool execution error: ${message}`, isError: true }; - } + if (tool === undefined) { + return { content: `Unknown tool: ${call.name}`, isError: true }; + } + if (signal.aborted) { + return { content: "Aborted", isError: true }; + } + const ctx: ToolExecuteContext = { + toolCallId: call.id, + signal, + onOutput: (data, stream) => { + emit(toolOutputEvent(conversationId, turnId, call.id, data, stream)); + }, + log: toolSpan?.log ?? createNoopLogger(), + conversationId, + ...(cwd !== undefined ? { cwd } : {}), + ...(computerId !== undefined ? { computerId } : {}), + }; + // Race the tool's execute promise against the abort signal so a tool + // that hangs (ignores ctx.signal, or blocks on something the signal + // can't interrupt) can't keep runTurn from returning. When the signal + // fires we RESOLVE (not reject) with an "Aborted" result so the step + // completes normally and the existing signal.aborted → finishReason = + // "aborted" path seals the turn cleanly (done event), letting the + // caller's finally clear active state and the FE clear its spinner. + try { + const toolPromise = tool.execute(call.input, ctx); + const abortPromise = new Promise<ToolResult>((resolve) => { + signal.addEventListener("abort", () => resolve({ content: "Aborted", isError: true }), { + once: true, + }); + }); + // Swallow late rejections from the orphaned tool promise: the tool + // may reject after the race already resolved with "Aborted". + void toolPromise.catch(() => {}); + return await Promise.race([toolPromise, abortPromise]); + } catch (err) { + const message = err instanceof Error ? err.message : String(err); + return { content: `Tool execution error: ${message}`, isError: true }; + } } interface QueueEntry { - readonly call: ToolCall; - readonly tool: ToolContract | undefined; - readonly resolve: (result: ToolResult) => void; + readonly call: ToolCall; + readonly tool: ToolContract | undefined; + readonly resolve: (result: ToolResult) => void; } export function createStepDispatcher( - toolMap: Map<string, ToolContract>, - policy: ToolDispatchPolicy, - signal: AbortSignal, - emit: EventEmitter, - conversationId: string, - turnId: string, - toolSpans: Map<string, Span>, - cwd?: string, - computerId?: string, + toolMap: Map<string, ToolContract>, + policy: ToolDispatchPolicy, + signal: AbortSignal, + emit: EventEmitter, + conversationId: string, + turnId: string, + toolSpans: Map<string, Span>, + cwd?: string, + computerId?: string, ): StepDispatcher { - let activeCount = 0; - let unsafeRunning = false; - const queue: QueueEntry[] = []; - const allPromises: Array<{ id: string; promise: Promise<ToolResult> }> = []; - const dedupMap = new Map<string, Promise<ToolResult>>(); + let activeCount = 0; + let unsafeRunning = false; + const queue: QueueEntry[] = []; + const allPromises: Array<{ id: string; promise: Promise<ToolResult> }> = []; + const dedupMap = new Map<string, Promise<ToolResult>>(); - function canStart(isConcurrencySafe: boolean): boolean { - if (unsafeRunning) return false; - if (!isConcurrencySafe && activeCount > 0) return false; - if (policy.maxConcurrent === 0) return true; - return activeCount < policy.maxConcurrent; - } + function canStart(isConcurrencySafe: boolean): boolean { + if (unsafeRunning) return false; + if (!isConcurrencySafe && activeCount > 0) return false; + if (policy.maxConcurrent === 0) return true; + return activeCount < policy.maxConcurrent; + } - function tryStartNext(): void { - while (queue.length > 0) { - const next = queue[0]; - if (next === undefined) break; - const isSafe = next.tool?.concurrencySafe !== false; - if (!canStart(isSafe)) break; - queue.shift(); - activeCount++; - if (!isSafe) unsafeRunning = true; - void runAndResolve(next); - } - } + function tryStartNext(): void { + while (queue.length > 0) { + const next = queue[0]; + if (next === undefined) break; + const isSafe = next.tool?.concurrencySafe !== false; + if (!canStart(isSafe)) break; + queue.shift(); + activeCount++; + if (!isSafe) unsafeRunning = true; + void runAndResolve(next); + } + } - async function runAndResolve(entry: QueueEntry): Promise<void> { - const tcSpan = toolSpans.get(entry.call.id); - const result = await executeToolCall( - entry.call, - entry.tool, - signal, - emit, - conversationId, - turnId, - tcSpan, - cwd, - computerId, - ); - activeCount--; - if (entry.tool?.concurrencySafe === false) unsafeRunning = false; - entry.resolve(result); - tryStartNext(); - } + async function runAndResolve(entry: QueueEntry): Promise<void> { + const tcSpan = toolSpans.get(entry.call.id); + const result = await executeToolCall( + entry.call, + entry.tool, + signal, + emit, + conversationId, + turnId, + tcSpan, + cwd, + computerId, + ); + activeCount--; + if (entry.tool?.concurrencySafe === false) unsafeRunning = false; + entry.resolve(result); + tryStartNext(); + } - function submit(call: ToolCall): void { - const tool = toolMap.get(call.name); - const key = `${call.name}:${JSON.stringify(call.input)}`; + function submit(call: ToolCall): void { + const tool = toolMap.get(call.name); + const key = `${call.name}:${JSON.stringify(call.input)}`; - const existing = dedupMap.get(key); - if (existing !== undefined) { - allPromises.push({ id: call.id, promise: existing }); - return; - } + const existing = dedupMap.get(key); + if (existing !== undefined) { + allPromises.push({ id: call.id, promise: existing }); + return; + } - const promise = new Promise<ToolResult>((resolve) => { - queue.push({ call, tool, resolve }); - tryStartNext(); - }); + const promise = new Promise<ToolResult>((resolve) => { + queue.push({ call, tool, resolve }); + tryStartNext(); + }); - dedupMap.set(key, promise); - allPromises.push({ id: call.id, promise }); - } + dedupMap.set(key, promise); + allPromises.push({ id: call.id, promise }); + } - async function drain(): Promise<Map<string, ToolResult>> { - if (signal.aborted) { - for (const item of queue) { - item.resolve({ content: "Aborted", isError: true }); - } - queue.length = 0; - } + async function drain(): Promise<Map<string, ToolResult>> { + if (signal.aborted) { + for (const item of queue) { + item.resolve({ content: "Aborted", isError: true }); + } + queue.length = 0; + } - const results = new Map<string, ToolResult>(); - for (const entry of allPromises) { - const result = await entry.promise; - results.set(entry.id, result); - } - return results; - } + const results = new Map<string, ToolResult>(); + for (const entry of allPromises) { + const result = await entry.promise; + results.set(entry.id, result); + } + return results; + } - return { submit, drain }; + return { submit, drain }; } function createNoopLogger(): Logger { - return { - debug() {}, - info() {}, - warn() {}, - error() {}, - child() { - return createNoopLogger(); - }, - span() { - return { - id: "noop", - log: createNoopLogger(), - setAttributes() {}, - addLink() {}, - child() { - return this; - }, - end() {}, - }; - }, - }; + return { + debug() {}, + info() {}, + warn() {}, + error() {}, + child() { + return createNoopLogger(); + }, + span() { + return { + id: "noop", + log: createNoopLogger(), + setAttributes() {}, + addLink() {}, + child() { + return this; + }, + end() {}, + }; + }, + }; } diff --git a/packages/kernel/src/runtime/events.ts b/packages/kernel/src/runtime/events.ts index 5805e28..353b6ca 100644 --- a/packages/kernel/src/runtime/events.ts +++ b/packages/kernel/src/runtime/events.ts @@ -3,178 +3,178 @@ import type { AgentEvent } from "../contracts/events.js"; import type { Usage } from "../contracts/provider.js"; export function textDeltaEvent(conversationId: string, turnId: string, delta: string): AgentEvent { - return { type: "text-delta", conversationId, turnId, delta }; + return { type: "text-delta", conversationId, turnId, delta }; } export function reasoningDeltaEvent( - conversationId: string, - turnId: string, - delta: string, + conversationId: string, + turnId: string, + delta: string, ): AgentEvent { - return { type: "reasoning-delta", conversationId, turnId, delta }; + return { type: "reasoning-delta", conversationId, turnId, delta }; } export function toolCallEvent( - conversationId: string, - turnId: string, - stepId: StepId, - toolCallId: string, - toolName: string, - input: unknown, + conversationId: string, + turnId: string, + stepId: StepId, + toolCallId: string, + toolName: string, + input: unknown, ): AgentEvent { - return { type: "tool-call", conversationId, turnId, stepId, toolCallId, toolName, input }; + return { type: "tool-call", conversationId, turnId, stepId, toolCallId, toolName, input }; } export function toolResultEvent( - conversationId: string, - turnId: string, - stepId: StepId, - toolCallId: string, - toolName: string, - content: string, - isError: boolean, - durationMs?: number, + conversationId: string, + turnId: string, + stepId: StepId, + toolCallId: string, + toolName: string, + content: string, + isError: boolean, + durationMs?: number, ): AgentEvent { - const base = { - type: "tool-result" as const, - conversationId, - turnId, - stepId, - toolCallId, - toolName, - content, - isError, - }; - if (durationMs !== undefined) { - return { ...base, durationMs }; - } - return base; + const base = { + type: "tool-result" as const, + conversationId, + turnId, + stepId, + toolCallId, + toolName, + content, + isError, + }; + if (durationMs !== undefined) { + return { ...base, durationMs }; + } + return base; } export function toolOutputEvent( - conversationId: string, - turnId: string, - toolCallId: string, - data: string, - stream: "stdout" | "stderr", + conversationId: string, + turnId: string, + toolCallId: string, + data: string, + stream: "stdout" | "stderr", ): AgentEvent { - return { type: "tool-output", conversationId, turnId, toolCallId, data, stream }; + return { type: "tool-output", conversationId, turnId, toolCallId, data, stream }; } export function usageEvent( - conversationId: string, - turnId: string, - usage: Usage, - stepId?: StepId, + conversationId: string, + turnId: string, + usage: Usage, + stepId?: StepId, ): AgentEvent { - if (stepId !== undefined) { - return { type: "usage", conversationId, turnId, usage, stepId }; - } - return { type: "usage", conversationId, turnId, usage }; + if (stepId !== undefined) { + return { type: "usage", conversationId, turnId, usage, stepId }; + } + return { type: "usage", conversationId, turnId, usage }; } export function turnStartEvent(conversationId: string, turnId: string): AgentEvent { - return { type: "turn-start", conversationId, turnId }; + return { type: "turn-start", conversationId, turnId }; } export function stepCompleteEvent( - conversationId: string, - turnId: string, - stepId: StepId, - timing?: { ttftMs?: number; decodeMs?: number; genTotalMs?: number }, + conversationId: string, + turnId: string, + stepId: StepId, + timing?: { ttftMs?: number; decodeMs?: number; genTotalMs?: number }, ): AgentEvent { - if (timing !== undefined) { - if (timing.ttftMs !== undefined) { - if (timing.decodeMs !== undefined && timing.genTotalMs !== undefined) { - return { - type: "step-complete", - conversationId, - turnId, - stepId, - ttftMs: timing.ttftMs, - decodeMs: timing.decodeMs, - genTotalMs: timing.genTotalMs, - }; - } - if (timing.genTotalMs !== undefined) { - return { - type: "step-complete", - conversationId, - turnId, - stepId, - ttftMs: timing.ttftMs, - genTotalMs: timing.genTotalMs, - }; - } - return { type: "step-complete", conversationId, turnId, stepId, ttftMs: timing.ttftMs }; - } - if (timing.genTotalMs !== undefined) { - return { - type: "step-complete", - conversationId, - turnId, - stepId, - genTotalMs: timing.genTotalMs, - }; - } - } - return { type: "step-complete", conversationId, turnId, stepId }; + if (timing !== undefined) { + if (timing.ttftMs !== undefined) { + if (timing.decodeMs !== undefined && timing.genTotalMs !== undefined) { + return { + type: "step-complete", + conversationId, + turnId, + stepId, + ttftMs: timing.ttftMs, + decodeMs: timing.decodeMs, + genTotalMs: timing.genTotalMs, + }; + } + if (timing.genTotalMs !== undefined) { + return { + type: "step-complete", + conversationId, + turnId, + stepId, + ttftMs: timing.ttftMs, + genTotalMs: timing.genTotalMs, + }; + } + return { type: "step-complete", conversationId, turnId, stepId, ttftMs: timing.ttftMs }; + } + if (timing.genTotalMs !== undefined) { + return { + type: "step-complete", + conversationId, + turnId, + stepId, + genTotalMs: timing.genTotalMs, + }; + } + } + return { type: "step-complete", conversationId, turnId, stepId }; } export function doneEvent( - conversationId: string, - turnId: string, - reason: string, - durationMs?: number, - usage?: Usage, - contextSize?: number, + conversationId: string, + turnId: string, + reason: string, + durationMs?: number, + usage?: Usage, + contextSize?: number, ): AgentEvent { - if (durationMs !== undefined && usage !== undefined && contextSize !== undefined) { - return { type: "done", conversationId, turnId, reason, durationMs, usage, contextSize }; - } - if (durationMs !== undefined && usage !== undefined) { - return { type: "done", conversationId, turnId, reason, durationMs, usage }; - } - if (durationMs !== undefined && contextSize !== undefined) { - return { type: "done", conversationId, turnId, reason, durationMs, contextSize }; - } - if (usage !== undefined && contextSize !== undefined) { - return { type: "done", conversationId, turnId, reason, usage, contextSize }; - } - if (durationMs !== undefined) { - return { type: "done", conversationId, turnId, reason, durationMs }; - } - if (usage !== undefined) { - return { type: "done", conversationId, turnId, reason, usage }; - } - if (contextSize !== undefined) { - return { type: "done", conversationId, turnId, reason, contextSize }; - } - return { type: "done", conversationId, turnId, reason }; + if (durationMs !== undefined && usage !== undefined && contextSize !== undefined) { + return { type: "done", conversationId, turnId, reason, durationMs, usage, contextSize }; + } + if (durationMs !== undefined && usage !== undefined) { + return { type: "done", conversationId, turnId, reason, durationMs, usage }; + } + if (durationMs !== undefined && contextSize !== undefined) { + return { type: "done", conversationId, turnId, reason, durationMs, contextSize }; + } + if (usage !== undefined && contextSize !== undefined) { + return { type: "done", conversationId, turnId, reason, usage, contextSize }; + } + if (durationMs !== undefined) { + return { type: "done", conversationId, turnId, reason, durationMs }; + } + if (usage !== undefined) { + return { type: "done", conversationId, turnId, reason, usage }; + } + if (contextSize !== undefined) { + return { type: "done", conversationId, turnId, reason, contextSize }; + } + return { type: "done", conversationId, turnId, reason }; } export function errorEvent( - conversationId: string, - turnId: string, - message: string, - code?: string, + conversationId: string, + turnId: string, + message: string, + code?: string, ): AgentEvent { - if (code !== undefined) { - return { type: "error", conversationId, turnId, message, code }; - } - return { type: "error", conversationId, turnId, message }; + if (code !== undefined) { + return { type: "error", conversationId, turnId, message, code }; + } + return { type: "error", conversationId, turnId, message }; } export function providerRetryEvent( - conversationId: string, - turnId: string, - attempt: number, - delayMs: number, - message: string, - code?: string, + conversationId: string, + turnId: string, + attempt: number, + delayMs: number, + message: string, + code?: string, ): AgentEvent { - if (code !== undefined) { - return { type: "provider-retry", conversationId, turnId, attempt, delayMs, message, code }; - } - return { type: "provider-retry", conversationId, turnId, attempt, delayMs, message }; + if (code !== undefined) { + return { type: "provider-retry", conversationId, turnId, attempt, delayMs, message, code }; + } + return { type: "provider-retry", conversationId, turnId, attempt, delayMs, message }; } diff --git a/packages/kernel/src/runtime/index.ts b/packages/kernel/src/runtime/index.ts index e0dd656..ecb802e 100644 --- a/packages/kernel/src/runtime/index.ts +++ b/packages/kernel/src/runtime/index.ts @@ -1,13 +1,13 @@ export type { StepDispatcher } from "./dispatch.js"; export { createStepDispatcher, executeToolCall } from "./dispatch.js"; export { - errorEvent, - providerRetryEvent, - reasoningDeltaEvent, - textDeltaEvent, - toolCallEvent, - toolOutputEvent, - toolResultEvent, - usageEvent, + errorEvent, + providerRetryEvent, + reasoningDeltaEvent, + textDeltaEvent, + toolCallEvent, + toolOutputEvent, + toolResultEvent, + usageEvent, } from "./events.js"; export { MAX_STEPS, runTurn } from "./run-turn.js"; diff --git a/packages/kernel/src/runtime/run-turn.test.ts b/packages/kernel/src/runtime/run-turn.test.ts index 59e7fab..8d20975 100644 --- a/packages/kernel/src/runtime/run-turn.test.ts +++ b/packages/kernel/src/runtime/run-turn.test.ts @@ -8,3429 +8,3429 @@ import { createLogger } from "../logging/logger.js"; import { runTurn } from "./run-turn.js"; function delay(ms: number): Promise<void> { - return new Promise((resolve) => { - setTimeout(resolve, ms); - }); + return new Promise((resolve) => { + setTimeout(resolve, ms); + }); } function createFakeProvider(script: ProviderEvent[][]): ProviderContract { - let callIndex = 0; - return { - id: "fake", - stream(_messages, _tools) { - const events = script[callIndex] ?? []; - callIndex++; - return (async function* () { - for (const event of events) { - yield event; - } - })(); - }, - }; + let callIndex = 0; + return { + id: "fake", + stream(_messages, _tools) { + const events = script[callIndex] ?? []; + callIndex++; + return (async function* () { + for (const event of events) { + yield event; + } + })(); + }, + }; } function createCapturingProvider(script: ProviderEvent[][]): { - provider: ProviderContract; - capturedMessages: ChatMessage[][]; + provider: ProviderContract; + capturedMessages: ChatMessage[][]; } { - const capturedMessages: ChatMessage[][] = []; - let callIndex = 0; - const provider: ProviderContract = { - id: "fake", - stream(messages, _tools) { - capturedMessages.push([...messages]); - const events = script[callIndex] ?? []; - callIndex++; - return (async function* () { - for (const event of events) { - yield event; - } - })(); - }, - }; - return { provider, capturedMessages }; + const capturedMessages: ChatMessage[][] = []; + let callIndex = 0; + const provider: ProviderContract = { + id: "fake", + stream(messages, _tools) { + capturedMessages.push([...messages]); + const events = script[callIndex] ?? []; + callIndex++; + return (async function* () { + for (const event of events) { + yield event; + } + })(); + }, + }; + return { provider, capturedMessages }; } function createFakeTool( - name: string, - handler?: (input: unknown, ctx: ToolExecuteContext) => Promise<ToolResult>, - opts?: { concurrencySafe?: boolean }, + name: string, + handler?: (input: unknown, ctx: ToolExecuteContext) => Promise<ToolResult>, + opts?: { concurrencySafe?: boolean }, ): ToolContract { - return { - name, - description: `Fake tool: ${name}`, - parameters: { type: "object" }, - ...(opts?.concurrencySafe !== undefined ? { concurrencySafe: opts.concurrencySafe } : {}), - execute: handler ?? (async (input) => ({ content: `${name}: ${JSON.stringify(input)}` })), - }; + return { + name, + description: `Fake tool: ${name}`, + parameters: { type: "object" }, + ...(opts?.concurrencySafe !== undefined ? { concurrencySafe: opts.concurrencySafe } : {}), + execute: handler ?? (async (input) => ({ content: `${name}: ${JSON.stringify(input)}` })), + }; } function createCollectingEmit(): { events: AgentEvent[]; emit: (event: AgentEvent) => void } { - const events: AgentEvent[] = []; - return { events, emit: (event) => events.push(event) }; + const events: AgentEvent[] = []; + return { events, emit: (event) => events.push(event) }; } const userMessage: ChatMessage = { - role: "user", - chunks: [{ type: "text", text: "hello" }], + role: "user", + chunks: [{ type: "text", text: "hello" }], }; describe("runTurn", () => { - it("emits events with the conversationId and turnId from input", async () => { - const provider = createFakeProvider([ - [ - { type: "text-delta", delta: "hi" }, - { type: "usage", usage: { inputTokens: 1, outputTokens: 1 } }, - { type: "finish", reason: "stop" }, - ], - ]); - - const { events, emit } = createCollectingEmit(); - - await runTurn({ - provider, - messages: [userMessage], - tools: [], - dispatch: { maxConcurrent: 1, eager: false }, - conversationId: "conv-42", - turnId: "turn-99", - emit, - }); - - expect(events.length).toBeGreaterThan(0); - for (const event of events) { - expect(event.conversationId).toBe("conv-42"); - if (event.type !== "status") { - expect(event.turnId).toBe("turn-99"); - } - } - }); - - it("text-only turn emits correct events and returns correct result", async () => { - const provider = createFakeProvider([ - [ - { type: "text-delta", delta: "Hello" }, - { type: "text-delta", delta: " world" }, - { type: "reasoning-delta", delta: "thinking..." }, - { type: "usage", usage: { inputTokens: 10, outputTokens: 5 } }, - { type: "finish", reason: "stop" }, - ], - ]); - - const { events, emit } = createCollectingEmit(); - - const result = await runTurn({ - provider, - messages: [userMessage], - tools: [], - dispatch: { maxConcurrent: 1, eager: false }, - conversationId: "tab-test", - turnId: "turn-test", - emit, - }); - - expect(result.finishReason).toBe("stop"); - expect(result.messages).toHaveLength(1); - expect(result.messages[0]?.role).toBe("assistant"); - - const chunks = result.messages[0]?.chunks ?? []; - expect(chunks).toHaveLength(2); - expect(chunks[0]).toEqual({ type: "text", text: "Hello world" }); - expect(chunks[1]).toEqual({ type: "thinking", text: "thinking..." }); - - expect(result.usage).toEqual({ inputTokens: 10, outputTokens: 5 }); - - const eventTypes = events.map((e) => e.type); - expect(eventTypes).toEqual([ - "turn-start", - "text-delta", - "text-delta", - "reasoning-delta", - "usage", - "step-complete", - "done", - ]); - }); - - it("turn with one tool call executes tool, feeds result back, then finishes", async () => { - const tool = createFakeTool("greet", async (input) => ({ - content: `Hello, ${(input as { name: string }).name}!`, - })); - - const provider = createFakeProvider([ - [ - { type: "tool-call", toolCallId: "tc1", toolName: "greet", input: { name: "World" } }, - { type: "finish", reason: "tool-calls" }, - ], - [ - { type: "text-delta", delta: "Done." }, - { type: "finish", reason: "stop" }, - ], - ]); - - const { events, emit } = createCollectingEmit(); - - const result = await runTurn({ - provider, - messages: [userMessage], - tools: [tool], - dispatch: { maxConcurrent: 1, eager: false }, - conversationId: "tab-test", - turnId: "turn-test", - emit, - }); - - expect(result.finishReason).toBe("stop"); - expect(result.messages).toHaveLength(3); - expect(result.messages[0]?.role).toBe("assistant"); - expect(result.messages[1]?.role).toBe("tool"); - expect(result.messages[2]?.role).toBe("assistant"); - - const toolResultChunk = result.messages[1]?.chunks[0]; - expect(toolResultChunk?.type).toBe("tool-result"); - if (toolResultChunk?.type === "tool-result") { - expect(toolResultChunk.content).toBe("Hello, World!"); - expect(toolResultChunk.toolCallId).toBe("tc1"); - expect(toolResultChunk.isError).toBe(false); - } - - const eventTypes = events.map((e) => e.type); - expect(eventTypes).toContain("tool-call"); - expect(eventTypes).toContain("tool-result"); - expect(eventTypes).toContain("text-delta"); - }); - - it("passes updated messages to subsequent provider calls", async () => { - const capturedMessages: ChatMessage[][] = []; - let callIndex = 0; - const script: ProviderEvent[][] = [ - [ - { type: "tool-call", toolCallId: "tc1", toolName: "echo", input: {} }, - { type: "finish", reason: "tool-calls" }, - ], - [ - { type: "text-delta", delta: "done" }, - { type: "finish", reason: "stop" }, - ], - ]; - - const provider: ProviderContract = { - id: "fake", - stream(messages, _tools) { - capturedMessages.push([...messages]); - const events = script[callIndex] ?? []; - callIndex++; - return (async function* () { - for (const event of events) yield event; - })(); - }, - }; - - const tool = createFakeTool("echo", async () => ({ content: "echoed" })); - - await runTurn({ - provider, - messages: [userMessage], - tools: [tool], - dispatch: { maxConcurrent: 1, eager: false }, - conversationId: "tab-test", - turnId: "turn-test", - emit: () => {}, - }); - - expect(capturedMessages).toHaveLength(2); - expect(capturedMessages[0] ?? []).toHaveLength(1); - expect(capturedMessages[0]?.[0]?.role).toBe("user"); - - expect(capturedMessages[1] ?? []).toHaveLength(3); - expect(capturedMessages[1]?.[0]?.role).toBe("user"); - expect(capturedMessages[1]?.[1]?.role).toBe("assistant"); - expect(capturedMessages[1]?.[2]?.role).toBe("tool"); - }); - - it("maxConcurrent: 1 runs tools sequentially", async () => { - const log: string[] = []; - - const toolA = createFakeTool("a", async () => { - log.push("a:start"); - await delay(10); - log.push("a:end"); - return { content: "a" }; - }); - - const toolB = createFakeTool("b", async () => { - log.push("b:start"); - await delay(10); - log.push("b:end"); - return { content: "b" }; - }); - - const provider = createFakeProvider([ - [ - { type: "tool-call", toolCallId: "tc1", toolName: "a", input: {} }, - { type: "tool-call", toolCallId: "tc2", toolName: "b", input: {} }, - { type: "finish", reason: "tool-calls" }, - ], - [ - { type: "text-delta", delta: "done" }, - { type: "finish", reason: "stop" }, - ], - ]); - - await runTurn({ - provider, - messages: [userMessage], - tools: [toolA, toolB], - dispatch: { maxConcurrent: 1, eager: false }, - conversationId: "tab-test", - turnId: "turn-test", - emit: () => {}, - }); - - const aEndIdx = log.indexOf("a:end"); - const bStartIdx = log.indexOf("b:start"); - expect(aEndIdx).toBeLessThan(bStartIdx); - }); - - it("maxConcurrent: 2 runs tools in parallel", async () => { - const log: string[] = []; - - const toolA = createFakeTool("a", async () => { - log.push("a:start"); - await delay(20); - log.push("a:end"); - return { content: "a" }; - }); - - const toolB = createFakeTool("b", async () => { - log.push("b:start"); - await delay(20); - log.push("b:end"); - return { content: "b" }; - }); - - const provider = createFakeProvider([ - [ - { type: "tool-call", toolCallId: "tc1", toolName: "a", input: {} }, - { type: "tool-call", toolCallId: "tc2", toolName: "b", input: {} }, - { type: "finish", reason: "tool-calls" }, - ], - [ - { type: "text-delta", delta: "done" }, - { type: "finish", reason: "stop" }, - ], - ]); - - await runTurn({ - provider, - messages: [userMessage], - tools: [toolA, toolB], - dispatch: { maxConcurrent: 2, eager: false }, - conversationId: "tab-test", - turnId: "turn-test", - emit: () => {}, - }); - - const aStartIdx = log.indexOf("a:start"); - const bStartIdx = log.indexOf("b:start"); - const aEndIdx = log.indexOf("a:end"); - const bEndIdx = log.indexOf("b:end"); - - expect(aStartIdx).toBeLessThan(aEndIdx); - expect(bStartIdx).toBeLessThan(bEndIdx); - expect(aStartIdx).toBeLessThan(bEndIdx); - expect(bStartIdx).toBeLessThan(aEndIdx); - }); - - it("maxConcurrent: 0 runs all tools in parallel (unlimited)", async () => { - const log: string[] = []; - - const toolA = createFakeTool("a", async () => { - log.push("a:start"); - await delay(20); - log.push("a:end"); - return { content: "a" }; - }); - - const toolB = createFakeTool("b", async () => { - log.push("b:start"); - await delay(20); - log.push("b:end"); - return { content: "b" }; - }); - - const toolC = createFakeTool("c", async () => { - log.push("c:start"); - await delay(20); - log.push("c:end"); - return { content: "c" }; - }); - - const provider = createFakeProvider([ - [ - { type: "tool-call", toolCallId: "tc1", toolName: "a", input: {} }, - { type: "tool-call", toolCallId: "tc2", toolName: "b", input: {} }, - { type: "tool-call", toolCallId: "tc3", toolName: "c", input: {} }, - { type: "finish", reason: "tool-calls" }, - ], - [ - { type: "text-delta", delta: "done" }, - { type: "finish", reason: "stop" }, - ], - ]); - - await runTurn({ - provider, - messages: [userMessage], - tools: [toolA, toolB, toolC], - dispatch: { maxConcurrent: 0, eager: false }, - conversationId: "tab-test", - turnId: "turn-test", - emit: () => {}, - }); - - const aStartIdx = log.indexOf("a:start"); - const bStartIdx = log.indexOf("b:start"); - const cStartIdx = log.indexOf("c:start"); - const aEndIdx = log.indexOf("a:end"); - const bEndIdx = log.indexOf("b:end"); - const cEndIdx = log.indexOf("c:end"); - - expect(aStartIdx).toBeLessThan(aEndIdx); - expect(bStartIdx).toBeLessThan(bEndIdx); - expect(cStartIdx).toBeLessThan(cEndIdx); - expect(aStartIdx).toBeLessThan(bEndIdx); - expect(bStartIdx).toBeLessThan(aEndIdx); - expect(cStartIdx).toBeLessThan(aEndIdx); - }); - - it("eager: true launches tool before step finish", async () => { - const log: string[] = []; - - const tool = createFakeTool("test", async () => { - log.push("tool:start"); - await delay(5); - log.push("tool:end"); - return { content: "done" }; - }); - - let callCount = 0; - const provider: ProviderContract = { - id: "fake", - stream(_messages, _tools) { - const idx = callCount++; - if (idx === 0) { - return (async function* () { - yield { - type: "tool-call", - toolCallId: "tc1", - toolName: "test", - input: {}, - } as ProviderEvent; - log.push("provider:after-tool-call"); - await delay(50); - yield { type: "finish", reason: "tool-calls" } as ProviderEvent; - log.push("provider:finish"); - })(); - } - return (async function* () { - yield { type: "text-delta", delta: "done" } as ProviderEvent; - yield { type: "finish", reason: "stop" } as ProviderEvent; - })(); - }, - }; - - await runTurn({ - provider, - messages: [userMessage], - tools: [tool], - dispatch: { maxConcurrent: 1, eager: true }, - conversationId: "tab-test", - turnId: "turn-test", - emit: () => {}, - }); - - const toolStartIdx = log.indexOf("tool:start"); - const finishIdx = log.indexOf("provider:finish"); - expect(toolStartIdx).toBeLessThan(finishIdx); - }); - - it("eager: false does not launch tool before step finish", async () => { - const log: string[] = []; - - const tool = createFakeTool("test", async () => { - log.push("tool:start"); - await delay(5); - log.push("tool:end"); - return { content: "done" }; - }); - - let callCount = 0; - const provider: ProviderContract = { - id: "fake", - stream(_messages, _tools) { - const idx = callCount++; - if (idx === 0) { - return (async function* () { - yield { - type: "tool-call", - toolCallId: "tc1", - toolName: "test", - input: {}, - } as ProviderEvent; - log.push("provider:after-tool-call"); - await delay(50); - yield { type: "finish", reason: "tool-calls" } as ProviderEvent; - log.push("provider:finish"); - })(); - } - return (async function* () { - yield { type: "text-delta", delta: "done" } as ProviderEvent; - yield { type: "finish", reason: "stop" } as ProviderEvent; - })(); - }, - }; - - await runTurn({ - provider, - messages: [userMessage], - tools: [tool], - dispatch: { maxConcurrent: 1, eager: false }, - conversationId: "tab-test", - turnId: "turn-test", - emit: () => {}, - }); - - const toolStartIdx = log.indexOf("tool:start"); - const finishIdx = log.indexOf("provider:finish"); - expect(toolStartIdx).toBeGreaterThan(finishIdx); - }); - - it("abort mid-turn synthesizes error results for unresolved tool calls", async () => { - const ac = new AbortController(); - - const tool = createFakeTool("slow", async (_input, ctx) => { - await delay(200); - if (ctx.signal.aborted) return { content: "Aborted", isError: true }; - return { content: "done" }; - }); - - const provider: ProviderContract = { - id: "fake", - stream(_messages, _tools) { - return (async function* () { - yield { - type: "tool-call", - toolCallId: "tc1", - toolName: "slow", - input: {}, - } as ProviderEvent; - yield { - type: "tool-call", - toolCallId: "tc2", - toolName: "slow", - input: { x: 1 }, - } as ProviderEvent; - ac.abort(); - await delay(10); - yield { type: "finish", reason: "tool-calls" } as ProviderEvent; - })(); - }, - }; - - const { events, emit } = createCollectingEmit(); - - const result = await runTurn({ - provider, - messages: [userMessage], - tools: [tool], - dispatch: { maxConcurrent: 1, eager: false }, - conversationId: "tab-test", - turnId: "turn-test", - emit, - signal: ac.signal, - }); - - expect(result.finishReason).toBe("aborted"); - - const toolResults = events.filter((e) => e.type === "tool-result"); - for (const tr of toolResults) { - if (tr.type === "tool-result") { - expect(tr.isError).toBe(true); - } - } - }); - - it("abort before any step returns aborted immediately", async () => { - const ac = new AbortController(); - ac.abort(); - - const provider = createFakeProvider([ - [ - { type: "text-delta", delta: "should not appear" }, - { type: "finish", reason: "stop" }, - ], - ]); - - const result = await runTurn({ - provider, - messages: [userMessage], - tools: [], - dispatch: { maxConcurrent: 1, eager: false }, - conversationId: "tab-test", - turnId: "turn-test", - emit: () => {}, - signal: ac.signal, - }); - - expect(result.finishReason).toBe("aborted"); - expect(result.messages).toHaveLength(0); - }); - - it("de-duplicates identical tool calls in a batch", async () => { - let execCount = 0; - - const tool = createFakeTool("dedup", async (_input) => { - execCount++; - return { content: `result-${execCount}` }; - }); - - const provider = createFakeProvider([ - [ - { type: "tool-call", toolCallId: "tc1", toolName: "dedup", input: { x: 1 } }, - { type: "tool-call", toolCallId: "tc2", toolName: "dedup", input: { x: 1 } }, - { type: "tool-call", toolCallId: "tc3", toolName: "dedup", input: { x: 2 } }, - { type: "finish", reason: "tool-calls" }, - ], - [ - { type: "text-delta", delta: "done" }, - { type: "finish", reason: "stop" }, - ], - ]); - - const { events, emit } = createCollectingEmit(); - - const result = await runTurn({ - provider, - messages: [userMessage], - tools: [tool], - dispatch: { maxConcurrent: 1, eager: false }, - conversationId: "tab-test", - turnId: "turn-test", - emit, - }); - - expect(execCount).toBe(2); - - const toolResults = events.filter((e) => e.type === "tool-result"); - expect(toolResults).toHaveLength(3); - - const tc1Result = toolResults.find((e) => e.type === "tool-result" && e.toolCallId === "tc1"); - const tc2Result = toolResults.find((e) => e.type === "tool-result" && e.toolCallId === "tc2"); - const tc3Result = toolResults.find((e) => e.type === "tool-result" && e.toolCallId === "tc3"); - - expect(tc1Result).toBeDefined(); - expect(tc2Result).toBeDefined(); - expect(tc3Result).toBeDefined(); - - if (tc1Result?.type === "tool-result" && tc2Result?.type === "tool-result") { - expect(tc1Result.content).toBe(tc2Result.content); - expect(tc1Result.content).toBe("result-1"); - } - if (tc3Result?.type === "tool-result") { - expect(tc3Result.content).toBe("result-2"); - } - - expect(result.finishReason).toBe("stop"); - }); - - it("serializes non-concurrency-safe tools even with maxConcurrent > 1", async () => { - const log: string[] = []; - - const unsafeTool: ToolContract = { - name: "unsafe", - description: "Unsafe tool", - parameters: { type: "object" }, - concurrencySafe: false, - execute: async () => { - log.push("unsafe:start"); - await delay(10); - log.push("unsafe:end"); - return { content: "done" }; - }, - }; - - const safeTool: ToolContract = { - name: "safe", - description: "Safe tool", - parameters: { type: "object" }, - execute: async () => { - log.push("safe:start"); - await delay(10); - log.push("safe:end"); - return { content: "done" }; - }, - }; - - const provider = createFakeProvider([ - [ - { type: "tool-call", toolCallId: "tc1", toolName: "unsafe", input: {} }, - { type: "tool-call", toolCallId: "tc2", toolName: "safe", input: {} }, - { type: "finish", reason: "tool-calls" }, - ], - [ - { type: "text-delta", delta: "done" }, - { type: "finish", reason: "stop" }, - ], - ]); - - await runTurn({ - provider, - messages: [userMessage], - tools: [unsafeTool, safeTool], - dispatch: { maxConcurrent: 5, eager: false }, - conversationId: "tab-test", - turnId: "turn-test", - emit: () => {}, - }); - - const unsafeEndIdx = log.indexOf("unsafe:end"); - const safeStartIdx = log.indexOf("safe:start"); - expect(unsafeEndIdx).toBeLessThan(safeStartIdx); - }); - - it("handles unknown tool name gracefully", async () => { - const provider = createFakeProvider([ - [ - { - type: "tool-call", - toolCallId: "tc1", - toolName: "nonexistent", - input: {}, - }, - { type: "finish", reason: "tool-calls" }, - ], - [ - { type: "text-delta", delta: "done" }, - { type: "finish", reason: "stop" }, - ], - ]); - - const { events, emit } = createCollectingEmit(); - - const result = await runTurn({ - provider, - messages: [userMessage], - tools: [], - dispatch: { maxConcurrent: 1, eager: false }, - conversationId: "tab-test", - turnId: "turn-test", - emit, - }); - - const toolResults = events.filter((e) => e.type === "tool-result"); - expect(toolResults).toHaveLength(1); - if (toolResults[0]?.type === "tool-result") { - expect(toolResults[0]?.isError).toBe(true); - expect(toolResults[0]?.content).toContain("Unknown tool"); - } - - expect(result.finishReason).toBe("stop"); - }); - - it("handles provider error gracefully", async () => { - const provider: ProviderContract = { - id: "fake", - stream() { - return (async function* () { - yield { type: "text-delta", delta: "partial" } as ProviderEvent; - throw new Error("provider crashed"); - })(); - }, - }; - - const { events, emit } = createCollectingEmit(); - - const result = await runTurn({ - provider, - messages: [userMessage], - tools: [], - dispatch: { maxConcurrent: 1, eager: false }, - conversationId: "tab-test", - turnId: "turn-test", - emit, - }); - - expect(result.finishReason).toBe("error"); - - const errorEvents = events.filter((e) => e.type === "error"); - expect(errorEvents).toHaveLength(1); - if (errorEvents[0]?.type === "error") { - expect(errorEvents[0]?.message).toContain("provider crashed"); - } - }); - - it("forwards cwd from RunTurnInput to ToolExecuteContext", async () => { - let capturedCwd: string | undefined = "SENTINEL_NOT_SET"; - - const tool = createFakeTool("cwdcheck", async (_input, ctx) => { - capturedCwd = ctx.cwd; - return { content: "ok" }; - }); - - const provider = createFakeProvider([ - [ - { type: "tool-call", toolCallId: "tc1", toolName: "cwdcheck", input: {} }, - { type: "finish", reason: "tool-calls" }, - ], - [ - { type: "text-delta", delta: "done" }, - { type: "finish", reason: "stop" }, - ], - ]); - - await runTurn({ - provider, - messages: [userMessage], - tools: [tool], - dispatch: { maxConcurrent: 1, eager: false }, - conversationId: "tab-test", - turnId: "turn-test", - emit: () => {}, - cwd: "/some/dir", - }); - - expect(capturedCwd).toBe("/some/dir"); - }); - - it("forwards undefined cwd when RunTurnInput has no cwd", async () => { - let capturedCwd: string | undefined = "SENTINEL_NOT_SET"; - - const tool = createFakeTool("cwdcheck", async (_input, ctx) => { - capturedCwd = ctx.cwd; - return { content: "ok" }; - }); - - const provider = createFakeProvider([ - [ - { type: "tool-call", toolCallId: "tc1", toolName: "cwdcheck", input: {} }, - { type: "finish", reason: "tool-calls" }, - ], - [ - { type: "text-delta", delta: "done" }, - { type: "finish", reason: "stop" }, - ], - ]); - - await runTurn({ - provider, - messages: [userMessage], - tools: [tool], - dispatch: { maxConcurrent: 1, eager: false }, - conversationId: "tab-test", - turnId: "turn-test", - emit: () => {}, - }); - - expect(capturedCwd).toBeUndefined(); - }); - - it("forwards computerId from RunTurnInput to ToolExecuteContext", async () => { - let capturedComputerId: string | undefined = "SENTINEL_NOT_SET"; - - const tool = createFakeTool("computercheck", async (_input, ctx) => { - capturedComputerId = ctx.computerId; - return { content: "ok" }; - }); - - const provider = createFakeProvider([ - [ - { type: "tool-call", toolCallId: "tc1", toolName: "computercheck", input: {} }, - { type: "finish", reason: "tool-calls" }, - ], - [ - { type: "text-delta", delta: "done" }, - { type: "finish", reason: "stop" }, - ], - ]); - - await runTurn({ - provider, - messages: [userMessage], - tools: [tool], - dispatch: { maxConcurrent: 1, eager: false }, - conversationId: "tab-test", - turnId: "turn-test", - emit: () => {}, - computerId: "ssh-host-alias", - }); - - expect(capturedComputerId).toBe("ssh-host-alias"); - }); - - it("forwards undefined computerId when RunTurnInput has no computerId", async () => { - let capturedComputerId: string | undefined = "SENTINEL_NOT_SET"; - - const tool = createFakeTool("computercheck", async (_input, ctx) => { - capturedComputerId = ctx.computerId; - return { content: "ok" }; - }); - - const provider = createFakeProvider([ - [ - { type: "tool-call", toolCallId: "tc1", toolName: "computercheck", input: {} }, - { type: "finish", reason: "tool-calls" }, - ], - [ - { type: "text-delta", delta: "done" }, - { type: "finish", reason: "stop" }, - ], - ]); - - await runTurn({ - provider, - messages: [userMessage], - tools: [tool], - dispatch: { maxConcurrent: 1, eager: false }, - conversationId: "tab-test", - turnId: "turn-test", - emit: () => {}, - }); - - expect(capturedComputerId).toBeUndefined(); - }); - - it("aggregates usage across multiple steps", async () => { - const provider = createFakeProvider([ - [ - { type: "tool-call", toolCallId: "tc1", toolName: "echo", input: {} }, - { type: "usage", usage: { inputTokens: 10, outputTokens: 5 } }, - { type: "finish", reason: "tool-calls" }, - ], - [ - { type: "text-delta", delta: "done" }, - { type: "usage", usage: { inputTokens: 20, outputTokens: 10 } }, - { type: "finish", reason: "stop" }, - ], - ]); - - const tool = createFakeTool("echo", async () => ({ content: "echoed" })); - - const result = await runTurn({ - provider, - messages: [userMessage], - tools: [tool], - dispatch: { maxConcurrent: 1, eager: false }, - conversationId: "tab-test", - turnId: "turn-test", - emit: () => {}, - }); - - expect(result.usage).toEqual({ inputTokens: 30, outputTokens: 15 }); - }); - - it("emits tool-output events from tool ctx.onOutput", async () => { - const tool: ToolContract = { - name: "streaming", - description: "A tool that streams output", - parameters: { type: "object" }, - execute: async (_input, ctx) => { - ctx.onOutput("line 1\n", "stdout"); - ctx.onOutput("err 1\n", "stderr"); - return { content: "done" }; - }, - }; - - const provider = createFakeProvider([ - [ - { type: "tool-call", toolCallId: "tc1", toolName: "streaming", input: {} }, - { type: "finish", reason: "tool-calls" }, - ], - [ - { type: "text-delta", delta: "done" }, - { type: "finish", reason: "stop" }, - ], - ]); - - const { events, emit } = createCollectingEmit(); - - await runTurn({ - provider, - messages: [userMessage], - tools: [tool], - dispatch: { maxConcurrent: 1, eager: false }, - conversationId: "tab-test", - turnId: "turn-test", - emit, - }); - - const outputs = events.filter((e) => e.type === "tool-output"); - expect(outputs).toHaveLength(2); - if (outputs[0]?.type === "tool-output") { - expect(outputs[0]?.data).toBe("line 1\n"); - expect(outputs[0]?.stream).toBe("stdout"); - expect(outputs[0]?.toolCallId).toBe("tc1"); - } - if (outputs[1]?.type === "tool-output") { - expect(outputs[1]?.data).toBe("err 1\n"); - expect(outputs[1]?.stream).toBe("stderr"); - } - }); - - function createTestLogger(): { - logger: Logger; - sink: LogSink & { records: LogRecord[] }; - deps: LogDeps; - } { - let idCounter = 0; - const deps: LogDeps = { - now: () => 1000 + idCounter * 100, - newId: () => `span-${++idCounter}`, - }; - const records: LogRecord[] = []; - const sink: LogSink & { records: LogRecord[] } = { - records, - emit: (record) => records.push(record), - }; - const logger = createLogger({ extensionId: "test" }, sink, deps); - return { logger, sink, deps }; - } - - describe("span instrumentation", () => { - it("emits turn + step span open/close in order", async () => { - const provider = createFakeProvider([ - [ - { type: "text-delta", delta: "hi" }, - { type: "usage", usage: { inputTokens: 1, outputTokens: 1 } }, - { type: "finish", reason: "stop" }, - ], - ]); - - const { logger, sink } = createTestLogger(); - - await runTurn({ - provider, - messages: [userMessage], - tools: [], - dispatch: { maxConcurrent: 1, eager: false }, - conversationId: "conv-1", - turnId: "turn-1", - emit: () => {}, - logger, - }); - - const spanOpens = sink.records.filter((r) => r.kind === "span-open"); - const spanCloses = sink.records.filter((r) => r.kind === "span-close"); - - expect(spanOpens.length).toBeGreaterThanOrEqual(2); // turn + step - expect(spanCloses.length).toBeGreaterThanOrEqual(2); - - const turnOpen = spanOpens.find((r) => r.kind === "span-open" && r.name === "turn"); - const stepOpen = spanOpens.find((r) => r.kind === "span-open" && r.name === "step"); - expect(turnOpen).toBeDefined(); - expect(stepOpen).toBeDefined(); - - if (turnOpen?.kind === "span-open") { - expect(turnOpen.extensionId).toBe("test"); - expect(turnOpen.attributes?.conversationId).toBe("conv-1"); - expect(turnOpen.attributes?.turnId).toBe("turn-1"); - } - - const turnClose = spanCloses.find((r) => r.kind === "span-close" && r.name === "turn"); - expect(turnClose).toBeDefined(); - if (turnClose?.kind === "span-close") { - expect(turnClose.status).toBe("ok"); - expect(turnClose.durationMs).toBeGreaterThanOrEqual(0); - } - }); - - it("emits tool-call spans for dispatched tools", async () => { - const tool = createFakeTool("echo", async () => ({ content: "echoed" })); - - const provider = createFakeProvider([ - [ - { type: "tool-call", toolCallId: "tc1", toolName: "echo", input: {} }, - { type: "finish", reason: "tool-calls" }, - ], - [ - { type: "text-delta", delta: "done" }, - { type: "finish", reason: "stop" }, - ], - ]); - - const { logger, sink } = createTestLogger(); - - await runTurn({ - provider, - messages: [userMessage], - tools: [tool], - dispatch: { maxConcurrent: 1, eager: false }, - conversationId: "conv-1", - turnId: "turn-1", - emit: () => {}, - logger, - }); - - const toolCallSpans = sink.records.filter( - (r) => r.kind === "span-open" && r.name === "tool-call", - ); - expect(toolCallSpans).toHaveLength(1); - if (toolCallSpans[0]?.kind === "span-open") { - expect(toolCallSpans[0].attributes?.name).toBe("echo"); - expect(toolCallSpans[0].attributes?.toolCallId).toBe("tc1"); - } - - const toolCallCloses = sink.records.filter( - (r) => r.kind === "span-close" && r.name === "tool-call", - ); - expect(toolCallCloses).toHaveLength(1); - if (toolCallCloses[0]?.kind === "span-close") { - expect(toolCallCloses[0].status).toBe("ok"); - } - }); - - it("tools receive ctx.log (correlated logger)", async () => { - let capturedLog: Logger | undefined; - - const tool = createFakeTool("logtest", async (_input, ctx) => { - capturedLog = ctx.log; - ctx.log.info("tool ran", { key: "value" }); - return { content: "ok" }; - }); - - const provider = createFakeProvider([ - [ - { type: "tool-call", toolCallId: "tc1", toolName: "logtest", input: {} }, - { type: "finish", reason: "tool-calls" }, - ], - [ - { type: "text-delta", delta: "done" }, - { type: "finish", reason: "stop" }, - ], - ]); - - const { logger, sink } = createTestLogger(); - - await runTurn({ - provider, - messages: [userMessage], - tools: [tool], - dispatch: { maxConcurrent: 1, eager: false }, - conversationId: "conv-1", - turnId: "turn-1", - emit: () => {}, - logger, - }); - - expect(capturedLog).toBeDefined(); - - const toolLogs = sink.records.filter( - (r) => r.kind === "log" && r.kind === "log" && (r as { msg: string }).msg === "tool ran", - ); - expect(toolLogs).toHaveLength(1); - if (toolLogs[0]?.kind === "log") { - expect(toolLogs[0].attributes?.key).toBe("value"); - expect(toolLogs[0].extensionId).toBe("test"); - } - }); - - it("an aborted turn still closes its turn span", async () => { - const ac = new AbortController(); - ac.abort(); - - const provider = createFakeProvider([ - [ - { type: "text-delta", delta: "should not appear" }, - { type: "finish", reason: "stop" }, - ], - ]); - - const { logger, sink } = createTestLogger(); - - await runTurn({ - provider, - messages: [userMessage], - tools: [], - dispatch: { maxConcurrent: 1, eager: false }, - conversationId: "conv-1", - turnId: "turn-1", - emit: () => {}, - signal: ac.signal, - logger, - }); - - const turnCloses = sink.records.filter((r) => r.kind === "span-close" && r.name === "turn"); - expect(turnCloses).toHaveLength(1); - if (turnCloses[0]?.kind === "span-close") { - expect(turnCloses[0].attributes?.finishReason).toBe("aborted"); - } - }); - - it("a provider error closes the step span with error status", async () => { - const provider: ProviderContract = { - id: "fake", - stream() { - return (async function* () { - yield { type: "text-delta", delta: "partial" } as ProviderEvent; - throw new Error("provider exploded"); - })(); - }, - }; - - const { logger, sink } = createTestLogger(); - - const result = await runTurn({ - provider, - messages: [userMessage], - tools: [], - dispatch: { maxConcurrent: 1, eager: false }, - conversationId: "conv-1", - turnId: "turn-1", - emit: () => {}, - logger, - }); - - expect(result.finishReason).toBe("error"); - - const stepCloses = sink.records.filter((r) => r.kind === "span-close" && r.name === "step"); - expect(stepCloses).toHaveLength(1); - if (stepCloses[0]?.kind === "span-close") { - expect(stepCloses[0].status).toBe("error"); - expect(stepCloses[0].attributes?.["error.message"]).toContain("provider exploded"); - } - }); - - it("emits a prompt span with verbatim body and small scalar attributes", async () => { - const tool = createFakeTool("echo", async () => ({ content: "echoed" })); - - const provider = createFakeProvider([ - [ - { type: "text-delta", delta: "done" }, - { type: "finish", reason: "stop" }, - ], - ]); - - const { logger, sink } = createTestLogger(); - - await runTurn({ - provider, - messages: [userMessage], - tools: [tool], - dispatch: { maxConcurrent: 1, eager: false }, - conversationId: "conv-1", - turnId: "turn-1", - emit: () => {}, - logger, - }); - - const promptOpens = sink.records.filter((r) => r.kind === "span-open" && r.name === "prompt"); - expect(promptOpens).toHaveLength(1); - - const promptOpen = promptOpens[0]; - if (promptOpen?.kind === "span-open") { - expect(promptOpen.body).toBeDefined(); - const parsed = JSON.parse(promptOpen.body as string); - expect(parsed.messages).toEqual([userMessage]); - expect(parsed.tools).toHaveLength(1); - expect(parsed.tools[0].name).toBe("echo"); - - expect(promptOpen.attributes?.messageCount).toBe(1); - expect(promptOpen.attributes?.toolCount).toBe(1); - } - - const promptCloses = sink.records.filter( - (r) => r.kind === "span-close" && r.name === "prompt", - ); - expect(promptCloses).toHaveLength(1); - - const logRecords = sink.records.filter( - (r) => - r.kind === "log" && r.kind === "log" && (r as { msg: string }).msg === "prompt:before", - ); - expect(logRecords).toHaveLength(0); - }); - - it("emits ttft and decode spans for a generating step", async () => { - const provider = createFakeProvider([ - [ - { type: "text-delta", delta: "Hello" }, - { type: "text-delta", delta: " world" }, - { type: "usage", usage: { inputTokens: 10, outputTokens: 5 } }, - { type: "finish", reason: "stop" }, - ], - ]); - - const { logger, sink } = createTestLogger(); - - await runTurn({ - provider, - messages: [userMessage], - tools: [], - dispatch: { maxConcurrent: 1, eager: false }, - conversationId: "conv-1", - turnId: "turn-1", - emit: () => {}, - logger, - }); - - const ttftOpens = sink.records.filter((r) => r.kind === "span-open" && r.name === "ttft"); - const ttftCloses = sink.records.filter((r) => r.kind === "span-close" && r.name === "ttft"); - const decodeOpens = sink.records.filter((r) => r.kind === "span-open" && r.name === "decode"); - const decodeCloses = sink.records.filter( - (r) => r.kind === "span-close" && r.name === "decode", - ); - - expect(ttftOpens).toHaveLength(1); - expect(ttftCloses).toHaveLength(1); - expect(decodeOpens).toHaveLength(1); - expect(decodeCloses).toHaveLength(1); - - const stepOpen = sink.records.find((r) => r.kind === "span-open" && r.name === "step"); - expect(stepOpen).toBeDefined(); - - if ( - ttftOpens[0]?.kind === "span-open" && - ttftCloses[0]?.kind === "span-close" && - decodeOpens[0]?.kind === "span-open" && - decodeCloses[0]?.kind === "span-close" && - stepOpen?.kind === "span-open" - ) { - // ttft and decode are children of step - expect(ttftOpens[0].parentSpanId).toBe(stepOpen.spanId); - expect(decodeOpens[0].parentSpanId).toBe(stepOpen.spanId); - - // ttft closes before decode opens (in order) - const ttftCloseIdx = sink.records.indexOf(ttftCloses[0]); - const decodeOpenIdx = sink.records.indexOf(decodeOpens[0]); - expect(ttftCloseIdx).toBeLessThan(decodeOpenIdx); - - // ttft has firstToken: true - expect(ttftCloses[0].attributes?.firstToken).toBe(true); - - // durations from fake clock - expect(ttftCloses[0].durationMs).toBeGreaterThanOrEqual(0); - expect(decodeCloses[0].durationMs).toBeGreaterThanOrEqual(0); - } - }); - - it("first token counts a reasoning delta", async () => { - const provider = createFakeProvider([ - [ - { type: "reasoning-delta", delta: "thinking..." }, - { type: "text-delta", delta: "Hello" }, - { type: "usage", usage: { inputTokens: 10, outputTokens: 5 } }, - { type: "finish", reason: "stop" }, - ], - ]); - - const { logger, sink } = createTestLogger(); - - await runTurn({ - provider, - messages: [userMessage], - tools: [], - dispatch: { maxConcurrent: 1, eager: false }, - conversationId: "conv-1", - turnId: "turn-1", - emit: () => {}, - logger, - }); - - const ttftCloses = sink.records.filter((r) => r.kind === "span-close" && r.name === "ttft"); - expect(ttftCloses).toHaveLength(1); - - // The ttft span should close at the reasoning delta, not at the text delta - if (ttftCloses[0]?.kind === "span-close") { - expect(ttftCloses[0].attributes?.firstToken).toBe(true); - } - }); - - it("a step with no content token does not emit a misleading decode", async () => { - const provider = createFakeProvider([ - [ - { type: "tool-call", toolCallId: "tc1", toolName: "echo", input: {} }, - { type: "finish", reason: "tool-calls" }, - ], - [ - { type: "text-delta", delta: "done" }, - { type: "finish", reason: "stop" }, - ], - ]); - - const tool = createFakeTool("echo", async () => ({ content: "echoed" })); - - const { logger, sink } = createTestLogger(); - - await runTurn({ - provider, - messages: [userMessage], - tools: [tool], - dispatch: { maxConcurrent: 1, eager: false }, - conversationId: "conv-1", - turnId: "turn-1", - emit: () => {}, - logger, - }); - - // First step (tool-call-only) should have ttft with firstToken: false and no decode - const ttftOpens = sink.records.filter((r) => r.kind === "span-open" && r.name === "ttft"); - const ttftCloses = sink.records.filter((r) => r.kind === "span-close" && r.name === "ttft"); - const decodeOpens = sink.records.filter((r) => r.kind === "span-open" && r.name === "decode"); - - // There should be 2 ttft opens (one per step) and 2 ttft closes - expect(ttftOpens).toHaveLength(2); - expect(ttftCloses).toHaveLength(2); - - // First step: tool-call-only, no first token - if (ttftCloses[0]?.kind === "span-close") { - expect(ttftCloses[0].attributes?.firstToken).toBe(false); - } - - // Second step: has text-delta, should have firstToken: true and decode span - if (ttftCloses[1]?.kind === "span-close") { - expect(ttftCloses[1].attributes?.firstToken).toBe(true); - } - - // Only one decode span (for the second step) - expect(decodeOpens).toHaveLength(1); - }); - - it("turn span close stamps usage.inputTokens / usage.outputTokens (dotted)", async () => { - const provider = createFakeProvider([ - [ - { type: "text-delta", delta: "hi" }, - { type: "usage", usage: { inputTokens: 10, outputTokens: 5 } }, - { type: "finish", reason: "stop" }, - ], - ]); - - const { logger, sink } = createTestLogger(); - - await runTurn({ - provider, - messages: [userMessage], - tools: [], - dispatch: { maxConcurrent: 1, eager: false }, - conversationId: "conv-1", - turnId: "turn-1", - emit: () => {}, - logger, - }); - - const turnClose = sink.records.find((r) => r.kind === "span-close" && r.name === "turn"); - expect(turnClose).toBeDefined(); - if (turnClose?.kind === "span-close") { - expect(turnClose.attributes?.["usage.inputTokens"]).toBe(10); - expect(turnClose.attributes?.["usage.outputTokens"]).toBe(5); - expect(turnClose.attributes?.usage_inputTokens).toBeUndefined(); - expect(turnClose.attributes?.usage_outputTokens).toBeUndefined(); - } - }); - - it("step span close stamps usage.inputTokens / usage.outputTokens (dotted)", async () => { - const provider = createFakeProvider([ - [ - { type: "text-delta", delta: "hi" }, - { type: "usage", usage: { inputTokens: 7, outputTokens: 3 } }, - { type: "finish", reason: "stop" }, - ], - ]); - - const { logger, sink } = createTestLogger(); - - await runTurn({ - provider, - messages: [userMessage], - tools: [], - dispatch: { maxConcurrent: 1, eager: false }, - conversationId: "conv-1", - turnId: "turn-1", - emit: () => {}, - logger, - }); - - const stepClose = sink.records.find((r) => r.kind === "span-close" && r.name === "step"); - expect(stepClose).toBeDefined(); - if (stepClose?.kind === "span-close") { - expect(stepClose.attributes?.["usage.inputTokens"]).toBe(7); - expect(stepClose.attributes?.["usage.outputTokens"]).toBe(3); - expect(stepClose.attributes?.usage_inputTokens).toBeUndefined(); - expect(stepClose.attributes?.usage_outputTokens).toBeUndefined(); - } - }); - - it("turn + step spans stamp usage.cacheReadTokens / usage.cacheWriteTokens when the provider Usage carries them", async () => { - const provider = createFakeProvider([ - [ - { type: "text-delta", delta: "hi" }, - { - type: "usage", - usage: { inputTokens: 10, outputTokens: 5, cacheReadTokens: 3, cacheWriteTokens: 2 }, - }, - { type: "finish", reason: "stop" }, - ], - ]); - - const { logger, sink } = createTestLogger(); - - await runTurn({ - provider, - messages: [userMessage], - tools: [], - dispatch: { maxConcurrent: 1, eager: false }, - conversationId: "conv-1", - turnId: "turn-1", - emit: () => {}, - logger, - }); - - const turnClose = sink.records.find((r) => r.kind === "span-close" && r.name === "turn"); - const stepClose = sink.records.find((r) => r.kind === "span-close" && r.name === "step"); - - expect(turnClose).toBeDefined(); - if (turnClose?.kind === "span-close") { - expect(turnClose.attributes?.["usage.inputTokens"]).toBe(10); - expect(turnClose.attributes?.["usage.outputTokens"]).toBe(5); - expect(turnClose.attributes?.["usage.cacheReadTokens"]).toBe(3); - expect(turnClose.attributes?.["usage.cacheWriteTokens"]).toBe(2); - } - - expect(stepClose).toBeDefined(); - if (stepClose?.kind === "span-close") { - expect(stepClose.attributes?.["usage.inputTokens"]).toBe(10); - expect(stepClose.attributes?.["usage.outputTokens"]).toBe(5); - expect(stepClose.attributes?.["usage.cacheReadTokens"]).toBe(3); - expect(stepClose.attributes?.["usage.cacheWriteTokens"]).toBe(2); - } - }); - - it("turn + step spans OMIT the cache-token attrs when the provider Usage lacks them", async () => { - const provider = createFakeProvider([ - [ - { type: "text-delta", delta: "hi" }, - { type: "usage", usage: { inputTokens: 10, outputTokens: 5 } }, - { type: "finish", reason: "stop" }, - ], - ]); - - const { logger, sink } = createTestLogger(); - - await runTurn({ - provider, - messages: [userMessage], - tools: [], - dispatch: { maxConcurrent: 1, eager: false }, - conversationId: "conv-1", - turnId: "turn-1", - emit: () => {}, - logger, - }); - - const turnClose = sink.records.find((r) => r.kind === "span-close" && r.name === "turn"); - const stepClose = sink.records.find((r) => r.kind === "span-close" && r.name === "step"); - - expect(turnClose).toBeDefined(); - if (turnClose?.kind === "span-close") { - expect(turnClose.attributes?.["usage.inputTokens"]).toBe(10); - expect(turnClose.attributes?.["usage.outputTokens"]).toBe(5); - expect(turnClose.attributes?.["usage.cacheReadTokens"]).toBeUndefined(); - expect(turnClose.attributes?.["usage.cacheWriteTokens"]).toBeUndefined(); - } - - expect(stepClose).toBeDefined(); - if (stepClose?.kind === "span-close") { - expect(stepClose.attributes?.["usage.inputTokens"]).toBe(10); - expect(stepClose.attributes?.["usage.outputTokens"]).toBe(5); - expect(stepClose.attributes?.["usage.cacheReadTokens"]).toBeUndefined(); - expect(stepClose.attributes?.["usage.cacheWriteTokens"]).toBeUndefined(); - } - }); - }); - - describe("provider logger threading", () => { - it("passes step span logger to provider.stream opts when logger provided", async () => { - let capturedOpts: Record<string, unknown> | undefined; - - const provider: ProviderContract = { - id: "fake", - stream(_messages, _tools, opts) { - capturedOpts = opts !== undefined ? { ...opts } : undefined; - return (async function* () { - yield { type: "text-delta", delta: "hi" } as ProviderEvent; - yield { type: "usage", usage: { inputTokens: 1, outputTokens: 1 } } as ProviderEvent; - yield { type: "finish", reason: "stop" } as ProviderEvent; - })(); - }, - }; - - const { logger } = createTestLogger(); - - await runTurn({ - provider, - messages: [userMessage], - tools: [], - dispatch: { maxConcurrent: 1, eager: false }, - conversationId: "conv-1", - turnId: "turn-1", - emit: () => {}, - logger, - }); - - expect(capturedOpts).toBeDefined(); - expect(capturedOpts?.logger).toBeDefined(); - expect(typeof (capturedOpts?.logger as Record<string, unknown>).info).toBe("function"); - expect(typeof (capturedOpts?.logger as Record<string, unknown>).span).toBe("function"); - }); - - it("passes undefined for opts.logger when no logger provided", async () => { - let capturedOpts: Record<string, unknown> | undefined; - - const provider: ProviderContract = { - id: "fake", - stream(_messages, _tools, opts) { - capturedOpts = opts !== undefined ? { ...opts } : undefined; - return (async function* () { - yield { type: "text-delta", delta: "hi" } as ProviderEvent; - yield { type: "usage", usage: { inputTokens: 1, outputTokens: 1 } } as ProviderEvent; - yield { type: "finish", reason: "stop" } as ProviderEvent; - })(); - }, - }; - - await runTurn({ - provider, - messages: [userMessage], - tools: [], - dispatch: { maxConcurrent: 1, eager: false }, - conversationId: "conv-1", - turnId: "turn-1", - emit: () => {}, - }); - - expect(capturedOpts).toBeDefined(); - expect(capturedOpts?.logger).toBeUndefined(); - }); - - it("threads providerOpts.model through to provider.stream opts", async () => { - let capturedOpts: Record<string, unknown> | undefined; - - const provider: ProviderContract = { - id: "fake", - stream(_messages, _tools, opts) { - capturedOpts = opts !== undefined ? { ...opts } : undefined; - return (async function* () { - yield { type: "text-delta", delta: "hi" } as ProviderEvent; - yield { type: "usage", usage: { inputTokens: 1, outputTokens: 1 } } as ProviderEvent; - yield { type: "finish", reason: "stop" } as ProviderEvent; - })(); - }, - }; - - await runTurn({ - provider, - messages: [userMessage], - tools: [], - dispatch: { maxConcurrent: 1, eager: false }, - conversationId: "conv-1", - turnId: "turn-1", - emit: () => {}, - providerOpts: { model: "some-model-id" }, - }); - - expect(capturedOpts?.model).toBe("some-model-id"); - }); - }); - - describe("span tree nesting", () => { - it("turn span is root (parentSpanId undefined)", async () => { - const provider = createFakeProvider([ - [ - { type: "text-delta", delta: "hi" }, - { type: "finish", reason: "stop" }, - ], - ]); - - const { logger, sink } = createTestLogger(); - - await runTurn({ - provider, - messages: [userMessage], - tools: [], - dispatch: { maxConcurrent: 1, eager: false }, - conversationId: "conv-1", - turnId: "turn-1", - emit: () => {}, - logger, - }); - - const turnOpen = sink.records.find((r) => r.kind === "span-open" && r.name === "turn"); - expect(turnOpen).toBeDefined(); - if (turnOpen?.kind === "span-open") { - expect(turnOpen.parentSpanId).toBeUndefined(); - } - }); - - it("step span is a child of turn span", async () => { - const provider = createFakeProvider([ - [ - { type: "text-delta", delta: "hi" }, - { type: "finish", reason: "stop" }, - ], - ]); - - const { logger, sink } = createTestLogger(); - - await runTurn({ - provider, - messages: [userMessage], - tools: [], - dispatch: { maxConcurrent: 1, eager: false }, - conversationId: "conv-1", - turnId: "turn-1", - emit: () => {}, - logger, - }); - - const turnOpen = sink.records.find((r) => r.kind === "span-open" && r.name === "turn"); - const stepOpen = sink.records.find((r) => r.kind === "span-open" && r.name === "step"); - expect(turnOpen).toBeDefined(); - expect(stepOpen).toBeDefined(); - if (turnOpen?.kind === "span-open" && stepOpen?.kind === "span-open") { - expect(stepOpen.parentSpanId).toBe(turnOpen.spanId); - } - }); - - it("prompt span is a child of step span", async () => { - const provider = createFakeProvider([ - [ - { type: "text-delta", delta: "hi" }, - { type: "finish", reason: "stop" }, - ], - ]); - - const { logger, sink } = createTestLogger(); - - await runTurn({ - provider, - messages: [userMessage], - tools: [], - dispatch: { maxConcurrent: 1, eager: false }, - conversationId: "conv-1", - turnId: "turn-1", - emit: () => {}, - logger, - }); - - const stepOpen = sink.records.find((r) => r.kind === "span-open" && r.name === "step"); - const promptOpen = sink.records.find((r) => r.kind === "span-open" && r.name === "prompt"); - expect(stepOpen).toBeDefined(); - expect(promptOpen).toBeDefined(); - if (stepOpen?.kind === "span-open" && promptOpen?.kind === "span-open") { - expect(promptOpen.parentSpanId).toBe(stepOpen.spanId); - } - }); - - it("provider logger creates spans nested under step", async () => { - let capturedLogger: Logger | undefined; - let providerReqSpanId: string | undefined; - - const provider: ProviderContract = { - id: "fake", - stream(_messages, _tools, opts) { - capturedLogger = opts?.logger; - return (async function* () { - // Open provider.request span inside the stream (like a real provider) - if (capturedLogger !== undefined) { - const span = capturedLogger.span("provider.request"); - providerReqSpanId = span.id; - span.end(); - } - yield { type: "text-delta", delta: "hi" } as ProviderEvent; - yield { type: "finish", reason: "stop" } as ProviderEvent; - })(); - }, - }; - - const { logger, sink } = createTestLogger(); - - await runTurn({ - provider, - messages: [userMessage], - tools: [], - dispatch: { maxConcurrent: 1, eager: false }, - conversationId: "conv-1", - turnId: "turn-1", - emit: () => {}, - logger, - }); - - expect(capturedLogger).toBeDefined(); - expect(providerReqSpanId).toBeDefined(); - - const stepOpen = sink.records.find((r) => r.kind === "span-open" && r.name === "step"); - const provReqOpen = sink.records.find( - (r) => r.kind === "span-open" && r.name === "provider.request", - ); - expect(stepOpen).toBeDefined(); - expect(provReqOpen).toBeDefined(); - if (stepOpen?.kind === "span-open" && provReqOpen?.kind === "span-open") { - expect(provReqOpen.parentSpanId).toBe(stepOpen.spanId); - expect(provReqOpen.spanId).toBe(providerReqSpanId); - } - }); - - it("tool-call spans are children of step span", async () => { - const tool = createFakeTool("echo", async () => ({ content: "echoed" })); - - const provider = createFakeProvider([ - [ - { type: "tool-call", toolCallId: "tc1", toolName: "echo", input: {} }, - { type: "finish", reason: "tool-calls" }, - ], - [ - { type: "text-delta", delta: "done" }, - { type: "finish", reason: "stop" }, - ], - ]); - - const { logger, sink } = createTestLogger(); - - await runTurn({ - provider, - messages: [userMessage], - tools: [tool], - dispatch: { maxConcurrent: 1, eager: false }, - conversationId: "conv-1", - turnId: "turn-1", - emit: () => {}, - logger, - }); - - const stepOpen = sink.records.find((r) => r.kind === "span-open" && r.name === "step"); - const tcOpen = sink.records.find((r) => r.kind === "span-open" && r.name === "tool-call"); - expect(stepOpen).toBeDefined(); - expect(tcOpen).toBeDefined(); - if (stepOpen?.kind === "span-open" && tcOpen?.kind === "span-open") { - expect(tcOpen.parentSpanId).toBe(stepOpen.spanId); - } - }); - - it("full parent chain: turn → step → {prompt, provider.request, tool-call}", async () => { - let capturedLogger: Logger | undefined; - - const tool = createFakeTool("echo", async () => ({ content: "echoed" })); - - let streamCallCount = 0; - const provider: ProviderContract = { - id: "fake", - stream(_messages, _tools, opts) { - capturedLogger = opts?.logger; - streamCallCount++; - return (async function* () { - // Simulate provider opening a provider.request span - // INSIDE the stream on the first call only (like a real provider) - if (streamCallCount === 1 && capturedLogger !== undefined) { - const span = capturedLogger.span("provider.request"); - span.end(); - } - if (streamCallCount === 1) { - yield { - type: "tool-call", - toolCallId: "tc1", - toolName: "echo", - input: {}, - } as ProviderEvent; - yield { type: "finish", reason: "tool-calls" } as ProviderEvent; - } else { - yield { type: "text-delta", delta: "done" } as ProviderEvent; - yield { type: "finish", reason: "stop" } as ProviderEvent; - } - })(); - }, - }; - - const { logger, sink } = createTestLogger(); - - await runTurn({ - provider, - messages: [userMessage], - tools: [tool], - dispatch: { maxConcurrent: 1, eager: false }, - conversationId: "conv-1", - turnId: "turn-1", - emit: () => {}, - logger, - }); - - const spanOpens = sink.records.filter((r) => r.kind === "span-open") as Array< - Extract<LogRecord, { kind: "span-open" }> - >; - - const turnOpen = spanOpens.find((r) => r.name === "turn"); - const stepOpen = spanOpens.find((r) => r.name === "step"); - const promptOpen = spanOpens.find((r) => r.name === "prompt"); - const provReqOpen = spanOpens.find((r) => r.name === "provider.request"); - const tcOpen = spanOpens.find((r) => r.name === "tool-call"); - - expect(turnOpen).toBeDefined(); - expect(stepOpen).toBeDefined(); - expect(promptOpen).toBeDefined(); - expect(provReqOpen).toBeDefined(); - expect(tcOpen).toBeDefined(); - - if ( - turnOpen?.kind === "span-open" && - stepOpen?.kind === "span-open" && - promptOpen?.kind === "span-open" && - provReqOpen?.kind === "span-open" && - tcOpen?.kind === "span-open" - ) { - // turn = root - expect(turnOpen.parentSpanId).toBeUndefined(); - - // step = child of turn - expect(stepOpen.parentSpanId).toBe(turnOpen.spanId); - - // prompt = child of step - expect(promptOpen.parentSpanId).toBe(stepOpen.spanId); - - // provider.request = child of step - expect(provReqOpen.parentSpanId).toBe(stepOpen.spanId); - - // tool-call = child of step - expect(tcOpen.parentSpanId).toBe(stepOpen.spanId); - } - }); - }); - - describe("lifecycle events", () => { - it("emits turn-start as the first event with conversation + turn ids", async () => { - const provider = createFakeProvider([ - [ - { type: "text-delta", delta: "hi" }, - { type: "finish", reason: "stop" }, - ], - ]); - - const { events, emit } = createCollectingEmit(); - - await runTurn({ - provider, - messages: [userMessage], - tools: [], - dispatch: { maxConcurrent: 1, eager: false }, - conversationId: "conv-42", - turnId: "turn-99", - emit, - }); - - expect(events[0]?.type).toBe("turn-start"); - if (events[0]?.type === "turn-start") { - expect(events[0].conversationId).toBe("conv-42"); - expect(events[0].turnId).toBe("turn-99"); - } - }); - - it("emits a single done event last, carrying the finishReason", async () => { - const provider = createFakeProvider([ - [ - { type: "text-delta", delta: "Hello" }, - { type: "usage", usage: { inputTokens: 5, outputTokens: 3 } }, - { type: "finish", reason: "stop" }, - ], - ]); - - const { events, emit } = createCollectingEmit(); - - const result = await runTurn({ - provider, - messages: [userMessage], - tools: [], - dispatch: { maxConcurrent: 1, eager: false }, - conversationId: "conv-1", - turnId: "turn-1", - emit, - }); - - const lastEvent = events[events.length - 1]; - expect(lastEvent?.type).toBe("done"); - if (lastEvent?.type === "done") { - expect(lastEvent.reason).toBe(result.finishReason); - expect(lastEvent.conversationId).toBe("conv-1"); - expect(lastEvent.turnId).toBe("turn-1"); - } - - const doneEvents = events.filter((e) => e.type === "done"); - expect(doneEvents).toHaveLength(1); - }); - - it("emits done after a tool-call turn", async () => { - const tool = createFakeTool("echo", async (input) => ({ - content: `echo: ${JSON.stringify(input)}`, - })); - - const provider = createFakeProvider([ - [ - { type: "tool-call", toolCallId: "tc1", toolName: "echo", input: { x: 1 } }, - { type: "finish", reason: "tool-calls" }, - ], - [ - { type: "text-delta", delta: "done" }, - { type: "finish", reason: "stop" }, - ], - ]); - - const { events, emit } = createCollectingEmit(); - - const result = await runTurn({ - provider, - messages: [userMessage], - tools: [tool], - dispatch: { maxConcurrent: 1, eager: false }, - conversationId: "conv-1", - turnId: "turn-1", - emit, - }); - - const lastEvent = events[events.length - 1]; - expect(lastEvent?.type).toBe("done"); - if (lastEvent?.type === "done") { - expect(lastEvent.reason).toBe(result.finishReason); - } - }); - - it('still emits done with reason "aborted" when the turn is aborted via signal', async () => { - const ac = new AbortController(); - ac.abort(); - - const provider = createFakeProvider([ - [ - { type: "text-delta", delta: "should not appear" }, - { type: "finish", reason: "stop" }, - ], - ]); - - const { events, emit } = createCollectingEmit(); - - const result = await runTurn({ - provider, - messages: [userMessage], - tools: [], - dispatch: { maxConcurrent: 1, eager: false }, - conversationId: "conv-1", - turnId: "turn-1", - emit, - signal: ac.signal, - }); - - expect(result.finishReason).toBe("aborted"); - - const lastEvent = events[events.length - 1]; - expect(lastEvent?.type).toBe("done"); - if (lastEvent?.type === "done") { - expect(lastEvent.reason).toBe("aborted"); - } - }); - - it('still emits done with reason "error" when the provider errors', async () => { - const provider: ProviderContract = { - id: "fake", - stream() { - return (async function* () { - yield { type: "text-delta", delta: "partial" } as ProviderEvent; - throw new Error("provider crashed"); - })(); - }, - }; - - const { events, emit } = createCollectingEmit(); - - const result = await runTurn({ - provider, - messages: [userMessage], - tools: [], - dispatch: { maxConcurrent: 1, eager: false }, - conversationId: "conv-1", - turnId: "turn-1", - emit, - }); - - expect(result.finishReason).toBe("error"); - - const lastEvent = events[events.length - 1]; - expect(lastEvent?.type).toBe("done"); - if (lastEvent?.type === "done") { - expect(lastEvent.reason).toBe("error"); - } - }); - - it("turn-start precedes every delta and done follows every delta", async () => { - const provider = createFakeProvider([ - [ - { type: "text-delta", delta: "Hello" }, - { type: "reasoning-delta", delta: "thinking..." }, - { type: "text-delta", delta: " world" }, - { type: "usage", usage: { inputTokens: 5, outputTokens: 3 } }, - { type: "finish", reason: "stop" }, - ], - ]); - - const { events, emit } = createCollectingEmit(); - - await runTurn({ - provider, - messages: [userMessage], - tools: [], - dispatch: { maxConcurrent: 1, eager: false }, - conversationId: "conv-1", - turnId: "turn-1", - emit, - }); - - const turnStartIdx = events.findIndex((e) => e.type === "turn-start"); - const doneIdx = events.findIndex((e) => e.type === "done"); - - expect(turnStartIdx).toBe(0); - expect(doneIdx).toBe(events.length - 1); - - for (let i = 0; i < events.length; i++) { - const e = events[i]; - if (e?.type === "text-delta" || e?.type === "reasoning-delta") { - expect(i).toBeGreaterThan(turnStartIdx); - expect(i).toBeLessThan(doneIdx); - } - } - }); - }); - - describe("stepId", () => { - it("tool-call and tool-result events carry stepId", async () => { - const tool = createFakeTool("echo", async () => ({ content: "echoed" })); - - const provider = createFakeProvider([ - [ - { type: "tool-call", toolCallId: "tc1", toolName: "echo", input: {} }, - { type: "finish", reason: "tool-calls" }, - ], - [ - { type: "text-delta", delta: "done" }, - { type: "finish", reason: "stop" }, - ], - ]); - - const { events, emit } = createCollectingEmit(); - - await runTurn({ - provider, - messages: [userMessage], - tools: [tool], - dispatch: { maxConcurrent: 1, eager: false }, - conversationId: "conv-1", - turnId: "turn-1", - emit, - }); - - const toolCallEvt = events.find((e) => e.type === "tool-call"); - const toolResultEvt = events.find((e) => e.type === "tool-result"); - - expect(toolCallEvt).toBeDefined(); - expect(toolResultEvt).toBeDefined(); - - if (toolCallEvt?.type === "tool-call" && toolResultEvt?.type === "tool-result") { - expect(toolCallEvt.stepId).toBeDefined(); - expect(toolResultEvt.stepId).toBeDefined(); - expect(toolCallEvt.stepId).toBe(toolResultEvt.stepId); - } - }); - - it("tool calls in the SAME step share one stepId; a later step gets a different one", async () => { - const toolA = createFakeTool("a", async () => ({ content: "a-result" })); - const toolB = createFakeTool("b", async () => ({ content: "b-result" })); - - const provider = createFakeProvider([ - [ - { type: "tool-call", toolCallId: "tc1", toolName: "a", input: {} }, - { type: "tool-call", toolCallId: "tc2", toolName: "b", input: {} }, - { type: "finish", reason: "tool-calls" }, - ], - [ - { type: "tool-call", toolCallId: "tc3", toolName: "a", input: {} }, - { type: "finish", reason: "tool-calls" }, - ], - [ - { type: "text-delta", delta: "done" }, - { type: "finish", reason: "stop" }, - ], - ]); - - const { events, emit } = createCollectingEmit(); - - await runTurn({ - provider, - messages: [userMessage], - tools: [toolA, toolB], - dispatch: { maxConcurrent: 1, eager: false }, - conversationId: "conv-1", - turnId: "turn-1", - emit, - }); - - const toolCallEvts = events.filter((e) => e.type === "tool-call"); - expect(toolCallEvts.length).toBeGreaterThanOrEqual(2); - - const step0Calls = toolCallEvts.filter( - (e) => e.type === "tool-call" && (e.toolCallId === "tc1" || e.toolCallId === "tc2"), - ); - const step1Call = toolCallEvts.find((e) => e.type === "tool-call" && e.toolCallId === "tc3"); - - expect(step0Calls).toHaveLength(2); - if (step0Calls[0]?.type === "tool-call" && step0Calls[1]?.type === "tool-call") { - expect(step0Calls[0].stepId).toBe(step0Calls[1].stepId); - } - - if (step1Call?.type === "tool-call" && step0Calls[0]?.type === "tool-call") { - expect(step1Call.stepId).not.toBe(step0Calls[0].stepId); - } - }); - - it("tool chunks in the result carry stepId", async () => { - const tool = createFakeTool("echo", async () => ({ content: "echoed" })); - - const provider = createFakeProvider([ - [ - { type: "tool-call", toolCallId: "tc1", toolName: "echo", input: {} }, - { type: "finish", reason: "tool-calls" }, - ], - [ - { type: "text-delta", delta: "done" }, - { type: "finish", reason: "stop" }, - ], - ]); - - const result = await runTurn({ - provider, - messages: [userMessage], - tools: [tool], - dispatch: { maxConcurrent: 1, eager: false }, - conversationId: "conv-1", - turnId: "turn-1", - emit: () => {}, - }); - - const toolCallMsg = result.messages.find( - (m) => m.role === "assistant" && m.chunks.some((c) => c.type === "tool-call"), - ); - const toolResultMsg = result.messages.find((m) => m.role === "tool"); - - expect(toolCallMsg).toBeDefined(); - expect(toolResultMsg).toBeDefined(); - - const tcChunk = toolCallMsg?.chunks.find((c) => c.type === "tool-call"); - const trChunk = toolResultMsg?.chunks[0]; - - expect(tcChunk?.type).toBe("tool-call"); - expect(trChunk?.type).toBe("tool-result"); - - if (tcChunk?.type === "tool-call" && trChunk?.type === "tool-result") { - expect(tcChunk.stepId).toBeDefined(); - expect(trChunk.stepId).toBeDefined(); - expect(tcChunk.stepId).toBe(trChunk.stepId); - } - }); - }); - - describe("timing events (now provided)", () => { - function createCounterNow(): { now: () => number; tick: (ms: number) => void } { - let current = 0; - return { - now: () => current, - tick: (ms: number) => { - current += ms; - }, - }; - } - - it("emits step-complete per step with timing when now provided", async () => { - const clock = createCounterNow(); - clock.tick(100); // turn starts at 100 - - const { events, emit } = createCollectingEmit(); - - // Advance clock during stream: first token at +50ms, stream ends at +200ms - let streamCallCount = 0; - const wrappedProvider: ProviderContract = { - id: "fake", - stream(_messages, _tools) { - const idx = streamCallCount++; - return (async function* () { - if (idx === 0) { - clock.tick(50); // stream starts - yield { type: "text-delta", delta: "Hello" } as ProviderEvent; - // first token seen at 150 (100+50) - clock.tick(100); - yield { type: "text-delta", delta: " world" } as ProviderEvent; - clock.tick(50); - yield { - type: "usage", - usage: { inputTokens: 10, outputTokens: 5 }, - } as ProviderEvent; - yield { type: "finish", reason: "stop" } as ProviderEvent; - } - })(); - }, - }; - - await runTurn({ - provider: wrappedProvider, - messages: [userMessage], - tools: [], - dispatch: { maxConcurrent: 1, eager: false }, - conversationId: "conv-1", - turnId: "turn-1", - emit, - now: clock.now, - }); - - const stepCompleteEvts = events.filter((e) => e.type === "step-complete"); - expect(stepCompleteEvts).toHaveLength(1); - - const sc = stepCompleteEvts[0]; - if (sc?.type === "step-complete") { - expect(sc.conversationId).toBe("conv-1"); - expect(sc.turnId).toBe("turn-1"); - expect(sc.stepId).toBeDefined(); - expect(sc.genTotalMs).toBe(200); // 50+100+50 - expect(sc.ttftMs).toBe(50); // stream start → first text-delta - expect(sc.decodeMs).toBe(150); // first token → stream end - const ttft = sc.ttftMs; - const decode = sc.decodeMs; - const genTotal = sc.genTotalMs; - if (ttft !== undefined && decode !== undefined && genTotal !== undefined) { - expect(genTotal).toBe(ttft + decode); - } - } - }); - - it("step-complete omits ttft/decode but keeps genTotalMs for a no-content step", async () => { - const clock = createCounterNow(); - clock.tick(100); // turn starts at 100 - - const tool = createFakeTool("echo", async () => ({ content: "echoed" })); - - let streamCallCount = 0; - const wrappedProvider: ProviderContract = { - id: "fake", - stream(_messages, _tools) { - const idx = streamCallCount++; - return (async function* () { - if (idx === 0) { - clock.tick(80); // stream starts at 180 - yield { - type: "tool-call", - toolCallId: "tc1", - toolName: "echo", - input: {}, - } as ProviderEvent; - clock.tick(20); - yield { type: "finish", reason: "tool-calls" } as ProviderEvent; - } else { - clock.tick(50); - yield { type: "text-delta", delta: "done" } as ProviderEvent; - clock.tick(50); - yield { type: "finish", reason: "stop" } as ProviderEvent; - } - })(); - }, - }; - - const { events, emit } = createCollectingEmit(); - - await runTurn({ - provider: wrappedProvider, - messages: [userMessage], - tools: [tool], - dispatch: { maxConcurrent: 1, eager: false }, - conversationId: "conv-1", - turnId: "turn-1", - emit, - now: clock.now, - }); - - const stepCompleteEvts = events.filter((e) => e.type === "step-complete"); - expect(stepCompleteEvts).toHaveLength(2); - - // First step: tool-call-only, no content token - const sc0 = stepCompleteEvts[0]; - if (sc0?.type === "step-complete") { - expect(sc0.stepId).toBeDefined(); - expect(sc0.genTotalMs).toBe(100); // 80+20 - expect(sc0.ttftMs).toBeUndefined(); - expect(sc0.decodeMs).toBeUndefined(); - } - - // Second step: has text-delta - const sc1 = stepCompleteEvts[1]; - if (sc1?.type === "step-complete") { - expect(sc1.stepId).toBeDefined(); - expect(sc1.genTotalMs).toBe(100); // 50+50 - expect(sc1.ttftMs).toBe(50); - expect(sc1.decodeMs).toBe(50); - } - }); - - it("usage event carries stepId", async () => { - const provider = createFakeProvider([ - [ - { type: "text-delta", delta: "hi" }, - { type: "usage", usage: { inputTokens: 5, outputTokens: 3 } }, - { type: "finish", reason: "stop" }, - ], - ]); - - const { events, emit } = createCollectingEmit(); - - await runTurn({ - provider, - messages: [userMessage], - tools: [], - dispatch: { maxConcurrent: 1, eager: false }, - conversationId: "conv-1", - turnId: "turn-1", - emit, - }); - - const usageEvts = events.filter((e) => e.type === "usage"); - expect(usageEvts).toHaveLength(1); - const ue = usageEvts[0]; - if (ue?.type === "usage") { - expect(ue.stepId).toBeDefined(); - } - }); - - it("tool-result carries durationMs (execution time) when now provided", async () => { - const clock = createCounterNow(); - clock.tick(100); // turn starts at 100 - - const tool = createFakeTool("slow", async () => { - clock.tick(200); // tool takes 200ms to execute - return { content: "done" }; - }); - - const provider = createFakeProvider([ - [ - { type: "tool-call", toolCallId: "tc1", toolName: "slow", input: {} }, - { type: "finish", reason: "tool-calls" }, - ], - [ - { type: "text-delta", delta: "ok" }, - { type: "finish", reason: "stop" }, - ], - ]); - - const { events, emit } = createCollectingEmit(); - - await runTurn({ - provider, - messages: [userMessage], - tools: [tool], - dispatch: { maxConcurrent: 1, eager: false }, - conversationId: "conv-1", - turnId: "turn-1", - emit, - now: clock.now, - }); - - const toolResultEvts = events.filter((e) => e.type === "tool-result"); - expect(toolResultEvts).toHaveLength(1); - const tr = toolResultEvts[0]; - if (tr?.type === "tool-result") { - expect(tr.durationMs).toBeDefined(); - expect(tr.durationMs).toBe(200); - } - }); - - it("done carries durationMs and aggregate usage when now provided", async () => { - const clock = createCounterNow(); - clock.tick(100); // turn starts at 100 - - const wrappedProvider: ProviderContract = { - id: "fake", - stream(_messages, _tools) { - return (async function* () { - clock.tick(80); // stream duration - yield { type: "text-delta", delta: "hi" } as ProviderEvent; - yield { - type: "usage", - usage: { inputTokens: 10, outputTokens: 5 }, - } as ProviderEvent; - yield { type: "finish", reason: "stop" } as ProviderEvent; - })(); - }, - }; - - const { events, emit } = createCollectingEmit(); - - await runTurn({ - provider: wrappedProvider, - messages: [userMessage], - tools: [], - dispatch: { maxConcurrent: 1, eager: false }, - conversationId: "conv-1", - turnId: "turn-1", - emit, - now: clock.now, - }); - - const doneEvts = events.filter((e) => e.type === "done"); - expect(doneEvts).toHaveLength(1); - const d = doneEvts[0]; - if (d?.type === "done") { - expect(d.durationMs).toBeDefined(); - expect(d.durationMs).toBeGreaterThan(0); - expect(d.usage).toBeDefined(); - if (d.usage !== undefined) { - expect(d.usage.inputTokens).toBe(10); - expect(d.usage.outputTokens).toBe(5); - } - } - }); - - it("no now → timing fields absent", async () => { - const tool = createFakeTool("echo", async () => ({ content: "echoed" })); - - const provider = createFakeProvider([ - [ - { type: "text-delta", delta: "hi" }, - { type: "usage", usage: { inputTokens: 5, outputTokens: 3 } }, - { type: "tool-call", toolCallId: "tc1", toolName: "echo", input: {} }, - { type: "finish", reason: "tool-calls" }, - ], - [ - { type: "text-delta", delta: "done" }, - { type: "usage", usage: { inputTokens: 10, outputTokens: 5 } }, - { type: "finish", reason: "stop" }, - ], - ]); - - const { events, emit } = createCollectingEmit(); - - await runTurn({ - provider, - messages: [userMessage], - tools: [tool], - dispatch: { maxConcurrent: 1, eager: false }, - conversationId: "conv-1", - turnId: "turn-1", - emit, - // no now - }); - - // step-complete still emitted (with stepId, no timing) - const stepCompleteEvts = events.filter((e) => e.type === "step-complete"); - expect(stepCompleteEvts).toHaveLength(2); - for (const sc of stepCompleteEvts) { - if (sc?.type === "step-complete") { - expect(sc.stepId).toBeDefined(); - expect(sc.ttftMs).toBeUndefined(); - expect(sc.decodeMs).toBeUndefined(); - expect(sc.genTotalMs).toBeUndefined(); - } - } - - // usage still carries stepId - const usageEvts = events.filter((e) => e.type === "usage"); - for (const ue of usageEvts) { - if (ue?.type === "usage") { - expect(ue.stepId).toBeDefined(); - } - } - - // no durationMs on tool-result - const toolResultEvts = events.filter((e) => e.type === "tool-result"); - for (const tr of toolResultEvts) { - if (tr?.type === "tool-result") { - expect(tr.durationMs).toBeUndefined(); - } - } - - // no durationMs on done, but usage is present (independent of now) - const doneEvts = events.filter((e) => e.type === "done"); - expect(doneEvts).toHaveLength(1); - const d = doneEvts[0]; - if (d?.type === "done") { - expect(d.durationMs).toBeUndefined(); - expect(d.usage).toBeDefined(); - if (d.usage !== undefined) { - expect(d.usage.inputTokens).toBe(15); - expect(d.usage.outputTokens).toBe(8); - } - } - }); - }); - - describe("contextSize", () => { - it("single-step turn: contextSize equals step inputTokens + outputTokens", async () => { - const provider = createFakeProvider([ - [ - { type: "text-delta", delta: "Hello" }, - { type: "usage", usage: { inputTokens: 100, outputTokens: 50 } }, - { type: "finish", reason: "stop" }, - ], - ]); - - const { events, emit } = createCollectingEmit(); - - await runTurn({ - provider, - messages: [userMessage], - tools: [], - dispatch: { maxConcurrent: 1, eager: false }, - conversationId: "conv-1", - turnId: "turn-1", - emit, - }); - - const doneEvt = events.find((e) => e.type === "done"); - expect(doneEvt).toBeDefined(); - if (doneEvt?.type === "done") { - expect(doneEvt.contextSize).toBe(150); - } - }); - - it("multi-step turn: contextSize equals ONLY the last step's inputTokens + outputTokens", async () => { - const tool = createFakeTool("echo", async () => ({ content: "echoed" })); - - const provider = createFakeProvider([ - [ - { type: "tool-call", toolCallId: "tc1", toolName: "echo", input: {} }, - { type: "usage", usage: { inputTokens: 100, outputTokens: 20 } }, - { type: "finish", reason: "tool-calls" }, - ], - [ - { type: "text-delta", delta: "done" }, - { type: "usage", usage: { inputTokens: 300, outputTokens: 80 } }, - { type: "finish", reason: "stop" }, - ], - ]); - - const { events, emit } = createCollectingEmit(); - - await runTurn({ - provider, - messages: [userMessage], - tools: [tool], - dispatch: { maxConcurrent: 1, eager: false }, - conversationId: "conv-1", - turnId: "turn-1", - emit, - }); - - const doneEvt = events.find((e) => e.type === "done"); - expect(doneEvt).toBeDefined(); - if (doneEvt?.type === "done") { - expect(doneEvt.contextSize).toBe(380); - expect(doneEvt.usage).toBeDefined(); - if (doneEvt.usage !== undefined) { - expect(doneEvt.contextSize).not.toBe(doneEvt.usage.inputTokens); - } - } - }); - - it("no usage reported: contextSize is undefined", async () => { - const provider = createFakeProvider([ - [ - { type: "text-delta", delta: "Hello" }, - { type: "finish", reason: "stop" }, - ], - ]); - - const { events, emit } = createCollectingEmit(); - - await runTurn({ - provider, - messages: [userMessage], - tools: [], - dispatch: { maxConcurrent: 1, eager: false }, - conversationId: "conv-1", - turnId: "turn-1", - emit, - }); - - const doneEvt = events.find((e) => e.type === "done"); - expect(doneEvt).toBeDefined(); - if (doneEvt?.type === "done") { - expect(doneEvt.contextSize).toBeUndefined(); - expect(doneEvt.usage).toBeUndefined(); - } - }); - }); - - describe("drainSteering", () => { - it("drainSteering called once at the tool-result boundary; returned messages appended to the next step's provider input (after tool results)", async () => { - let drainCallCount = 0; - const steeringMessage: ChatMessage = { - role: "user", - chunks: [{ type: "text", text: "steer!" }], - }; - - const { provider, capturedMessages } = createCapturingProvider([ - [ - { type: "tool-call", toolCallId: "tc1", toolName: "echo", input: {} }, - { type: "finish", reason: "tool-calls" }, - ], - [ - { type: "text-delta", delta: "done" }, - { type: "finish", reason: "stop" }, - ], - ]); - - const tool = createFakeTool("echo", async () => ({ content: "echoed" })); - - const result = await runTurn({ - provider, - messages: [userMessage], - tools: [tool], - dispatch: { maxConcurrent: 1, eager: false }, - conversationId: "conv-1", - turnId: "turn-1", - emit: () => {}, - drainSteering: () => { - drainCallCount++; - return [steeringMessage]; - }, - }); - - expect(drainCallCount).toBe(1); - // The provider was called twice (tool-call step, then text step). - expect(capturedMessages).toHaveLength(2); - const secondStepMessages = capturedMessages[1] ?? []; - // user, assistant(tool-call), tool-result, steering(user) — in order, - // steering appended AFTER the tool results, before the next call. - expect(secondStepMessages).toHaveLength(4); - expect(secondStepMessages[0]?.role).toBe("user"); - expect(secondStepMessages[1]?.role).toBe("assistant"); - expect(secondStepMessages[2]?.role).toBe("tool"); - expect(secondStepMessages[3]).toEqual(steeringMessage); - expect(secondStepMessages[3]?.role).toBe("user"); - // Steering is fed to the next provider call, NOT surfaced in the - // turn result — the caller owns the steering messages' lifecycle. - expect(result.messages).toHaveLength(3); - }); - - it("drainSteering omitted → no injection; turn byte-identical to before", async () => { - const { provider, capturedMessages } = createCapturingProvider([ - [ - { type: "tool-call", toolCallId: "tc1", toolName: "echo", input: {} }, - { type: "finish", reason: "tool-calls" }, - ], - [ - { type: "text-delta", delta: "done" }, - { type: "finish", reason: "stop" }, - ], - ]); - - const tool = createFakeTool("echo", async () => ({ content: "echoed" })); - - const result = await runTurn({ - provider, - messages: [userMessage], - tools: [tool], - dispatch: { maxConcurrent: 1, eager: false }, - conversationId: "conv-1", - turnId: "turn-1", - emit: () => {}, - // drainSteering omitted — must be a strict no-op. - }); - - expect(capturedMessages).toHaveLength(2); - const secondStepMessages = capturedMessages[1] ?? []; - // user, assistant(tool-call), tool-result — NO steering injected. - expect(secondStepMessages).toHaveLength(3); - expect(secondStepMessages[0]?.role).toBe("user"); - expect(secondStepMessages[1]?.role).toBe("assistant"); - expect(secondStepMessages[2]?.role).toBe("tool"); - expect(result.messages).toHaveLength(3); - }); - - it("drainSteering returns [] → no injection", async () => { - let drainCallCount = 0; - const { provider, capturedMessages } = createCapturingProvider([ - [ - { type: "tool-call", toolCallId: "tc1", toolName: "echo", input: {} }, - { type: "finish", reason: "tool-calls" }, - ], - [ - { type: "text-delta", delta: "done" }, - { type: "finish", reason: "stop" }, - ], - ]); - - const tool = createFakeTool("echo", async () => ({ content: "echoed" })); - - await runTurn({ - provider, - messages: [userMessage], - tools: [tool], - dispatch: { maxConcurrent: 1, eager: false }, - conversationId: "conv-1", - turnId: "turn-1", - emit: () => {}, - drainSteering: () => { - drainCallCount++; - return []; - }, - }); - - // Called at the boundary, but returned nothing → no injection. - expect(drainCallCount).toBe(1); - expect(capturedMessages).toHaveLength(2); - const secondStepMessages = capturedMessages[1] ?? []; - expect(secondStepMessages).toHaveLength(3); - expect(secondStepMessages[2]?.role).toBe("tool"); - }); - - it("drainSteering NOT called when a step has no tool calls (text-only turn)", async () => { - let drainCallCount = 0; - const provider = createFakeProvider([ - [ - { type: "text-delta", delta: "hello" }, - { type: "finish", reason: "stop" }, - ], - ]); - - await runTurn({ - provider, - messages: [userMessage], - tools: [], - dispatch: { maxConcurrent: 1, eager: false }, - conversationId: "conv-1", - turnId: "turn-1", - emit: () => {}, - drainSteering: () => { - drainCallCount++; - return []; - }, - }); - - expect(drainCallCount).toBe(0); - }); - - it("multiple tool-call steps → drainSteering called once per tool-call step", async () => { - let drainCallCount = 0; - const provider = createFakeProvider([ - [ - { type: "tool-call", toolCallId: "tc1", toolName: "echo", input: {} }, - { type: "finish", reason: "tool-calls" }, - ], - [ - { type: "tool-call", toolCallId: "tc2", toolName: "echo", input: {} }, - { type: "finish", reason: "tool-calls" }, - ], - [ - { type: "text-delta", delta: "done" }, - { type: "finish", reason: "stop" }, - ], - ]); - - const tool = createFakeTool("echo", async () => ({ content: "echoed" })); - - await runTurn({ - provider, - messages: [userMessage], - tools: [tool], - dispatch: { maxConcurrent: 1, eager: false }, - conversationId: "conv-1", - turnId: "turn-1", - emit: () => {}, - drainSteering: () => { - drainCallCount++; - return []; - }, - }); - - // Steps 0 and 1 each produced tool calls → drained once each. - // Step 2 (text-only) → no boundary → no drain. Total = 2. - expect(drainCallCount).toBe(2); - }); - - it("MAX_STEPS=0 (unlimited): turn runs past the old 50-step limit and drains at every tool-result boundary until the model stops naturally", async () => { - let drainCallCount = 0; - // 100 tool-call steps (past the old MAX_STEPS=50) + 1 text-only step - // to end the turn naturally. - const STEPS_WITH_TOOLS = 100; - const script: ProviderEvent[][] = []; - for (let i = 0; i < STEPS_WITH_TOOLS; i++) { - script.push([ - { type: "tool-call", toolCallId: "tc", toolName: "echo", input: {} }, - { type: "finish", reason: "tool-calls" }, - ]); - } - // Final step: text only, no tool calls → natural end. - script.push([ - { type: "text-delta", delta: "done" }, - { type: "finish", reason: "stop" }, - ]); - const provider = createFakeProvider(script); - - const tool = createFakeTool("echo", async () => ({ content: "echoed" })); - - const result = await runTurn({ - provider, - messages: [userMessage], - tools: [tool], - dispatch: { maxConcurrent: 1, eager: false }, - conversationId: "conv-1", - turnId: "turn-1", - emit: () => {}, - drainSteering: () => { - drainCallCount++; - return []; - }, - }); - - // Turn ended naturally, NOT via max-steps. - expect(result.finishReason).toBe("stop"); - // Every tool-call step (0..99) is followed by a next step → each - // triggers a drain. The text-only step breaks before draining. - expect(drainCallCount).toBe(STEPS_WITH_TOOLS); - // All 101 steps produced messages (100 tool steps with assistant + - // tool messages, 1 text-only step with an assistant message). - expect(result.messages.length).toBe(STEPS_WITH_TOOLS * 2 + 1); - }); - }); - - // ── Retry with backoff ────────────────────────────────────────────────── - // - // PURE tests: a fake `sleep` (records calls, resolves instantly, can abort - // on a chosen call) + a pure `delayFor` (the canonical schedule + 8h budget). - // A stub `ProviderContract` whose `stream` yields a retryable error N times - // then a finish. ZERO mocks of `@dispatch/*` modules — effects injected. - - /** The canonical backoff schedule (matches the orchestrator's concrete strategy). */ - const RETRY_SCHEDULE_MS = [5_000, 10_000, 30_000, 60_000, 300_000, 600_000, 900_000, 1_800_000]; - const RETRY_TAIL_MS = 1_800_000; // 30m - const RETRY_BUDGET_MS = 8 * 60 * 60 * 1000; // 8h - - /** Cumulative scheduled sleep through `attempt` (sum of delay[0..attempt]). */ - function cumulativeSleepMs(attempt: number): number { - let sum = 0; - for (let i = 0; i <= attempt; i++) { - sum += i < RETRY_SCHEDULE_MS.length ? RETRY_SCHEDULE_MS[i] : RETRY_TAIL_MS; - } - return sum; - } - - /** Pure, deterministic delay decision (no I/O, no clock). */ - function delayFor(attempt: number): number | undefined { - const delay = attempt < RETRY_SCHEDULE_MS.length ? RETRY_SCHEDULE_MS[attempt] : RETRY_TAIL_MS; - if (cumulativeSleepMs(attempt) > RETRY_BUDGET_MS) return undefined; // over budget → stop - return delay; - } - - /** The full schedule delayFor would emit (until budget exhausted). */ - function fullSchedule(): number[] { - const result: number[] = []; - let attempt = 0; - while (true) { - const delay = delayFor(attempt); - if (delay === undefined) break; - result.push(delay); - attempt++; - } - return result; - } - - /** - * Fake, controllable `sleep`: records every call's delay, resolves - * instantly (no real waiting), and can abort the controller on a chosen - * 1-based call index to simulate "abort during sleep". - */ - function createFakeSleep(controller: AbortController): { - sleep: (ms: number, signal: AbortSignal) => Promise<void>; - calls: number[]; - abortOnCall: (n: number) => void; - } { - const calls: number[] = []; - let abortAt: number | undefined; - const sleep = async (ms: number, _signal: AbortSignal): Promise<void> => { - calls.push(ms); - if (abortAt !== undefined && calls.length === abortAt) { - controller.abort(); - throw new Error("aborted"); - } - // Otherwise resolve instantly (no real waiting). - }; - return { - sleep, - calls, - abortOnCall: (n: number) => { - abortAt = n; - }, - }; - } - - /** A provider that yields a retryable error `errorCount` times, then success. */ - function createRetryingProvider(opts: { - errorCount: number; - error?: { message: string; code?: string; retryable?: boolean }; - success?: ProviderEvent[]; - }): { provider: ProviderContract; streamCalls: { value: number } } { - const streamCalls = { value: 0 }; - const error: ProviderEvent = { - type: "error", - message: opts.error?.message ?? "overloaded", - ...(opts.error?.code !== undefined ? { code: opts.error.code } : {}), - ...(opts.error?.retryable !== undefined ? { retryable: opts.error.retryable } : {}), - }; - const success = opts.success ?? [ - { type: "text-delta", delta: "hi" }, - { type: "finish", reason: "stop" }, - ]; - const provider: ProviderContract = { - id: "fake", - stream() { - const idx = streamCalls.value++; - return (async function* () { - if (idx < opts.errorCount) { - yield error; - return; - } - for (const event of success) yield event; - })(); - }, - }; - return { provider, streamCalls }; - } - - describe("retry with backoff", () => { - it("retries a retryable emitted error on schedule then succeeds", async () => { - const { provider } = createRetryingProvider({ - errorCount: 3, - error: { message: "HTTP 429: overloaded", code: "429", retryable: true }, - }); - const controller = new AbortController(); - const fake = createFakeSleep(controller); - - const { events, emit } = createCollectingEmit(); - - const result = await runTurn({ - provider, - messages: [userMessage], - tools: [], - dispatch: { maxConcurrent: 1, eager: false }, - conversationId: "conv-1", - turnId: "turn-1", - emit, - signal: controller.signal, - retry: { delayFor, sleep: fake.sleep }, - }); - - expect(result.finishReason).toBe("stop"); - // 3 retries: 5s, 10s, 30s. - expect(fake.calls).toEqual([5_000, 10_000, 30_000]); - // 3 provider-retry events (one per sleep), then the successful text. - const retryEvents = events.filter((e) => e.type === "provider-retry"); - expect(retryEvents).toHaveLength(3); - if (retryEvents[0]?.type === "provider-retry") { - expect(retryEvents[0].attempt).toBe(0); - expect(retryEvents[0].delayMs).toBe(5_000); - expect(retryEvents[0].message).toBe("HTTP 429: overloaded"); - expect(retryEvents[0].code).toBe("429"); - expect(retryEvents[0].conversationId).toBe("conv-1"); - expect(retryEvents[0].turnId).toBe("turn-1"); - } - if (retryEvents[1]?.type === "provider-retry") { - expect(retryEvents[1].attempt).toBe(1); - expect(retryEvents[1].delayMs).toBe(10_000); - } - if (retryEvents[2]?.type === "provider-retry") { - expect(retryEvents[2].attempt).toBe(2); - expect(retryEvents[2].delayMs).toBe(30_000); - } - // The error was suppressed (no error event emitted — retry succeeded). - expect(events.filter((e) => e.type === "error")).toHaveLength(0); - // The successful content still streams. - const deltas = events.filter((e) => e.type === "text-delta"); - expect(deltas).toHaveLength(1); - }); - - it("sleep is called with the full schedule [5s,10s,30s,60s,5m,10m,15m,30m,30m…]", async () => { - // Provider errors forever → retries until budget exhausted → gives up. - const { provider } = createRetryingProvider({ - errorCount: Number.POSITIVE_INFINITY, - error: { message: "overloaded", code: "429", retryable: true }, - }); - const controller = new AbortController(); - const fake = createFakeSleep(controller); - - const { events, emit } = createCollectingEmit(); - - const result = await runTurn({ - provider, - messages: [userMessage], - tools: [], - dispatch: { maxConcurrent: 1, eager: false }, - conversationId: "conv-1", - turnId: "turn-1", - emit, - signal: controller.signal, - retry: { delayFor, sleep: fake.sleep }, - }); - - // Budget exhausted → give up → error. - expect(result.finishReason).toBe("error"); - - // The sleep schedule matches the pure delayFor output exactly. - expect(fake.calls).toEqual(fullSchedule()); - - // Head of the schedule (the 8 stepped delays). - expect(fake.calls.slice(0, 8)).toEqual([ - 5_000, 10_000, 30_000, 60_000, 300_000, 600_000, 900_000, 1_800_000, - ]); - // Tail repeats 30m. - expect(fake.calls[8]).toBe(1_800_000); - expect(fake.calls.at(-1)).toBe(1_800_000); - - // 8h cumulative budget cap: head (3705s) + 13×30m = ~7h31m, then stop. - // 21 retries (attempts 0..20), then delayFor(21) → undefined → give up. - expect(fake.calls).toHaveLength(21); - const totalSlept = fake.calls.reduce((a, b) => a + b, 0); - expect(totalSlept).toBeLessThanOrEqual(RETRY_BUDGET_MS); - expect(totalSlept).toBe(3_705_000 + 13 * 1_800_000); // 27_105_000 - - // One provider-retry per sleep, plus a final error (give-up). - expect(events.filter((e) => e.type === "provider-retry")).toHaveLength(21); - expect(events.filter((e) => e.type === "error")).toHaveLength(1); - const errEvt = events.find((e) => e.type === "error"); - if (errEvt?.type === "error") { - expect(errEvt.message).toBe("overloaded"); - expect(errEvt.code).toBe("429"); - } - }); - - it("does NOT retry after content was emitted (safety invariant)", async () => { - // Provider yields text (content) THEN a retryable error. Because content - // was emitted, retrying is unsafe (would duplicate partial output). - let callCount = 0; - const provider: ProviderContract = { - id: "fake", - stream() { - callCount++; - return (async function* () { - yield { type: "text-delta", delta: "partial" } as ProviderEvent; - yield { - type: "error", - message: "overloaded", - code: "429", - retryable: true, - } as ProviderEvent; - })(); - }, - }; - const controller = new AbortController(); - const fake = createFakeSleep(controller); - - const { events, emit } = createCollectingEmit(); - - const result = await runTurn({ - provider, - messages: [userMessage], - tools: [], - dispatch: { maxConcurrent: 1, eager: false }, - conversationId: "conv-1", - turnId: "turn-1", - emit, - signal: controller.signal, - retry: { delayFor, sleep: fake.sleep }, - }); - - // No retries: stream called exactly once. - expect(callCount).toBe(1); - expect(fake.calls).toHaveLength(0); - // The error is emitted (give-up) and partial content preserved. - expect(result.finishReason).toBe("error"); - expect(events.filter((e) => e.type === "error")).toHaveLength(1); - expect(events.filter((e) => e.type === "provider-retry")).toHaveLength(0); - expect(events.filter((e) => e.type === "text-delta")).toHaveLength(1); - }); - - it("does NOT retry a non-retryable emitted error (retryable: false)", async () => { - const { provider, streamCalls } = createRetryingProvider({ - errorCount: 1, - error: { message: "bad request", code: "400", retryable: false }, - }); - const controller = new AbortController(); - const fake = createFakeSleep(controller); - - const { events, emit } = createCollectingEmit(); - - const result = await runTurn({ - provider, - messages: [userMessage], - tools: [], - dispatch: { maxConcurrent: 1, eager: false }, - conversationId: "conv-1", - turnId: "turn-1", - emit, - signal: controller.signal, - retry: { delayFor, sleep: fake.sleep }, - }); - - expect(streamCalls.value).toBe(1); // no retry - expect(fake.calls).toHaveLength(0); - expect(result.finishReason).toBe("error"); - expect(events.filter((e) => e.type === "error")).toHaveLength(1); - expect(events.filter((e) => e.type === "provider-retry")).toHaveLength(0); - }); - - it("does NOT retry a non-retryable emitted error (retryable absent)", async () => { - const { provider, streamCalls } = createRetryingProvider({ - errorCount: 1, - error: { message: "bad request", code: "400" }, // no retryable field - }); - const controller = new AbortController(); - const fake = createFakeSleep(controller); - - const { events, emit } = createCollectingEmit(); - - const result = await runTurn({ - provider, - messages: [userMessage], - tools: [], - dispatch: { maxConcurrent: 1, eager: false }, - conversationId: "conv-1", - turnId: "turn-1", - emit, - signal: controller.signal, - retry: { delayFor, sleep: fake.sleep }, - }); - - expect(streamCalls.value).toBe(1); // no retry - expect(fake.calls).toHaveLength(0); - expect(result.finishReason).toBe("error"); - expect(events.filter((e) => e.type === "error")).toHaveLength(1); - }); - - it("give-up emits the final error when budget is exhausted", async () => { - // Custom delayFor that allows exactly 1 retry then stops. - const shortDelayFor = (attempt: number): number | undefined => - attempt === 0 ? 100 : undefined; - const { provider } = createRetryingProvider({ - errorCount: Number.POSITIVE_INFINITY, - error: { message: "overloaded", code: "429", retryable: true }, - }); - const controller = new AbortController(); - const fake = createFakeSleep(controller); - - const { events, emit } = createCollectingEmit(); - - const result = await runTurn({ - provider, - messages: [userMessage], - tools: [], - dispatch: { maxConcurrent: 1, eager: false }, - conversationId: "conv-1", - turnId: "turn-1", - emit, - signal: controller.signal, - retry: { delayFor: shortDelayFor, sleep: fake.sleep }, - }); - - expect(result.finishReason).toBe("error"); - expect(fake.calls).toEqual([100]); // one retry, then give up - // One provider-retry (attempt 0), then the final error. - expect(events.filter((e) => e.type === "provider-retry")).toHaveLength(1); - const errs = events.filter((e) => e.type === "error"); - expect(errs).toHaveLength(1); - if (errs[0]?.type === "error") { - expect(errs[0].message).toBe("overloaded"); - expect(errs[0].code).toBe("429"); - } - }); - - it("abort during sleep seals the turn aborted", async () => { - const { provider } = createRetryingProvider({ - errorCount: Number.POSITIVE_INFINITY, - error: { message: "overloaded", code: "429", retryable: true }, - }); - const controller = new AbortController(); - const fake = createFakeSleep(controller); - fake.abortOnCall(2); // abort on the 2nd sleep - - const { events, emit } = createCollectingEmit(); - - const result = await runTurn({ - provider, - messages: [userMessage], - tools: [], - dispatch: { maxConcurrent: 1, eager: false }, - conversationId: "conv-1", - turnId: "turn-1", - emit, - signal: controller.signal, - retry: { delayFor, sleep: fake.sleep }, - }); - - expect(result.finishReason).toBe("aborted"); - // Two sleeps attempted; the 2nd aborted. - expect(fake.calls).toHaveLength(2); - // No terminal error emitted (it was an abort, not a give-up). - expect(events.filter((e) => e.type === "error")).toHaveLength(0); - // One provider-retry before the aborted sleep (attempt 0). - const retries = events.filter((e) => e.type === "provider-retry"); - expect(retries).toHaveLength(2); - // The done event carries reason "aborted". - const done = events.find((e) => e.type === "done"); - if (done?.type === "done") { - expect(done.reason).toBe("aborted"); - } - }); - - it("omitting retry keeps the pre-retry behavior (backward-compatible)", async () => { - // A retryable error with no retry configured → ends the step as today. - const { provider, streamCalls } = createRetryingProvider({ - errorCount: 1, - error: { message: "overloaded", code: "429", retryable: true }, - }); - - const { events, emit } = createCollectingEmit(); - - const result = await runTurn({ - provider, - messages: [userMessage], - tools: [], - dispatch: { maxConcurrent: 1, eager: false }, - conversationId: "conv-1", - turnId: "turn-1", - emit, - // no retry field - }); - - expect(streamCalls.value).toBe(1); // no retry - expect(result.finishReason).toBe("error"); - expect(events.filter((e) => e.type === "error")).toHaveLength(1); - expect(events.filter((e) => e.type === "provider-retry")).toHaveLength(0); - }); - - it("retries a THROWN error (retryable-by-default when pre-content)", async () => { - // A thrown error (no retryable flag) before content is retried. - let callCount = 0; - const provider: ProviderContract = { - id: "fake", - stream() { - callCount++; - return (async function* () { - if (callCount <= 2) { - throw new Error("network blip"); - } - yield { type: "text-delta", delta: "hi" } as ProviderEvent; - yield { type: "finish", reason: "stop" } as ProviderEvent; - })(); - }, - }; - const controller = new AbortController(); - const fake = createFakeSleep(controller); - - const { events, emit } = createCollectingEmit(); - - const result = await runTurn({ - provider, - messages: [userMessage], - tools: [], - dispatch: { maxConcurrent: 1, eager: false }, - conversationId: "conv-1", - turnId: "turn-1", - emit, - signal: controller.signal, - retry: { delayFor, sleep: fake.sleep }, - }); - - expect(callCount).toBe(3); // 2 throws retried, 3rd succeeds - expect(fake.calls).toEqual([5_000, 10_000]); - expect(result.finishReason).toBe("stop"); - expect(events.filter((e) => e.type === "provider-retry")).toHaveLength(2); - // Thrown errors have no code. - if (events[0]?.type === "provider-retry") { - expect(events[0].code).toBeUndefined(); - expect(events[0].message).toBe("network blip"); - } - expect(events.filter((e) => e.type === "error")).toHaveLength(0); - }); - - it("does NOT retry a thrown error after content was emitted", async () => { - let callCount = 0; - const provider: ProviderContract = { - id: "fake", - stream() { - callCount++; - return (async function* () { - yield { type: "text-delta", delta: "partial" } as ProviderEvent; - throw new Error("network blip"); - })(); - }, - }; - const controller = new AbortController(); - const fake = createFakeSleep(controller); - - const { events, emit } = createCollectingEmit(); - - const result = await runTurn({ - provider, - messages: [userMessage], - tools: [], - dispatch: { maxConcurrent: 1, eager: false }, - conversationId: "conv-1", - turnId: "turn-1", - emit, - signal: controller.signal, - retry: { delayFor, sleep: fake.sleep }, - }); - - expect(callCount).toBe(1); - expect(fake.calls).toHaveLength(0); - expect(result.finishReason).toBe("error"); - expect(events.filter((e) => e.type === "error")).toHaveLength(1); - expect(events.filter((e) => e.type === "text-delta")).toHaveLength(1); - }); - - it("provider-retry events interleave correctly: error → retry-event → sleep → retry", async () => { - // Verify ordering: each provider-retry event comes BEFORE its sleep, - // and the successful content comes only after the last retry. - const { provider } = createRetryingProvider({ - errorCount: 2, - error: { message: "overloaded", code: "429", retryable: true }, - success: [ - { type: "text-delta", delta: "ok" }, - { type: "finish", reason: "stop" }, - ], - }); - const controller = new AbortController(); - const fake = createFakeSleep(controller); - - const { events, emit } = createCollectingEmit(); - - await runTurn({ - provider, - messages: [userMessage], - tools: [], - dispatch: { maxConcurrent: 1, eager: false }, - conversationId: "conv-1", - turnId: "turn-1", - emit, - signal: controller.signal, - retry: { delayFor, sleep: fake.sleep }, - }); - - const types = events.map((e) => e.type); - // turn-start, provider-retry(0), provider-retry(1), text-delta, step-complete, done - expect(types[0]).toBe("turn-start"); - const firstRetryIdx = types.indexOf("provider-retry"); - const textIdx = types.indexOf("text-delta"); - expect(firstRetryIdx).toBeGreaterThan(0); - expect(textIdx).toBeGreaterThan(firstRetryIdx); - // Both retries precede the text. - const retryCount = types.filter((t) => t === "provider-retry").length; - expect(retryCount).toBe(2); - }); - }); + it("emits events with the conversationId and turnId from input", async () => { + const provider = createFakeProvider([ + [ + { type: "text-delta", delta: "hi" }, + { type: "usage", usage: { inputTokens: 1, outputTokens: 1 } }, + { type: "finish", reason: "stop" }, + ], + ]); + + const { events, emit } = createCollectingEmit(); + + await runTurn({ + provider, + messages: [userMessage], + tools: [], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-42", + turnId: "turn-99", + emit, + }); + + expect(events.length).toBeGreaterThan(0); + for (const event of events) { + expect(event.conversationId).toBe("conv-42"); + if (event.type !== "status") { + expect(event.turnId).toBe("turn-99"); + } + } + }); + + it("text-only turn emits correct events and returns correct result", async () => { + const provider = createFakeProvider([ + [ + { type: "text-delta", delta: "Hello" }, + { type: "text-delta", delta: " world" }, + { type: "reasoning-delta", delta: "thinking..." }, + { type: "usage", usage: { inputTokens: 10, outputTokens: 5 } }, + { type: "finish", reason: "stop" }, + ], + ]); + + const { events, emit } = createCollectingEmit(); + + const result = await runTurn({ + provider, + messages: [userMessage], + tools: [], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "tab-test", + turnId: "turn-test", + emit, + }); + + expect(result.finishReason).toBe("stop"); + expect(result.messages).toHaveLength(1); + expect(result.messages[0]?.role).toBe("assistant"); + + const chunks = result.messages[0]?.chunks ?? []; + expect(chunks).toHaveLength(2); + expect(chunks[0]).toEqual({ type: "text", text: "Hello world" }); + expect(chunks[1]).toEqual({ type: "thinking", text: "thinking..." }); + + expect(result.usage).toEqual({ inputTokens: 10, outputTokens: 5 }); + + const eventTypes = events.map((e) => e.type); + expect(eventTypes).toEqual([ + "turn-start", + "text-delta", + "text-delta", + "reasoning-delta", + "usage", + "step-complete", + "done", + ]); + }); + + it("turn with one tool call executes tool, feeds result back, then finishes", async () => { + const tool = createFakeTool("greet", async (input) => ({ + content: `Hello, ${(input as { name: string }).name}!`, + })); + + const provider = createFakeProvider([ + [ + { type: "tool-call", toolCallId: "tc1", toolName: "greet", input: { name: "World" } }, + { type: "finish", reason: "tool-calls" }, + ], + [ + { type: "text-delta", delta: "Done." }, + { type: "finish", reason: "stop" }, + ], + ]); + + const { events, emit } = createCollectingEmit(); + + const result = await runTurn({ + provider, + messages: [userMessage], + tools: [tool], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "tab-test", + turnId: "turn-test", + emit, + }); + + expect(result.finishReason).toBe("stop"); + expect(result.messages).toHaveLength(3); + expect(result.messages[0]?.role).toBe("assistant"); + expect(result.messages[1]?.role).toBe("tool"); + expect(result.messages[2]?.role).toBe("assistant"); + + const toolResultChunk = result.messages[1]?.chunks[0]; + expect(toolResultChunk?.type).toBe("tool-result"); + if (toolResultChunk?.type === "tool-result") { + expect(toolResultChunk.content).toBe("Hello, World!"); + expect(toolResultChunk.toolCallId).toBe("tc1"); + expect(toolResultChunk.isError).toBe(false); + } + + const eventTypes = events.map((e) => e.type); + expect(eventTypes).toContain("tool-call"); + expect(eventTypes).toContain("tool-result"); + expect(eventTypes).toContain("text-delta"); + }); + + it("passes updated messages to subsequent provider calls", async () => { + const capturedMessages: ChatMessage[][] = []; + let callIndex = 0; + const script: ProviderEvent[][] = [ + [ + { type: "tool-call", toolCallId: "tc1", toolName: "echo", input: {} }, + { type: "finish", reason: "tool-calls" }, + ], + [ + { type: "text-delta", delta: "done" }, + { type: "finish", reason: "stop" }, + ], + ]; + + const provider: ProviderContract = { + id: "fake", + stream(messages, _tools) { + capturedMessages.push([...messages]); + const events = script[callIndex] ?? []; + callIndex++; + return (async function* () { + for (const event of events) yield event; + })(); + }, + }; + + const tool = createFakeTool("echo", async () => ({ content: "echoed" })); + + await runTurn({ + provider, + messages: [userMessage], + tools: [tool], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "tab-test", + turnId: "turn-test", + emit: () => {}, + }); + + expect(capturedMessages).toHaveLength(2); + expect(capturedMessages[0] ?? []).toHaveLength(1); + expect(capturedMessages[0]?.[0]?.role).toBe("user"); + + expect(capturedMessages[1] ?? []).toHaveLength(3); + expect(capturedMessages[1]?.[0]?.role).toBe("user"); + expect(capturedMessages[1]?.[1]?.role).toBe("assistant"); + expect(capturedMessages[1]?.[2]?.role).toBe("tool"); + }); + + it("maxConcurrent: 1 runs tools sequentially", async () => { + const log: string[] = []; + + const toolA = createFakeTool("a", async () => { + log.push("a:start"); + await delay(10); + log.push("a:end"); + return { content: "a" }; + }); + + const toolB = createFakeTool("b", async () => { + log.push("b:start"); + await delay(10); + log.push("b:end"); + return { content: "b" }; + }); + + const provider = createFakeProvider([ + [ + { type: "tool-call", toolCallId: "tc1", toolName: "a", input: {} }, + { type: "tool-call", toolCallId: "tc2", toolName: "b", input: {} }, + { type: "finish", reason: "tool-calls" }, + ], + [ + { type: "text-delta", delta: "done" }, + { type: "finish", reason: "stop" }, + ], + ]); + + await runTurn({ + provider, + messages: [userMessage], + tools: [toolA, toolB], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "tab-test", + turnId: "turn-test", + emit: () => {}, + }); + + const aEndIdx = log.indexOf("a:end"); + const bStartIdx = log.indexOf("b:start"); + expect(aEndIdx).toBeLessThan(bStartIdx); + }); + + it("maxConcurrent: 2 runs tools in parallel", async () => { + const log: string[] = []; + + const toolA = createFakeTool("a", async () => { + log.push("a:start"); + await delay(20); + log.push("a:end"); + return { content: "a" }; + }); + + const toolB = createFakeTool("b", async () => { + log.push("b:start"); + await delay(20); + log.push("b:end"); + return { content: "b" }; + }); + + const provider = createFakeProvider([ + [ + { type: "tool-call", toolCallId: "tc1", toolName: "a", input: {} }, + { type: "tool-call", toolCallId: "tc2", toolName: "b", input: {} }, + { type: "finish", reason: "tool-calls" }, + ], + [ + { type: "text-delta", delta: "done" }, + { type: "finish", reason: "stop" }, + ], + ]); + + await runTurn({ + provider, + messages: [userMessage], + tools: [toolA, toolB], + dispatch: { maxConcurrent: 2, eager: false }, + conversationId: "tab-test", + turnId: "turn-test", + emit: () => {}, + }); + + const aStartIdx = log.indexOf("a:start"); + const bStartIdx = log.indexOf("b:start"); + const aEndIdx = log.indexOf("a:end"); + const bEndIdx = log.indexOf("b:end"); + + expect(aStartIdx).toBeLessThan(aEndIdx); + expect(bStartIdx).toBeLessThan(bEndIdx); + expect(aStartIdx).toBeLessThan(bEndIdx); + expect(bStartIdx).toBeLessThan(aEndIdx); + }); + + it("maxConcurrent: 0 runs all tools in parallel (unlimited)", async () => { + const log: string[] = []; + + const toolA = createFakeTool("a", async () => { + log.push("a:start"); + await delay(20); + log.push("a:end"); + return { content: "a" }; + }); + + const toolB = createFakeTool("b", async () => { + log.push("b:start"); + await delay(20); + log.push("b:end"); + return { content: "b" }; + }); + + const toolC = createFakeTool("c", async () => { + log.push("c:start"); + await delay(20); + log.push("c:end"); + return { content: "c" }; + }); + + const provider = createFakeProvider([ + [ + { type: "tool-call", toolCallId: "tc1", toolName: "a", input: {} }, + { type: "tool-call", toolCallId: "tc2", toolName: "b", input: {} }, + { type: "tool-call", toolCallId: "tc3", toolName: "c", input: {} }, + { type: "finish", reason: "tool-calls" }, + ], + [ + { type: "text-delta", delta: "done" }, + { type: "finish", reason: "stop" }, + ], + ]); + + await runTurn({ + provider, + messages: [userMessage], + tools: [toolA, toolB, toolC], + dispatch: { maxConcurrent: 0, eager: false }, + conversationId: "tab-test", + turnId: "turn-test", + emit: () => {}, + }); + + const aStartIdx = log.indexOf("a:start"); + const bStartIdx = log.indexOf("b:start"); + const cStartIdx = log.indexOf("c:start"); + const aEndIdx = log.indexOf("a:end"); + const bEndIdx = log.indexOf("b:end"); + const cEndIdx = log.indexOf("c:end"); + + expect(aStartIdx).toBeLessThan(aEndIdx); + expect(bStartIdx).toBeLessThan(bEndIdx); + expect(cStartIdx).toBeLessThan(cEndIdx); + expect(aStartIdx).toBeLessThan(bEndIdx); + expect(bStartIdx).toBeLessThan(aEndIdx); + expect(cStartIdx).toBeLessThan(aEndIdx); + }); + + it("eager: true launches tool before step finish", async () => { + const log: string[] = []; + + const tool = createFakeTool("test", async () => { + log.push("tool:start"); + await delay(5); + log.push("tool:end"); + return { content: "done" }; + }); + + let callCount = 0; + const provider: ProviderContract = { + id: "fake", + stream(_messages, _tools) { + const idx = callCount++; + if (idx === 0) { + return (async function* () { + yield { + type: "tool-call", + toolCallId: "tc1", + toolName: "test", + input: {}, + } as ProviderEvent; + log.push("provider:after-tool-call"); + await delay(50); + yield { type: "finish", reason: "tool-calls" } as ProviderEvent; + log.push("provider:finish"); + })(); + } + return (async function* () { + yield { type: "text-delta", delta: "done" } as ProviderEvent; + yield { type: "finish", reason: "stop" } as ProviderEvent; + })(); + }, + }; + + await runTurn({ + provider, + messages: [userMessage], + tools: [tool], + dispatch: { maxConcurrent: 1, eager: true }, + conversationId: "tab-test", + turnId: "turn-test", + emit: () => {}, + }); + + const toolStartIdx = log.indexOf("tool:start"); + const finishIdx = log.indexOf("provider:finish"); + expect(toolStartIdx).toBeLessThan(finishIdx); + }); + + it("eager: false does not launch tool before step finish", async () => { + const log: string[] = []; + + const tool = createFakeTool("test", async () => { + log.push("tool:start"); + await delay(5); + log.push("tool:end"); + return { content: "done" }; + }); + + let callCount = 0; + const provider: ProviderContract = { + id: "fake", + stream(_messages, _tools) { + const idx = callCount++; + if (idx === 0) { + return (async function* () { + yield { + type: "tool-call", + toolCallId: "tc1", + toolName: "test", + input: {}, + } as ProviderEvent; + log.push("provider:after-tool-call"); + await delay(50); + yield { type: "finish", reason: "tool-calls" } as ProviderEvent; + log.push("provider:finish"); + })(); + } + return (async function* () { + yield { type: "text-delta", delta: "done" } as ProviderEvent; + yield { type: "finish", reason: "stop" } as ProviderEvent; + })(); + }, + }; + + await runTurn({ + provider, + messages: [userMessage], + tools: [tool], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "tab-test", + turnId: "turn-test", + emit: () => {}, + }); + + const toolStartIdx = log.indexOf("tool:start"); + const finishIdx = log.indexOf("provider:finish"); + expect(toolStartIdx).toBeGreaterThan(finishIdx); + }); + + it("abort mid-turn synthesizes error results for unresolved tool calls", async () => { + const ac = new AbortController(); + + const tool = createFakeTool("slow", async (_input, ctx) => { + await delay(200); + if (ctx.signal.aborted) return { content: "Aborted", isError: true }; + return { content: "done" }; + }); + + const provider: ProviderContract = { + id: "fake", + stream(_messages, _tools) { + return (async function* () { + yield { + type: "tool-call", + toolCallId: "tc1", + toolName: "slow", + input: {}, + } as ProviderEvent; + yield { + type: "tool-call", + toolCallId: "tc2", + toolName: "slow", + input: { x: 1 }, + } as ProviderEvent; + ac.abort(); + await delay(10); + yield { type: "finish", reason: "tool-calls" } as ProviderEvent; + })(); + }, + }; + + const { events, emit } = createCollectingEmit(); + + const result = await runTurn({ + provider, + messages: [userMessage], + tools: [tool], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "tab-test", + turnId: "turn-test", + emit, + signal: ac.signal, + }); + + expect(result.finishReason).toBe("aborted"); + + const toolResults = events.filter((e) => e.type === "tool-result"); + for (const tr of toolResults) { + if (tr.type === "tool-result") { + expect(tr.isError).toBe(true); + } + } + }); + + it("abort before any step returns aborted immediately", async () => { + const ac = new AbortController(); + ac.abort(); + + const provider = createFakeProvider([ + [ + { type: "text-delta", delta: "should not appear" }, + { type: "finish", reason: "stop" }, + ], + ]); + + const result = await runTurn({ + provider, + messages: [userMessage], + tools: [], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "tab-test", + turnId: "turn-test", + emit: () => {}, + signal: ac.signal, + }); + + expect(result.finishReason).toBe("aborted"); + expect(result.messages).toHaveLength(0); + }); + + it("de-duplicates identical tool calls in a batch", async () => { + let execCount = 0; + + const tool = createFakeTool("dedup", async (_input) => { + execCount++; + return { content: `result-${execCount}` }; + }); + + const provider = createFakeProvider([ + [ + { type: "tool-call", toolCallId: "tc1", toolName: "dedup", input: { x: 1 } }, + { type: "tool-call", toolCallId: "tc2", toolName: "dedup", input: { x: 1 } }, + { type: "tool-call", toolCallId: "tc3", toolName: "dedup", input: { x: 2 } }, + { type: "finish", reason: "tool-calls" }, + ], + [ + { type: "text-delta", delta: "done" }, + { type: "finish", reason: "stop" }, + ], + ]); + + const { events, emit } = createCollectingEmit(); + + const result = await runTurn({ + provider, + messages: [userMessage], + tools: [tool], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "tab-test", + turnId: "turn-test", + emit, + }); + + expect(execCount).toBe(2); + + const toolResults = events.filter((e) => e.type === "tool-result"); + expect(toolResults).toHaveLength(3); + + const tc1Result = toolResults.find((e) => e.type === "tool-result" && e.toolCallId === "tc1"); + const tc2Result = toolResults.find((e) => e.type === "tool-result" && e.toolCallId === "tc2"); + const tc3Result = toolResults.find((e) => e.type === "tool-result" && e.toolCallId === "tc3"); + + expect(tc1Result).toBeDefined(); + expect(tc2Result).toBeDefined(); + expect(tc3Result).toBeDefined(); + + if (tc1Result?.type === "tool-result" && tc2Result?.type === "tool-result") { + expect(tc1Result.content).toBe(tc2Result.content); + expect(tc1Result.content).toBe("result-1"); + } + if (tc3Result?.type === "tool-result") { + expect(tc3Result.content).toBe("result-2"); + } + + expect(result.finishReason).toBe("stop"); + }); + + it("serializes non-concurrency-safe tools even with maxConcurrent > 1", async () => { + const log: string[] = []; + + const unsafeTool: ToolContract = { + name: "unsafe", + description: "Unsafe tool", + parameters: { type: "object" }, + concurrencySafe: false, + execute: async () => { + log.push("unsafe:start"); + await delay(10); + log.push("unsafe:end"); + return { content: "done" }; + }, + }; + + const safeTool: ToolContract = { + name: "safe", + description: "Safe tool", + parameters: { type: "object" }, + execute: async () => { + log.push("safe:start"); + await delay(10); + log.push("safe:end"); + return { content: "done" }; + }, + }; + + const provider = createFakeProvider([ + [ + { type: "tool-call", toolCallId: "tc1", toolName: "unsafe", input: {} }, + { type: "tool-call", toolCallId: "tc2", toolName: "safe", input: {} }, + { type: "finish", reason: "tool-calls" }, + ], + [ + { type: "text-delta", delta: "done" }, + { type: "finish", reason: "stop" }, + ], + ]); + + await runTurn({ + provider, + messages: [userMessage], + tools: [unsafeTool, safeTool], + dispatch: { maxConcurrent: 5, eager: false }, + conversationId: "tab-test", + turnId: "turn-test", + emit: () => {}, + }); + + const unsafeEndIdx = log.indexOf("unsafe:end"); + const safeStartIdx = log.indexOf("safe:start"); + expect(unsafeEndIdx).toBeLessThan(safeStartIdx); + }); + + it("handles unknown tool name gracefully", async () => { + const provider = createFakeProvider([ + [ + { + type: "tool-call", + toolCallId: "tc1", + toolName: "nonexistent", + input: {}, + }, + { type: "finish", reason: "tool-calls" }, + ], + [ + { type: "text-delta", delta: "done" }, + { type: "finish", reason: "stop" }, + ], + ]); + + const { events, emit } = createCollectingEmit(); + + const result = await runTurn({ + provider, + messages: [userMessage], + tools: [], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "tab-test", + turnId: "turn-test", + emit, + }); + + const toolResults = events.filter((e) => e.type === "tool-result"); + expect(toolResults).toHaveLength(1); + if (toolResults[0]?.type === "tool-result") { + expect(toolResults[0]?.isError).toBe(true); + expect(toolResults[0]?.content).toContain("Unknown tool"); + } + + expect(result.finishReason).toBe("stop"); + }); + + it("handles provider error gracefully", async () => { + const provider: ProviderContract = { + id: "fake", + stream() { + return (async function* () { + yield { type: "text-delta", delta: "partial" } as ProviderEvent; + throw new Error("provider crashed"); + })(); + }, + }; + + const { events, emit } = createCollectingEmit(); + + const result = await runTurn({ + provider, + messages: [userMessage], + tools: [], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "tab-test", + turnId: "turn-test", + emit, + }); + + expect(result.finishReason).toBe("error"); + + const errorEvents = events.filter((e) => e.type === "error"); + expect(errorEvents).toHaveLength(1); + if (errorEvents[0]?.type === "error") { + expect(errorEvents[0]?.message).toContain("provider crashed"); + } + }); + + it("forwards cwd from RunTurnInput to ToolExecuteContext", async () => { + let capturedCwd: string | undefined = "SENTINEL_NOT_SET"; + + const tool = createFakeTool("cwdcheck", async (_input, ctx) => { + capturedCwd = ctx.cwd; + return { content: "ok" }; + }); + + const provider = createFakeProvider([ + [ + { type: "tool-call", toolCallId: "tc1", toolName: "cwdcheck", input: {} }, + { type: "finish", reason: "tool-calls" }, + ], + [ + { type: "text-delta", delta: "done" }, + { type: "finish", reason: "stop" }, + ], + ]); + + await runTurn({ + provider, + messages: [userMessage], + tools: [tool], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "tab-test", + turnId: "turn-test", + emit: () => {}, + cwd: "/some/dir", + }); + + expect(capturedCwd).toBe("/some/dir"); + }); + + it("forwards undefined cwd when RunTurnInput has no cwd", async () => { + let capturedCwd: string | undefined = "SENTINEL_NOT_SET"; + + const tool = createFakeTool("cwdcheck", async (_input, ctx) => { + capturedCwd = ctx.cwd; + return { content: "ok" }; + }); + + const provider = createFakeProvider([ + [ + { type: "tool-call", toolCallId: "tc1", toolName: "cwdcheck", input: {} }, + { type: "finish", reason: "tool-calls" }, + ], + [ + { type: "text-delta", delta: "done" }, + { type: "finish", reason: "stop" }, + ], + ]); + + await runTurn({ + provider, + messages: [userMessage], + tools: [tool], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "tab-test", + turnId: "turn-test", + emit: () => {}, + }); + + expect(capturedCwd).toBeUndefined(); + }); + + it("forwards computerId from RunTurnInput to ToolExecuteContext", async () => { + let capturedComputerId: string | undefined = "SENTINEL_NOT_SET"; + + const tool = createFakeTool("computercheck", async (_input, ctx) => { + capturedComputerId = ctx.computerId; + return { content: "ok" }; + }); + + const provider = createFakeProvider([ + [ + { type: "tool-call", toolCallId: "tc1", toolName: "computercheck", input: {} }, + { type: "finish", reason: "tool-calls" }, + ], + [ + { type: "text-delta", delta: "done" }, + { type: "finish", reason: "stop" }, + ], + ]); + + await runTurn({ + provider, + messages: [userMessage], + tools: [tool], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "tab-test", + turnId: "turn-test", + emit: () => {}, + computerId: "ssh-host-alias", + }); + + expect(capturedComputerId).toBe("ssh-host-alias"); + }); + + it("forwards undefined computerId when RunTurnInput has no computerId", async () => { + let capturedComputerId: string | undefined = "SENTINEL_NOT_SET"; + + const tool = createFakeTool("computercheck", async (_input, ctx) => { + capturedComputerId = ctx.computerId; + return { content: "ok" }; + }); + + const provider = createFakeProvider([ + [ + { type: "tool-call", toolCallId: "tc1", toolName: "computercheck", input: {} }, + { type: "finish", reason: "tool-calls" }, + ], + [ + { type: "text-delta", delta: "done" }, + { type: "finish", reason: "stop" }, + ], + ]); + + await runTurn({ + provider, + messages: [userMessage], + tools: [tool], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "tab-test", + turnId: "turn-test", + emit: () => {}, + }); + + expect(capturedComputerId).toBeUndefined(); + }); + + it("aggregates usage across multiple steps", async () => { + const provider = createFakeProvider([ + [ + { type: "tool-call", toolCallId: "tc1", toolName: "echo", input: {} }, + { type: "usage", usage: { inputTokens: 10, outputTokens: 5 } }, + { type: "finish", reason: "tool-calls" }, + ], + [ + { type: "text-delta", delta: "done" }, + { type: "usage", usage: { inputTokens: 20, outputTokens: 10 } }, + { type: "finish", reason: "stop" }, + ], + ]); + + const tool = createFakeTool("echo", async () => ({ content: "echoed" })); + + const result = await runTurn({ + provider, + messages: [userMessage], + tools: [tool], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "tab-test", + turnId: "turn-test", + emit: () => {}, + }); + + expect(result.usage).toEqual({ inputTokens: 30, outputTokens: 15 }); + }); + + it("emits tool-output events from tool ctx.onOutput", async () => { + const tool: ToolContract = { + name: "streaming", + description: "A tool that streams output", + parameters: { type: "object" }, + execute: async (_input, ctx) => { + ctx.onOutput("line 1\n", "stdout"); + ctx.onOutput("err 1\n", "stderr"); + return { content: "done" }; + }, + }; + + const provider = createFakeProvider([ + [ + { type: "tool-call", toolCallId: "tc1", toolName: "streaming", input: {} }, + { type: "finish", reason: "tool-calls" }, + ], + [ + { type: "text-delta", delta: "done" }, + { type: "finish", reason: "stop" }, + ], + ]); + + const { events, emit } = createCollectingEmit(); + + await runTurn({ + provider, + messages: [userMessage], + tools: [tool], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "tab-test", + turnId: "turn-test", + emit, + }); + + const outputs = events.filter((e) => e.type === "tool-output"); + expect(outputs).toHaveLength(2); + if (outputs[0]?.type === "tool-output") { + expect(outputs[0]?.data).toBe("line 1\n"); + expect(outputs[0]?.stream).toBe("stdout"); + expect(outputs[0]?.toolCallId).toBe("tc1"); + } + if (outputs[1]?.type === "tool-output") { + expect(outputs[1]?.data).toBe("err 1\n"); + expect(outputs[1]?.stream).toBe("stderr"); + } + }); + + function createTestLogger(): { + logger: Logger; + sink: LogSink & { records: LogRecord[] }; + deps: LogDeps; + } { + let idCounter = 0; + const deps: LogDeps = { + now: () => 1000 + idCounter * 100, + newId: () => `span-${++idCounter}`, + }; + const records: LogRecord[] = []; + const sink: LogSink & { records: LogRecord[] } = { + records, + emit: (record) => records.push(record), + }; + const logger = createLogger({ extensionId: "test" }, sink, deps); + return { logger, sink, deps }; + } + + describe("span instrumentation", () => { + it("emits turn + step span open/close in order", async () => { + const provider = createFakeProvider([ + [ + { type: "text-delta", delta: "hi" }, + { type: "usage", usage: { inputTokens: 1, outputTokens: 1 } }, + { type: "finish", reason: "stop" }, + ], + ]); + + const { logger, sink } = createTestLogger(); + + await runTurn({ + provider, + messages: [userMessage], + tools: [], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit: () => {}, + logger, + }); + + const spanOpens = sink.records.filter((r) => r.kind === "span-open"); + const spanCloses = sink.records.filter((r) => r.kind === "span-close"); + + expect(spanOpens.length).toBeGreaterThanOrEqual(2); // turn + step + expect(spanCloses.length).toBeGreaterThanOrEqual(2); + + const turnOpen = spanOpens.find((r) => r.kind === "span-open" && r.name === "turn"); + const stepOpen = spanOpens.find((r) => r.kind === "span-open" && r.name === "step"); + expect(turnOpen).toBeDefined(); + expect(stepOpen).toBeDefined(); + + if (turnOpen?.kind === "span-open") { + expect(turnOpen.extensionId).toBe("test"); + expect(turnOpen.attributes?.conversationId).toBe("conv-1"); + expect(turnOpen.attributes?.turnId).toBe("turn-1"); + } + + const turnClose = spanCloses.find((r) => r.kind === "span-close" && r.name === "turn"); + expect(turnClose).toBeDefined(); + if (turnClose?.kind === "span-close") { + expect(turnClose.status).toBe("ok"); + expect(turnClose.durationMs).toBeGreaterThanOrEqual(0); + } + }); + + it("emits tool-call spans for dispatched tools", async () => { + const tool = createFakeTool("echo", async () => ({ content: "echoed" })); + + const provider = createFakeProvider([ + [ + { type: "tool-call", toolCallId: "tc1", toolName: "echo", input: {} }, + { type: "finish", reason: "tool-calls" }, + ], + [ + { type: "text-delta", delta: "done" }, + { type: "finish", reason: "stop" }, + ], + ]); + + const { logger, sink } = createTestLogger(); + + await runTurn({ + provider, + messages: [userMessage], + tools: [tool], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit: () => {}, + logger, + }); + + const toolCallSpans = sink.records.filter( + (r) => r.kind === "span-open" && r.name === "tool-call", + ); + expect(toolCallSpans).toHaveLength(1); + if (toolCallSpans[0]?.kind === "span-open") { + expect(toolCallSpans[0].attributes?.name).toBe("echo"); + expect(toolCallSpans[0].attributes?.toolCallId).toBe("tc1"); + } + + const toolCallCloses = sink.records.filter( + (r) => r.kind === "span-close" && r.name === "tool-call", + ); + expect(toolCallCloses).toHaveLength(1); + if (toolCallCloses[0]?.kind === "span-close") { + expect(toolCallCloses[0].status).toBe("ok"); + } + }); + + it("tools receive ctx.log (correlated logger)", async () => { + let capturedLog: Logger | undefined; + + const tool = createFakeTool("logtest", async (_input, ctx) => { + capturedLog = ctx.log; + ctx.log.info("tool ran", { key: "value" }); + return { content: "ok" }; + }); + + const provider = createFakeProvider([ + [ + { type: "tool-call", toolCallId: "tc1", toolName: "logtest", input: {} }, + { type: "finish", reason: "tool-calls" }, + ], + [ + { type: "text-delta", delta: "done" }, + { type: "finish", reason: "stop" }, + ], + ]); + + const { logger, sink } = createTestLogger(); + + await runTurn({ + provider, + messages: [userMessage], + tools: [tool], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit: () => {}, + logger, + }); + + expect(capturedLog).toBeDefined(); + + const toolLogs = sink.records.filter( + (r) => r.kind === "log" && r.kind === "log" && (r as { msg: string }).msg === "tool ran", + ); + expect(toolLogs).toHaveLength(1); + if (toolLogs[0]?.kind === "log") { + expect(toolLogs[0].attributes?.key).toBe("value"); + expect(toolLogs[0].extensionId).toBe("test"); + } + }); + + it("an aborted turn still closes its turn span", async () => { + const ac = new AbortController(); + ac.abort(); + + const provider = createFakeProvider([ + [ + { type: "text-delta", delta: "should not appear" }, + { type: "finish", reason: "stop" }, + ], + ]); + + const { logger, sink } = createTestLogger(); + + await runTurn({ + provider, + messages: [userMessage], + tools: [], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit: () => {}, + signal: ac.signal, + logger, + }); + + const turnCloses = sink.records.filter((r) => r.kind === "span-close" && r.name === "turn"); + expect(turnCloses).toHaveLength(1); + if (turnCloses[0]?.kind === "span-close") { + expect(turnCloses[0].attributes?.finishReason).toBe("aborted"); + } + }); + + it("a provider error closes the step span with error status", async () => { + const provider: ProviderContract = { + id: "fake", + stream() { + return (async function* () { + yield { type: "text-delta", delta: "partial" } as ProviderEvent; + throw new Error("provider exploded"); + })(); + }, + }; + + const { logger, sink } = createTestLogger(); + + const result = await runTurn({ + provider, + messages: [userMessage], + tools: [], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit: () => {}, + logger, + }); + + expect(result.finishReason).toBe("error"); + + const stepCloses = sink.records.filter((r) => r.kind === "span-close" && r.name === "step"); + expect(stepCloses).toHaveLength(1); + if (stepCloses[0]?.kind === "span-close") { + expect(stepCloses[0].status).toBe("error"); + expect(stepCloses[0].attributes?.["error.message"]).toContain("provider exploded"); + } + }); + + it("emits a prompt span with verbatim body and small scalar attributes", async () => { + const tool = createFakeTool("echo", async () => ({ content: "echoed" })); + + const provider = createFakeProvider([ + [ + { type: "text-delta", delta: "done" }, + { type: "finish", reason: "stop" }, + ], + ]); + + const { logger, sink } = createTestLogger(); + + await runTurn({ + provider, + messages: [userMessage], + tools: [tool], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit: () => {}, + logger, + }); + + const promptOpens = sink.records.filter((r) => r.kind === "span-open" && r.name === "prompt"); + expect(promptOpens).toHaveLength(1); + + const promptOpen = promptOpens[0]; + if (promptOpen?.kind === "span-open") { + expect(promptOpen.body).toBeDefined(); + const parsed = JSON.parse(promptOpen.body as string); + expect(parsed.messages).toEqual([userMessage]); + expect(parsed.tools).toHaveLength(1); + expect(parsed.tools[0].name).toBe("echo"); + + expect(promptOpen.attributes?.messageCount).toBe(1); + expect(promptOpen.attributes?.toolCount).toBe(1); + } + + const promptCloses = sink.records.filter( + (r) => r.kind === "span-close" && r.name === "prompt", + ); + expect(promptCloses).toHaveLength(1); + + const logRecords = sink.records.filter( + (r) => + r.kind === "log" && r.kind === "log" && (r as { msg: string }).msg === "prompt:before", + ); + expect(logRecords).toHaveLength(0); + }); + + it("emits ttft and decode spans for a generating step", async () => { + const provider = createFakeProvider([ + [ + { type: "text-delta", delta: "Hello" }, + { type: "text-delta", delta: " world" }, + { type: "usage", usage: { inputTokens: 10, outputTokens: 5 } }, + { type: "finish", reason: "stop" }, + ], + ]); + + const { logger, sink } = createTestLogger(); + + await runTurn({ + provider, + messages: [userMessage], + tools: [], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit: () => {}, + logger, + }); + + const ttftOpens = sink.records.filter((r) => r.kind === "span-open" && r.name === "ttft"); + const ttftCloses = sink.records.filter((r) => r.kind === "span-close" && r.name === "ttft"); + const decodeOpens = sink.records.filter((r) => r.kind === "span-open" && r.name === "decode"); + const decodeCloses = sink.records.filter( + (r) => r.kind === "span-close" && r.name === "decode", + ); + + expect(ttftOpens).toHaveLength(1); + expect(ttftCloses).toHaveLength(1); + expect(decodeOpens).toHaveLength(1); + expect(decodeCloses).toHaveLength(1); + + const stepOpen = sink.records.find((r) => r.kind === "span-open" && r.name === "step"); + expect(stepOpen).toBeDefined(); + + if ( + ttftOpens[0]?.kind === "span-open" && + ttftCloses[0]?.kind === "span-close" && + decodeOpens[0]?.kind === "span-open" && + decodeCloses[0]?.kind === "span-close" && + stepOpen?.kind === "span-open" + ) { + // ttft and decode are children of step + expect(ttftOpens[0].parentSpanId).toBe(stepOpen.spanId); + expect(decodeOpens[0].parentSpanId).toBe(stepOpen.spanId); + + // ttft closes before decode opens (in order) + const ttftCloseIdx = sink.records.indexOf(ttftCloses[0]); + const decodeOpenIdx = sink.records.indexOf(decodeOpens[0]); + expect(ttftCloseIdx).toBeLessThan(decodeOpenIdx); + + // ttft has firstToken: true + expect(ttftCloses[0].attributes?.firstToken).toBe(true); + + // durations from fake clock + expect(ttftCloses[0].durationMs).toBeGreaterThanOrEqual(0); + expect(decodeCloses[0].durationMs).toBeGreaterThanOrEqual(0); + } + }); + + it("first token counts a reasoning delta", async () => { + const provider = createFakeProvider([ + [ + { type: "reasoning-delta", delta: "thinking..." }, + { type: "text-delta", delta: "Hello" }, + { type: "usage", usage: { inputTokens: 10, outputTokens: 5 } }, + { type: "finish", reason: "stop" }, + ], + ]); + + const { logger, sink } = createTestLogger(); + + await runTurn({ + provider, + messages: [userMessage], + tools: [], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit: () => {}, + logger, + }); + + const ttftCloses = sink.records.filter((r) => r.kind === "span-close" && r.name === "ttft"); + expect(ttftCloses).toHaveLength(1); + + // The ttft span should close at the reasoning delta, not at the text delta + if (ttftCloses[0]?.kind === "span-close") { + expect(ttftCloses[0].attributes?.firstToken).toBe(true); + } + }); + + it("a step with no content token does not emit a misleading decode", async () => { + const provider = createFakeProvider([ + [ + { type: "tool-call", toolCallId: "tc1", toolName: "echo", input: {} }, + { type: "finish", reason: "tool-calls" }, + ], + [ + { type: "text-delta", delta: "done" }, + { type: "finish", reason: "stop" }, + ], + ]); + + const tool = createFakeTool("echo", async () => ({ content: "echoed" })); + + const { logger, sink } = createTestLogger(); + + await runTurn({ + provider, + messages: [userMessage], + tools: [tool], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit: () => {}, + logger, + }); + + // First step (tool-call-only) should have ttft with firstToken: false and no decode + const ttftOpens = sink.records.filter((r) => r.kind === "span-open" && r.name === "ttft"); + const ttftCloses = sink.records.filter((r) => r.kind === "span-close" && r.name === "ttft"); + const decodeOpens = sink.records.filter((r) => r.kind === "span-open" && r.name === "decode"); + + // There should be 2 ttft opens (one per step) and 2 ttft closes + expect(ttftOpens).toHaveLength(2); + expect(ttftCloses).toHaveLength(2); + + // First step: tool-call-only, no first token + if (ttftCloses[0]?.kind === "span-close") { + expect(ttftCloses[0].attributes?.firstToken).toBe(false); + } + + // Second step: has text-delta, should have firstToken: true and decode span + if (ttftCloses[1]?.kind === "span-close") { + expect(ttftCloses[1].attributes?.firstToken).toBe(true); + } + + // Only one decode span (for the second step) + expect(decodeOpens).toHaveLength(1); + }); + + it("turn span close stamps usage.inputTokens / usage.outputTokens (dotted)", async () => { + const provider = createFakeProvider([ + [ + { type: "text-delta", delta: "hi" }, + { type: "usage", usage: { inputTokens: 10, outputTokens: 5 } }, + { type: "finish", reason: "stop" }, + ], + ]); + + const { logger, sink } = createTestLogger(); + + await runTurn({ + provider, + messages: [userMessage], + tools: [], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit: () => {}, + logger, + }); + + const turnClose = sink.records.find((r) => r.kind === "span-close" && r.name === "turn"); + expect(turnClose).toBeDefined(); + if (turnClose?.kind === "span-close") { + expect(turnClose.attributes?.["usage.inputTokens"]).toBe(10); + expect(turnClose.attributes?.["usage.outputTokens"]).toBe(5); + expect(turnClose.attributes?.usage_inputTokens).toBeUndefined(); + expect(turnClose.attributes?.usage_outputTokens).toBeUndefined(); + } + }); + + it("step span close stamps usage.inputTokens / usage.outputTokens (dotted)", async () => { + const provider = createFakeProvider([ + [ + { type: "text-delta", delta: "hi" }, + { type: "usage", usage: { inputTokens: 7, outputTokens: 3 } }, + { type: "finish", reason: "stop" }, + ], + ]); + + const { logger, sink } = createTestLogger(); + + await runTurn({ + provider, + messages: [userMessage], + tools: [], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit: () => {}, + logger, + }); + + const stepClose = sink.records.find((r) => r.kind === "span-close" && r.name === "step"); + expect(stepClose).toBeDefined(); + if (stepClose?.kind === "span-close") { + expect(stepClose.attributes?.["usage.inputTokens"]).toBe(7); + expect(stepClose.attributes?.["usage.outputTokens"]).toBe(3); + expect(stepClose.attributes?.usage_inputTokens).toBeUndefined(); + expect(stepClose.attributes?.usage_outputTokens).toBeUndefined(); + } + }); + + it("turn + step spans stamp usage.cacheReadTokens / usage.cacheWriteTokens when the provider Usage carries them", async () => { + const provider = createFakeProvider([ + [ + { type: "text-delta", delta: "hi" }, + { + type: "usage", + usage: { inputTokens: 10, outputTokens: 5, cacheReadTokens: 3, cacheWriteTokens: 2 }, + }, + { type: "finish", reason: "stop" }, + ], + ]); + + const { logger, sink } = createTestLogger(); + + await runTurn({ + provider, + messages: [userMessage], + tools: [], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit: () => {}, + logger, + }); + + const turnClose = sink.records.find((r) => r.kind === "span-close" && r.name === "turn"); + const stepClose = sink.records.find((r) => r.kind === "span-close" && r.name === "step"); + + expect(turnClose).toBeDefined(); + if (turnClose?.kind === "span-close") { + expect(turnClose.attributes?.["usage.inputTokens"]).toBe(10); + expect(turnClose.attributes?.["usage.outputTokens"]).toBe(5); + expect(turnClose.attributes?.["usage.cacheReadTokens"]).toBe(3); + expect(turnClose.attributes?.["usage.cacheWriteTokens"]).toBe(2); + } + + expect(stepClose).toBeDefined(); + if (stepClose?.kind === "span-close") { + expect(stepClose.attributes?.["usage.inputTokens"]).toBe(10); + expect(stepClose.attributes?.["usage.outputTokens"]).toBe(5); + expect(stepClose.attributes?.["usage.cacheReadTokens"]).toBe(3); + expect(stepClose.attributes?.["usage.cacheWriteTokens"]).toBe(2); + } + }); + + it("turn + step spans OMIT the cache-token attrs when the provider Usage lacks them", async () => { + const provider = createFakeProvider([ + [ + { type: "text-delta", delta: "hi" }, + { type: "usage", usage: { inputTokens: 10, outputTokens: 5 } }, + { type: "finish", reason: "stop" }, + ], + ]); + + const { logger, sink } = createTestLogger(); + + await runTurn({ + provider, + messages: [userMessage], + tools: [], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit: () => {}, + logger, + }); + + const turnClose = sink.records.find((r) => r.kind === "span-close" && r.name === "turn"); + const stepClose = sink.records.find((r) => r.kind === "span-close" && r.name === "step"); + + expect(turnClose).toBeDefined(); + if (turnClose?.kind === "span-close") { + expect(turnClose.attributes?.["usage.inputTokens"]).toBe(10); + expect(turnClose.attributes?.["usage.outputTokens"]).toBe(5); + expect(turnClose.attributes?.["usage.cacheReadTokens"]).toBeUndefined(); + expect(turnClose.attributes?.["usage.cacheWriteTokens"]).toBeUndefined(); + } + + expect(stepClose).toBeDefined(); + if (stepClose?.kind === "span-close") { + expect(stepClose.attributes?.["usage.inputTokens"]).toBe(10); + expect(stepClose.attributes?.["usage.outputTokens"]).toBe(5); + expect(stepClose.attributes?.["usage.cacheReadTokens"]).toBeUndefined(); + expect(stepClose.attributes?.["usage.cacheWriteTokens"]).toBeUndefined(); + } + }); + }); + + describe("provider logger threading", () => { + it("passes step span logger to provider.stream opts when logger provided", async () => { + let capturedOpts: Record<string, unknown> | undefined; + + const provider: ProviderContract = { + id: "fake", + stream(_messages, _tools, opts) { + capturedOpts = opts !== undefined ? { ...opts } : undefined; + return (async function* () { + yield { type: "text-delta", delta: "hi" } as ProviderEvent; + yield { type: "usage", usage: { inputTokens: 1, outputTokens: 1 } } as ProviderEvent; + yield { type: "finish", reason: "stop" } as ProviderEvent; + })(); + }, + }; + + const { logger } = createTestLogger(); + + await runTurn({ + provider, + messages: [userMessage], + tools: [], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit: () => {}, + logger, + }); + + expect(capturedOpts).toBeDefined(); + expect(capturedOpts?.logger).toBeDefined(); + expect(typeof (capturedOpts?.logger as Record<string, unknown>).info).toBe("function"); + expect(typeof (capturedOpts?.logger as Record<string, unknown>).span).toBe("function"); + }); + + it("passes undefined for opts.logger when no logger provided", async () => { + let capturedOpts: Record<string, unknown> | undefined; + + const provider: ProviderContract = { + id: "fake", + stream(_messages, _tools, opts) { + capturedOpts = opts !== undefined ? { ...opts } : undefined; + return (async function* () { + yield { type: "text-delta", delta: "hi" } as ProviderEvent; + yield { type: "usage", usage: { inputTokens: 1, outputTokens: 1 } } as ProviderEvent; + yield { type: "finish", reason: "stop" } as ProviderEvent; + })(); + }, + }; + + await runTurn({ + provider, + messages: [userMessage], + tools: [], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit: () => {}, + }); + + expect(capturedOpts).toBeDefined(); + expect(capturedOpts?.logger).toBeUndefined(); + }); + + it("threads providerOpts.model through to provider.stream opts", async () => { + let capturedOpts: Record<string, unknown> | undefined; + + const provider: ProviderContract = { + id: "fake", + stream(_messages, _tools, opts) { + capturedOpts = opts !== undefined ? { ...opts } : undefined; + return (async function* () { + yield { type: "text-delta", delta: "hi" } as ProviderEvent; + yield { type: "usage", usage: { inputTokens: 1, outputTokens: 1 } } as ProviderEvent; + yield { type: "finish", reason: "stop" } as ProviderEvent; + })(); + }, + }; + + await runTurn({ + provider, + messages: [userMessage], + tools: [], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit: () => {}, + providerOpts: { model: "some-model-id" }, + }); + + expect(capturedOpts?.model).toBe("some-model-id"); + }); + }); + + describe("span tree nesting", () => { + it("turn span is root (parentSpanId undefined)", async () => { + const provider = createFakeProvider([ + [ + { type: "text-delta", delta: "hi" }, + { type: "finish", reason: "stop" }, + ], + ]); + + const { logger, sink } = createTestLogger(); + + await runTurn({ + provider, + messages: [userMessage], + tools: [], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit: () => {}, + logger, + }); + + const turnOpen = sink.records.find((r) => r.kind === "span-open" && r.name === "turn"); + expect(turnOpen).toBeDefined(); + if (turnOpen?.kind === "span-open") { + expect(turnOpen.parentSpanId).toBeUndefined(); + } + }); + + it("step span is a child of turn span", async () => { + const provider = createFakeProvider([ + [ + { type: "text-delta", delta: "hi" }, + { type: "finish", reason: "stop" }, + ], + ]); + + const { logger, sink } = createTestLogger(); + + await runTurn({ + provider, + messages: [userMessage], + tools: [], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit: () => {}, + logger, + }); + + const turnOpen = sink.records.find((r) => r.kind === "span-open" && r.name === "turn"); + const stepOpen = sink.records.find((r) => r.kind === "span-open" && r.name === "step"); + expect(turnOpen).toBeDefined(); + expect(stepOpen).toBeDefined(); + if (turnOpen?.kind === "span-open" && stepOpen?.kind === "span-open") { + expect(stepOpen.parentSpanId).toBe(turnOpen.spanId); + } + }); + + it("prompt span is a child of step span", async () => { + const provider = createFakeProvider([ + [ + { type: "text-delta", delta: "hi" }, + { type: "finish", reason: "stop" }, + ], + ]); + + const { logger, sink } = createTestLogger(); + + await runTurn({ + provider, + messages: [userMessage], + tools: [], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit: () => {}, + logger, + }); + + const stepOpen = sink.records.find((r) => r.kind === "span-open" && r.name === "step"); + const promptOpen = sink.records.find((r) => r.kind === "span-open" && r.name === "prompt"); + expect(stepOpen).toBeDefined(); + expect(promptOpen).toBeDefined(); + if (stepOpen?.kind === "span-open" && promptOpen?.kind === "span-open") { + expect(promptOpen.parentSpanId).toBe(stepOpen.spanId); + } + }); + + it("provider logger creates spans nested under step", async () => { + let capturedLogger: Logger | undefined; + let providerReqSpanId: string | undefined; + + const provider: ProviderContract = { + id: "fake", + stream(_messages, _tools, opts) { + capturedLogger = opts?.logger; + return (async function* () { + // Open provider.request span inside the stream (like a real provider) + if (capturedLogger !== undefined) { + const span = capturedLogger.span("provider.request"); + providerReqSpanId = span.id; + span.end(); + } + yield { type: "text-delta", delta: "hi" } as ProviderEvent; + yield { type: "finish", reason: "stop" } as ProviderEvent; + })(); + }, + }; + + const { logger, sink } = createTestLogger(); + + await runTurn({ + provider, + messages: [userMessage], + tools: [], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit: () => {}, + logger, + }); + + expect(capturedLogger).toBeDefined(); + expect(providerReqSpanId).toBeDefined(); + + const stepOpen = sink.records.find((r) => r.kind === "span-open" && r.name === "step"); + const provReqOpen = sink.records.find( + (r) => r.kind === "span-open" && r.name === "provider.request", + ); + expect(stepOpen).toBeDefined(); + expect(provReqOpen).toBeDefined(); + if (stepOpen?.kind === "span-open" && provReqOpen?.kind === "span-open") { + expect(provReqOpen.parentSpanId).toBe(stepOpen.spanId); + expect(provReqOpen.spanId).toBe(providerReqSpanId); + } + }); + + it("tool-call spans are children of step span", async () => { + const tool = createFakeTool("echo", async () => ({ content: "echoed" })); + + const provider = createFakeProvider([ + [ + { type: "tool-call", toolCallId: "tc1", toolName: "echo", input: {} }, + { type: "finish", reason: "tool-calls" }, + ], + [ + { type: "text-delta", delta: "done" }, + { type: "finish", reason: "stop" }, + ], + ]); + + const { logger, sink } = createTestLogger(); + + await runTurn({ + provider, + messages: [userMessage], + tools: [tool], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit: () => {}, + logger, + }); + + const stepOpen = sink.records.find((r) => r.kind === "span-open" && r.name === "step"); + const tcOpen = sink.records.find((r) => r.kind === "span-open" && r.name === "tool-call"); + expect(stepOpen).toBeDefined(); + expect(tcOpen).toBeDefined(); + if (stepOpen?.kind === "span-open" && tcOpen?.kind === "span-open") { + expect(tcOpen.parentSpanId).toBe(stepOpen.spanId); + } + }); + + it("full parent chain: turn → step → {prompt, provider.request, tool-call}", async () => { + let capturedLogger: Logger | undefined; + + const tool = createFakeTool("echo", async () => ({ content: "echoed" })); + + let streamCallCount = 0; + const provider: ProviderContract = { + id: "fake", + stream(_messages, _tools, opts) { + capturedLogger = opts?.logger; + streamCallCount++; + return (async function* () { + // Simulate provider opening a provider.request span + // INSIDE the stream on the first call only (like a real provider) + if (streamCallCount === 1 && capturedLogger !== undefined) { + const span = capturedLogger.span("provider.request"); + span.end(); + } + if (streamCallCount === 1) { + yield { + type: "tool-call", + toolCallId: "tc1", + toolName: "echo", + input: {}, + } as ProviderEvent; + yield { type: "finish", reason: "tool-calls" } as ProviderEvent; + } else { + yield { type: "text-delta", delta: "done" } as ProviderEvent; + yield { type: "finish", reason: "stop" } as ProviderEvent; + } + })(); + }, + }; + + const { logger, sink } = createTestLogger(); + + await runTurn({ + provider, + messages: [userMessage], + tools: [tool], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit: () => {}, + logger, + }); + + const spanOpens = sink.records.filter((r) => r.kind === "span-open") as Array< + Extract<LogRecord, { kind: "span-open" }> + >; + + const turnOpen = spanOpens.find((r) => r.name === "turn"); + const stepOpen = spanOpens.find((r) => r.name === "step"); + const promptOpen = spanOpens.find((r) => r.name === "prompt"); + const provReqOpen = spanOpens.find((r) => r.name === "provider.request"); + const tcOpen = spanOpens.find((r) => r.name === "tool-call"); + + expect(turnOpen).toBeDefined(); + expect(stepOpen).toBeDefined(); + expect(promptOpen).toBeDefined(); + expect(provReqOpen).toBeDefined(); + expect(tcOpen).toBeDefined(); + + if ( + turnOpen?.kind === "span-open" && + stepOpen?.kind === "span-open" && + promptOpen?.kind === "span-open" && + provReqOpen?.kind === "span-open" && + tcOpen?.kind === "span-open" + ) { + // turn = root + expect(turnOpen.parentSpanId).toBeUndefined(); + + // step = child of turn + expect(stepOpen.parentSpanId).toBe(turnOpen.spanId); + + // prompt = child of step + expect(promptOpen.parentSpanId).toBe(stepOpen.spanId); + + // provider.request = child of step + expect(provReqOpen.parentSpanId).toBe(stepOpen.spanId); + + // tool-call = child of step + expect(tcOpen.parentSpanId).toBe(stepOpen.spanId); + } + }); + }); + + describe("lifecycle events", () => { + it("emits turn-start as the first event with conversation + turn ids", async () => { + const provider = createFakeProvider([ + [ + { type: "text-delta", delta: "hi" }, + { type: "finish", reason: "stop" }, + ], + ]); + + const { events, emit } = createCollectingEmit(); + + await runTurn({ + provider, + messages: [userMessage], + tools: [], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-42", + turnId: "turn-99", + emit, + }); + + expect(events[0]?.type).toBe("turn-start"); + if (events[0]?.type === "turn-start") { + expect(events[0].conversationId).toBe("conv-42"); + expect(events[0].turnId).toBe("turn-99"); + } + }); + + it("emits a single done event last, carrying the finishReason", async () => { + const provider = createFakeProvider([ + [ + { type: "text-delta", delta: "Hello" }, + { type: "usage", usage: { inputTokens: 5, outputTokens: 3 } }, + { type: "finish", reason: "stop" }, + ], + ]); + + const { events, emit } = createCollectingEmit(); + + const result = await runTurn({ + provider, + messages: [userMessage], + tools: [], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit, + }); + + const lastEvent = events[events.length - 1]; + expect(lastEvent?.type).toBe("done"); + if (lastEvent?.type === "done") { + expect(lastEvent.reason).toBe(result.finishReason); + expect(lastEvent.conversationId).toBe("conv-1"); + expect(lastEvent.turnId).toBe("turn-1"); + } + + const doneEvents = events.filter((e) => e.type === "done"); + expect(doneEvents).toHaveLength(1); + }); + + it("emits done after a tool-call turn", async () => { + const tool = createFakeTool("echo", async (input) => ({ + content: `echo: ${JSON.stringify(input)}`, + })); + + const provider = createFakeProvider([ + [ + { type: "tool-call", toolCallId: "tc1", toolName: "echo", input: { x: 1 } }, + { type: "finish", reason: "tool-calls" }, + ], + [ + { type: "text-delta", delta: "done" }, + { type: "finish", reason: "stop" }, + ], + ]); + + const { events, emit } = createCollectingEmit(); + + const result = await runTurn({ + provider, + messages: [userMessage], + tools: [tool], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit, + }); + + const lastEvent = events[events.length - 1]; + expect(lastEvent?.type).toBe("done"); + if (lastEvent?.type === "done") { + expect(lastEvent.reason).toBe(result.finishReason); + } + }); + + it('still emits done with reason "aborted" when the turn is aborted via signal', async () => { + const ac = new AbortController(); + ac.abort(); + + const provider = createFakeProvider([ + [ + { type: "text-delta", delta: "should not appear" }, + { type: "finish", reason: "stop" }, + ], + ]); + + const { events, emit } = createCollectingEmit(); + + const result = await runTurn({ + provider, + messages: [userMessage], + tools: [], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit, + signal: ac.signal, + }); + + expect(result.finishReason).toBe("aborted"); + + const lastEvent = events[events.length - 1]; + expect(lastEvent?.type).toBe("done"); + if (lastEvent?.type === "done") { + expect(lastEvent.reason).toBe("aborted"); + } + }); + + it('still emits done with reason "error" when the provider errors', async () => { + const provider: ProviderContract = { + id: "fake", + stream() { + return (async function* () { + yield { type: "text-delta", delta: "partial" } as ProviderEvent; + throw new Error("provider crashed"); + })(); + }, + }; + + const { events, emit } = createCollectingEmit(); + + const result = await runTurn({ + provider, + messages: [userMessage], + tools: [], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit, + }); + + expect(result.finishReason).toBe("error"); + + const lastEvent = events[events.length - 1]; + expect(lastEvent?.type).toBe("done"); + if (lastEvent?.type === "done") { + expect(lastEvent.reason).toBe("error"); + } + }); + + it("turn-start precedes every delta and done follows every delta", async () => { + const provider = createFakeProvider([ + [ + { type: "text-delta", delta: "Hello" }, + { type: "reasoning-delta", delta: "thinking..." }, + { type: "text-delta", delta: " world" }, + { type: "usage", usage: { inputTokens: 5, outputTokens: 3 } }, + { type: "finish", reason: "stop" }, + ], + ]); + + const { events, emit } = createCollectingEmit(); + + await runTurn({ + provider, + messages: [userMessage], + tools: [], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit, + }); + + const turnStartIdx = events.findIndex((e) => e.type === "turn-start"); + const doneIdx = events.findIndex((e) => e.type === "done"); + + expect(turnStartIdx).toBe(0); + expect(doneIdx).toBe(events.length - 1); + + for (let i = 0; i < events.length; i++) { + const e = events[i]; + if (e?.type === "text-delta" || e?.type === "reasoning-delta") { + expect(i).toBeGreaterThan(turnStartIdx); + expect(i).toBeLessThan(doneIdx); + } + } + }); + }); + + describe("stepId", () => { + it("tool-call and tool-result events carry stepId", async () => { + const tool = createFakeTool("echo", async () => ({ content: "echoed" })); + + const provider = createFakeProvider([ + [ + { type: "tool-call", toolCallId: "tc1", toolName: "echo", input: {} }, + { type: "finish", reason: "tool-calls" }, + ], + [ + { type: "text-delta", delta: "done" }, + { type: "finish", reason: "stop" }, + ], + ]); + + const { events, emit } = createCollectingEmit(); + + await runTurn({ + provider, + messages: [userMessage], + tools: [tool], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit, + }); + + const toolCallEvt = events.find((e) => e.type === "tool-call"); + const toolResultEvt = events.find((e) => e.type === "tool-result"); + + expect(toolCallEvt).toBeDefined(); + expect(toolResultEvt).toBeDefined(); + + if (toolCallEvt?.type === "tool-call" && toolResultEvt?.type === "tool-result") { + expect(toolCallEvt.stepId).toBeDefined(); + expect(toolResultEvt.stepId).toBeDefined(); + expect(toolCallEvt.stepId).toBe(toolResultEvt.stepId); + } + }); + + it("tool calls in the SAME step share one stepId; a later step gets a different one", async () => { + const toolA = createFakeTool("a", async () => ({ content: "a-result" })); + const toolB = createFakeTool("b", async () => ({ content: "b-result" })); + + const provider = createFakeProvider([ + [ + { type: "tool-call", toolCallId: "tc1", toolName: "a", input: {} }, + { type: "tool-call", toolCallId: "tc2", toolName: "b", input: {} }, + { type: "finish", reason: "tool-calls" }, + ], + [ + { type: "tool-call", toolCallId: "tc3", toolName: "a", input: {} }, + { type: "finish", reason: "tool-calls" }, + ], + [ + { type: "text-delta", delta: "done" }, + { type: "finish", reason: "stop" }, + ], + ]); + + const { events, emit } = createCollectingEmit(); + + await runTurn({ + provider, + messages: [userMessage], + tools: [toolA, toolB], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit, + }); + + const toolCallEvts = events.filter((e) => e.type === "tool-call"); + expect(toolCallEvts.length).toBeGreaterThanOrEqual(2); + + const step0Calls = toolCallEvts.filter( + (e) => e.type === "tool-call" && (e.toolCallId === "tc1" || e.toolCallId === "tc2"), + ); + const step1Call = toolCallEvts.find((e) => e.type === "tool-call" && e.toolCallId === "tc3"); + + expect(step0Calls).toHaveLength(2); + if (step0Calls[0]?.type === "tool-call" && step0Calls[1]?.type === "tool-call") { + expect(step0Calls[0].stepId).toBe(step0Calls[1].stepId); + } + + if (step1Call?.type === "tool-call" && step0Calls[0]?.type === "tool-call") { + expect(step1Call.stepId).not.toBe(step0Calls[0].stepId); + } + }); + + it("tool chunks in the result carry stepId", async () => { + const tool = createFakeTool("echo", async () => ({ content: "echoed" })); + + const provider = createFakeProvider([ + [ + { type: "tool-call", toolCallId: "tc1", toolName: "echo", input: {} }, + { type: "finish", reason: "tool-calls" }, + ], + [ + { type: "text-delta", delta: "done" }, + { type: "finish", reason: "stop" }, + ], + ]); + + const result = await runTurn({ + provider, + messages: [userMessage], + tools: [tool], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit: () => {}, + }); + + const toolCallMsg = result.messages.find( + (m) => m.role === "assistant" && m.chunks.some((c) => c.type === "tool-call"), + ); + const toolResultMsg = result.messages.find((m) => m.role === "tool"); + + expect(toolCallMsg).toBeDefined(); + expect(toolResultMsg).toBeDefined(); + + const tcChunk = toolCallMsg?.chunks.find((c) => c.type === "tool-call"); + const trChunk = toolResultMsg?.chunks[0]; + + expect(tcChunk?.type).toBe("tool-call"); + expect(trChunk?.type).toBe("tool-result"); + + if (tcChunk?.type === "tool-call" && trChunk?.type === "tool-result") { + expect(tcChunk.stepId).toBeDefined(); + expect(trChunk.stepId).toBeDefined(); + expect(tcChunk.stepId).toBe(trChunk.stepId); + } + }); + }); + + describe("timing events (now provided)", () => { + function createCounterNow(): { now: () => number; tick: (ms: number) => void } { + let current = 0; + return { + now: () => current, + tick: (ms: number) => { + current += ms; + }, + }; + } + + it("emits step-complete per step with timing when now provided", async () => { + const clock = createCounterNow(); + clock.tick(100); // turn starts at 100 + + const { events, emit } = createCollectingEmit(); + + // Advance clock during stream: first token at +50ms, stream ends at +200ms + let streamCallCount = 0; + const wrappedProvider: ProviderContract = { + id: "fake", + stream(_messages, _tools) { + const idx = streamCallCount++; + return (async function* () { + if (idx === 0) { + clock.tick(50); // stream starts + yield { type: "text-delta", delta: "Hello" } as ProviderEvent; + // first token seen at 150 (100+50) + clock.tick(100); + yield { type: "text-delta", delta: " world" } as ProviderEvent; + clock.tick(50); + yield { + type: "usage", + usage: { inputTokens: 10, outputTokens: 5 }, + } as ProviderEvent; + yield { type: "finish", reason: "stop" } as ProviderEvent; + } + })(); + }, + }; + + await runTurn({ + provider: wrappedProvider, + messages: [userMessage], + tools: [], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit, + now: clock.now, + }); + + const stepCompleteEvts = events.filter((e) => e.type === "step-complete"); + expect(stepCompleteEvts).toHaveLength(1); + + const sc = stepCompleteEvts[0]; + if (sc?.type === "step-complete") { + expect(sc.conversationId).toBe("conv-1"); + expect(sc.turnId).toBe("turn-1"); + expect(sc.stepId).toBeDefined(); + expect(sc.genTotalMs).toBe(200); // 50+100+50 + expect(sc.ttftMs).toBe(50); // stream start → first text-delta + expect(sc.decodeMs).toBe(150); // first token → stream end + const ttft = sc.ttftMs; + const decode = sc.decodeMs; + const genTotal = sc.genTotalMs; + if (ttft !== undefined && decode !== undefined && genTotal !== undefined) { + expect(genTotal).toBe(ttft + decode); + } + } + }); + + it("step-complete omits ttft/decode but keeps genTotalMs for a no-content step", async () => { + const clock = createCounterNow(); + clock.tick(100); // turn starts at 100 + + const tool = createFakeTool("echo", async () => ({ content: "echoed" })); + + let streamCallCount = 0; + const wrappedProvider: ProviderContract = { + id: "fake", + stream(_messages, _tools) { + const idx = streamCallCount++; + return (async function* () { + if (idx === 0) { + clock.tick(80); // stream starts at 180 + yield { + type: "tool-call", + toolCallId: "tc1", + toolName: "echo", + input: {}, + } as ProviderEvent; + clock.tick(20); + yield { type: "finish", reason: "tool-calls" } as ProviderEvent; + } else { + clock.tick(50); + yield { type: "text-delta", delta: "done" } as ProviderEvent; + clock.tick(50); + yield { type: "finish", reason: "stop" } as ProviderEvent; + } + })(); + }, + }; + + const { events, emit } = createCollectingEmit(); + + await runTurn({ + provider: wrappedProvider, + messages: [userMessage], + tools: [tool], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit, + now: clock.now, + }); + + const stepCompleteEvts = events.filter((e) => e.type === "step-complete"); + expect(stepCompleteEvts).toHaveLength(2); + + // First step: tool-call-only, no content token + const sc0 = stepCompleteEvts[0]; + if (sc0?.type === "step-complete") { + expect(sc0.stepId).toBeDefined(); + expect(sc0.genTotalMs).toBe(100); // 80+20 + expect(sc0.ttftMs).toBeUndefined(); + expect(sc0.decodeMs).toBeUndefined(); + } + + // Second step: has text-delta + const sc1 = stepCompleteEvts[1]; + if (sc1?.type === "step-complete") { + expect(sc1.stepId).toBeDefined(); + expect(sc1.genTotalMs).toBe(100); // 50+50 + expect(sc1.ttftMs).toBe(50); + expect(sc1.decodeMs).toBe(50); + } + }); + + it("usage event carries stepId", async () => { + const provider = createFakeProvider([ + [ + { type: "text-delta", delta: "hi" }, + { type: "usage", usage: { inputTokens: 5, outputTokens: 3 } }, + { type: "finish", reason: "stop" }, + ], + ]); + + const { events, emit } = createCollectingEmit(); + + await runTurn({ + provider, + messages: [userMessage], + tools: [], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit, + }); + + const usageEvts = events.filter((e) => e.type === "usage"); + expect(usageEvts).toHaveLength(1); + const ue = usageEvts[0]; + if (ue?.type === "usage") { + expect(ue.stepId).toBeDefined(); + } + }); + + it("tool-result carries durationMs (execution time) when now provided", async () => { + const clock = createCounterNow(); + clock.tick(100); // turn starts at 100 + + const tool = createFakeTool("slow", async () => { + clock.tick(200); // tool takes 200ms to execute + return { content: "done" }; + }); + + const provider = createFakeProvider([ + [ + { type: "tool-call", toolCallId: "tc1", toolName: "slow", input: {} }, + { type: "finish", reason: "tool-calls" }, + ], + [ + { type: "text-delta", delta: "ok" }, + { type: "finish", reason: "stop" }, + ], + ]); + + const { events, emit } = createCollectingEmit(); + + await runTurn({ + provider, + messages: [userMessage], + tools: [tool], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit, + now: clock.now, + }); + + const toolResultEvts = events.filter((e) => e.type === "tool-result"); + expect(toolResultEvts).toHaveLength(1); + const tr = toolResultEvts[0]; + if (tr?.type === "tool-result") { + expect(tr.durationMs).toBeDefined(); + expect(tr.durationMs).toBe(200); + } + }); + + it("done carries durationMs and aggregate usage when now provided", async () => { + const clock = createCounterNow(); + clock.tick(100); // turn starts at 100 + + const wrappedProvider: ProviderContract = { + id: "fake", + stream(_messages, _tools) { + return (async function* () { + clock.tick(80); // stream duration + yield { type: "text-delta", delta: "hi" } as ProviderEvent; + yield { + type: "usage", + usage: { inputTokens: 10, outputTokens: 5 }, + } as ProviderEvent; + yield { type: "finish", reason: "stop" } as ProviderEvent; + })(); + }, + }; + + const { events, emit } = createCollectingEmit(); + + await runTurn({ + provider: wrappedProvider, + messages: [userMessage], + tools: [], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit, + now: clock.now, + }); + + const doneEvts = events.filter((e) => e.type === "done"); + expect(doneEvts).toHaveLength(1); + const d = doneEvts[0]; + if (d?.type === "done") { + expect(d.durationMs).toBeDefined(); + expect(d.durationMs).toBeGreaterThan(0); + expect(d.usage).toBeDefined(); + if (d.usage !== undefined) { + expect(d.usage.inputTokens).toBe(10); + expect(d.usage.outputTokens).toBe(5); + } + } + }); + + it("no now → timing fields absent", async () => { + const tool = createFakeTool("echo", async () => ({ content: "echoed" })); + + const provider = createFakeProvider([ + [ + { type: "text-delta", delta: "hi" }, + { type: "usage", usage: { inputTokens: 5, outputTokens: 3 } }, + { type: "tool-call", toolCallId: "tc1", toolName: "echo", input: {} }, + { type: "finish", reason: "tool-calls" }, + ], + [ + { type: "text-delta", delta: "done" }, + { type: "usage", usage: { inputTokens: 10, outputTokens: 5 } }, + { type: "finish", reason: "stop" }, + ], + ]); + + const { events, emit } = createCollectingEmit(); + + await runTurn({ + provider, + messages: [userMessage], + tools: [tool], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit, + // no now + }); + + // step-complete still emitted (with stepId, no timing) + const stepCompleteEvts = events.filter((e) => e.type === "step-complete"); + expect(stepCompleteEvts).toHaveLength(2); + for (const sc of stepCompleteEvts) { + if (sc?.type === "step-complete") { + expect(sc.stepId).toBeDefined(); + expect(sc.ttftMs).toBeUndefined(); + expect(sc.decodeMs).toBeUndefined(); + expect(sc.genTotalMs).toBeUndefined(); + } + } + + // usage still carries stepId + const usageEvts = events.filter((e) => e.type === "usage"); + for (const ue of usageEvts) { + if (ue?.type === "usage") { + expect(ue.stepId).toBeDefined(); + } + } + + // no durationMs on tool-result + const toolResultEvts = events.filter((e) => e.type === "tool-result"); + for (const tr of toolResultEvts) { + if (tr?.type === "tool-result") { + expect(tr.durationMs).toBeUndefined(); + } + } + + // no durationMs on done, but usage is present (independent of now) + const doneEvts = events.filter((e) => e.type === "done"); + expect(doneEvts).toHaveLength(1); + const d = doneEvts[0]; + if (d?.type === "done") { + expect(d.durationMs).toBeUndefined(); + expect(d.usage).toBeDefined(); + if (d.usage !== undefined) { + expect(d.usage.inputTokens).toBe(15); + expect(d.usage.outputTokens).toBe(8); + } + } + }); + }); + + describe("contextSize", () => { + it("single-step turn: contextSize equals step inputTokens + outputTokens", async () => { + const provider = createFakeProvider([ + [ + { type: "text-delta", delta: "Hello" }, + { type: "usage", usage: { inputTokens: 100, outputTokens: 50 } }, + { type: "finish", reason: "stop" }, + ], + ]); + + const { events, emit } = createCollectingEmit(); + + await runTurn({ + provider, + messages: [userMessage], + tools: [], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit, + }); + + const doneEvt = events.find((e) => e.type === "done"); + expect(doneEvt).toBeDefined(); + if (doneEvt?.type === "done") { + expect(doneEvt.contextSize).toBe(150); + } + }); + + it("multi-step turn: contextSize equals ONLY the last step's inputTokens + outputTokens", async () => { + const tool = createFakeTool("echo", async () => ({ content: "echoed" })); + + const provider = createFakeProvider([ + [ + { type: "tool-call", toolCallId: "tc1", toolName: "echo", input: {} }, + { type: "usage", usage: { inputTokens: 100, outputTokens: 20 } }, + { type: "finish", reason: "tool-calls" }, + ], + [ + { type: "text-delta", delta: "done" }, + { type: "usage", usage: { inputTokens: 300, outputTokens: 80 } }, + { type: "finish", reason: "stop" }, + ], + ]); + + const { events, emit } = createCollectingEmit(); + + await runTurn({ + provider, + messages: [userMessage], + tools: [tool], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit, + }); + + const doneEvt = events.find((e) => e.type === "done"); + expect(doneEvt).toBeDefined(); + if (doneEvt?.type === "done") { + expect(doneEvt.contextSize).toBe(380); + expect(doneEvt.usage).toBeDefined(); + if (doneEvt.usage !== undefined) { + expect(doneEvt.contextSize).not.toBe(doneEvt.usage.inputTokens); + } + } + }); + + it("no usage reported: contextSize is undefined", async () => { + const provider = createFakeProvider([ + [ + { type: "text-delta", delta: "Hello" }, + { type: "finish", reason: "stop" }, + ], + ]); + + const { events, emit } = createCollectingEmit(); + + await runTurn({ + provider, + messages: [userMessage], + tools: [], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit, + }); + + const doneEvt = events.find((e) => e.type === "done"); + expect(doneEvt).toBeDefined(); + if (doneEvt?.type === "done") { + expect(doneEvt.contextSize).toBeUndefined(); + expect(doneEvt.usage).toBeUndefined(); + } + }); + }); + + describe("drainSteering", () => { + it("drainSteering called once at the tool-result boundary; returned messages appended to the next step's provider input (after tool results)", async () => { + let drainCallCount = 0; + const steeringMessage: ChatMessage = { + role: "user", + chunks: [{ type: "text", text: "steer!" }], + }; + + const { provider, capturedMessages } = createCapturingProvider([ + [ + { type: "tool-call", toolCallId: "tc1", toolName: "echo", input: {} }, + { type: "finish", reason: "tool-calls" }, + ], + [ + { type: "text-delta", delta: "done" }, + { type: "finish", reason: "stop" }, + ], + ]); + + const tool = createFakeTool("echo", async () => ({ content: "echoed" })); + + const result = await runTurn({ + provider, + messages: [userMessage], + tools: [tool], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit: () => {}, + drainSteering: () => { + drainCallCount++; + return [steeringMessage]; + }, + }); + + expect(drainCallCount).toBe(1); + // The provider was called twice (tool-call step, then text step). + expect(capturedMessages).toHaveLength(2); + const secondStepMessages = capturedMessages[1] ?? []; + // user, assistant(tool-call), tool-result, steering(user) — in order, + // steering appended AFTER the tool results, before the next call. + expect(secondStepMessages).toHaveLength(4); + expect(secondStepMessages[0]?.role).toBe("user"); + expect(secondStepMessages[1]?.role).toBe("assistant"); + expect(secondStepMessages[2]?.role).toBe("tool"); + expect(secondStepMessages[3]).toEqual(steeringMessage); + expect(secondStepMessages[3]?.role).toBe("user"); + // Steering is fed to the next provider call, NOT surfaced in the + // turn result — the caller owns the steering messages' lifecycle. + expect(result.messages).toHaveLength(3); + }); + + it("drainSteering omitted → no injection; turn byte-identical to before", async () => { + const { provider, capturedMessages } = createCapturingProvider([ + [ + { type: "tool-call", toolCallId: "tc1", toolName: "echo", input: {} }, + { type: "finish", reason: "tool-calls" }, + ], + [ + { type: "text-delta", delta: "done" }, + { type: "finish", reason: "stop" }, + ], + ]); + + const tool = createFakeTool("echo", async () => ({ content: "echoed" })); + + const result = await runTurn({ + provider, + messages: [userMessage], + tools: [tool], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit: () => {}, + // drainSteering omitted — must be a strict no-op. + }); + + expect(capturedMessages).toHaveLength(2); + const secondStepMessages = capturedMessages[1] ?? []; + // user, assistant(tool-call), tool-result — NO steering injected. + expect(secondStepMessages).toHaveLength(3); + expect(secondStepMessages[0]?.role).toBe("user"); + expect(secondStepMessages[1]?.role).toBe("assistant"); + expect(secondStepMessages[2]?.role).toBe("tool"); + expect(result.messages).toHaveLength(3); + }); + + it("drainSteering returns [] → no injection", async () => { + let drainCallCount = 0; + const { provider, capturedMessages } = createCapturingProvider([ + [ + { type: "tool-call", toolCallId: "tc1", toolName: "echo", input: {} }, + { type: "finish", reason: "tool-calls" }, + ], + [ + { type: "text-delta", delta: "done" }, + { type: "finish", reason: "stop" }, + ], + ]); + + const tool = createFakeTool("echo", async () => ({ content: "echoed" })); + + await runTurn({ + provider, + messages: [userMessage], + tools: [tool], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit: () => {}, + drainSteering: () => { + drainCallCount++; + return []; + }, + }); + + // Called at the boundary, but returned nothing → no injection. + expect(drainCallCount).toBe(1); + expect(capturedMessages).toHaveLength(2); + const secondStepMessages = capturedMessages[1] ?? []; + expect(secondStepMessages).toHaveLength(3); + expect(secondStepMessages[2]?.role).toBe("tool"); + }); + + it("drainSteering NOT called when a step has no tool calls (text-only turn)", async () => { + let drainCallCount = 0; + const provider = createFakeProvider([ + [ + { type: "text-delta", delta: "hello" }, + { type: "finish", reason: "stop" }, + ], + ]); + + await runTurn({ + provider, + messages: [userMessage], + tools: [], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit: () => {}, + drainSteering: () => { + drainCallCount++; + return []; + }, + }); + + expect(drainCallCount).toBe(0); + }); + + it("multiple tool-call steps → drainSteering called once per tool-call step", async () => { + let drainCallCount = 0; + const provider = createFakeProvider([ + [ + { type: "tool-call", toolCallId: "tc1", toolName: "echo", input: {} }, + { type: "finish", reason: "tool-calls" }, + ], + [ + { type: "tool-call", toolCallId: "tc2", toolName: "echo", input: {} }, + { type: "finish", reason: "tool-calls" }, + ], + [ + { type: "text-delta", delta: "done" }, + { type: "finish", reason: "stop" }, + ], + ]); + + const tool = createFakeTool("echo", async () => ({ content: "echoed" })); + + await runTurn({ + provider, + messages: [userMessage], + tools: [tool], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit: () => {}, + drainSteering: () => { + drainCallCount++; + return []; + }, + }); + + // Steps 0 and 1 each produced tool calls → drained once each. + // Step 2 (text-only) → no boundary → no drain. Total = 2. + expect(drainCallCount).toBe(2); + }); + + it("MAX_STEPS=0 (unlimited): turn runs past the old 50-step limit and drains at every tool-result boundary until the model stops naturally", async () => { + let drainCallCount = 0; + // 100 tool-call steps (past the old MAX_STEPS=50) + 1 text-only step + // to end the turn naturally. + const STEPS_WITH_TOOLS = 100; + const script: ProviderEvent[][] = []; + for (let i = 0; i < STEPS_WITH_TOOLS; i++) { + script.push([ + { type: "tool-call", toolCallId: "tc", toolName: "echo", input: {} }, + { type: "finish", reason: "tool-calls" }, + ]); + } + // Final step: text only, no tool calls → natural end. + script.push([ + { type: "text-delta", delta: "done" }, + { type: "finish", reason: "stop" }, + ]); + const provider = createFakeProvider(script); + + const tool = createFakeTool("echo", async () => ({ content: "echoed" })); + + const result = await runTurn({ + provider, + messages: [userMessage], + tools: [tool], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit: () => {}, + drainSteering: () => { + drainCallCount++; + return []; + }, + }); + + // Turn ended naturally, NOT via max-steps. + expect(result.finishReason).toBe("stop"); + // Every tool-call step (0..99) is followed by a next step → each + // triggers a drain. The text-only step breaks before draining. + expect(drainCallCount).toBe(STEPS_WITH_TOOLS); + // All 101 steps produced messages (100 tool steps with assistant + + // tool messages, 1 text-only step with an assistant message). + expect(result.messages.length).toBe(STEPS_WITH_TOOLS * 2 + 1); + }); + }); + + // ── Retry with backoff ────────────────────────────────────────────────── + // + // PURE tests: a fake `sleep` (records calls, resolves instantly, can abort + // on a chosen call) + a pure `delayFor` (the canonical schedule + 8h budget). + // A stub `ProviderContract` whose `stream` yields a retryable error N times + // then a finish. ZERO mocks of `@dispatch/*` modules — effects injected. + + /** The canonical backoff schedule (matches the orchestrator's concrete strategy). */ + const RETRY_SCHEDULE_MS = [5_000, 10_000, 30_000, 60_000, 300_000, 600_000, 900_000, 1_800_000]; + const RETRY_TAIL_MS = 1_800_000; // 30m + const RETRY_BUDGET_MS = 8 * 60 * 60 * 1000; // 8h + + /** Cumulative scheduled sleep through `attempt` (sum of delay[0..attempt]). */ + function cumulativeSleepMs(attempt: number): number { + let sum = 0; + for (let i = 0; i <= attempt; i++) { + sum += i < RETRY_SCHEDULE_MS.length ? RETRY_SCHEDULE_MS[i] : RETRY_TAIL_MS; + } + return sum; + } + + /** Pure, deterministic delay decision (no I/O, no clock). */ + function delayFor(attempt: number): number | undefined { + const delay = attempt < RETRY_SCHEDULE_MS.length ? RETRY_SCHEDULE_MS[attempt] : RETRY_TAIL_MS; + if (cumulativeSleepMs(attempt) > RETRY_BUDGET_MS) return undefined; // over budget → stop + return delay; + } + + /** The full schedule delayFor would emit (until budget exhausted). */ + function fullSchedule(): number[] { + const result: number[] = []; + let attempt = 0; + while (true) { + const delay = delayFor(attempt); + if (delay === undefined) break; + result.push(delay); + attempt++; + } + return result; + } + + /** + * Fake, controllable `sleep`: records every call's delay, resolves + * instantly (no real waiting), and can abort the controller on a chosen + * 1-based call index to simulate "abort during sleep". + */ + function createFakeSleep(controller: AbortController): { + sleep: (ms: number, signal: AbortSignal) => Promise<void>; + calls: number[]; + abortOnCall: (n: number) => void; + } { + const calls: number[] = []; + let abortAt: number | undefined; + const sleep = async (ms: number, _signal: AbortSignal): Promise<void> => { + calls.push(ms); + if (abortAt !== undefined && calls.length === abortAt) { + controller.abort(); + throw new Error("aborted"); + } + // Otherwise resolve instantly (no real waiting). + }; + return { + sleep, + calls, + abortOnCall: (n: number) => { + abortAt = n; + }, + }; + } + + /** A provider that yields a retryable error `errorCount` times, then success. */ + function createRetryingProvider(opts: { + errorCount: number; + error?: { message: string; code?: string; retryable?: boolean }; + success?: ProviderEvent[]; + }): { provider: ProviderContract; streamCalls: { value: number } } { + const streamCalls = { value: 0 }; + const error: ProviderEvent = { + type: "error", + message: opts.error?.message ?? "overloaded", + ...(opts.error?.code !== undefined ? { code: opts.error.code } : {}), + ...(opts.error?.retryable !== undefined ? { retryable: opts.error.retryable } : {}), + }; + const success = opts.success ?? [ + { type: "text-delta", delta: "hi" }, + { type: "finish", reason: "stop" }, + ]; + const provider: ProviderContract = { + id: "fake", + stream() { + const idx = streamCalls.value++; + return (async function* () { + if (idx < opts.errorCount) { + yield error; + return; + } + for (const event of success) yield event; + })(); + }, + }; + return { provider, streamCalls }; + } + + describe("retry with backoff", () => { + it("retries a retryable emitted error on schedule then succeeds", async () => { + const { provider } = createRetryingProvider({ + errorCount: 3, + error: { message: "HTTP 429: overloaded", code: "429", retryable: true }, + }); + const controller = new AbortController(); + const fake = createFakeSleep(controller); + + const { events, emit } = createCollectingEmit(); + + const result = await runTurn({ + provider, + messages: [userMessage], + tools: [], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit, + signal: controller.signal, + retry: { delayFor, sleep: fake.sleep }, + }); + + expect(result.finishReason).toBe("stop"); + // 3 retries: 5s, 10s, 30s. + expect(fake.calls).toEqual([5_000, 10_000, 30_000]); + // 3 provider-retry events (one per sleep), then the successful text. + const retryEvents = events.filter((e) => e.type === "provider-retry"); + expect(retryEvents).toHaveLength(3); + if (retryEvents[0]?.type === "provider-retry") { + expect(retryEvents[0].attempt).toBe(0); + expect(retryEvents[0].delayMs).toBe(5_000); + expect(retryEvents[0].message).toBe("HTTP 429: overloaded"); + expect(retryEvents[0].code).toBe("429"); + expect(retryEvents[0].conversationId).toBe("conv-1"); + expect(retryEvents[0].turnId).toBe("turn-1"); + } + if (retryEvents[1]?.type === "provider-retry") { + expect(retryEvents[1].attempt).toBe(1); + expect(retryEvents[1].delayMs).toBe(10_000); + } + if (retryEvents[2]?.type === "provider-retry") { + expect(retryEvents[2].attempt).toBe(2); + expect(retryEvents[2].delayMs).toBe(30_000); + } + // The error was suppressed (no error event emitted — retry succeeded). + expect(events.filter((e) => e.type === "error")).toHaveLength(0); + // The successful content still streams. + const deltas = events.filter((e) => e.type === "text-delta"); + expect(deltas).toHaveLength(1); + }); + + it("sleep is called with the full schedule [5s,10s,30s,60s,5m,10m,15m,30m,30m…]", async () => { + // Provider errors forever → retries until budget exhausted → gives up. + const { provider } = createRetryingProvider({ + errorCount: Number.POSITIVE_INFINITY, + error: { message: "overloaded", code: "429", retryable: true }, + }); + const controller = new AbortController(); + const fake = createFakeSleep(controller); + + const { events, emit } = createCollectingEmit(); + + const result = await runTurn({ + provider, + messages: [userMessage], + tools: [], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit, + signal: controller.signal, + retry: { delayFor, sleep: fake.sleep }, + }); + + // Budget exhausted → give up → error. + expect(result.finishReason).toBe("error"); + + // The sleep schedule matches the pure delayFor output exactly. + expect(fake.calls).toEqual(fullSchedule()); + + // Head of the schedule (the 8 stepped delays). + expect(fake.calls.slice(0, 8)).toEqual([ + 5_000, 10_000, 30_000, 60_000, 300_000, 600_000, 900_000, 1_800_000, + ]); + // Tail repeats 30m. + expect(fake.calls[8]).toBe(1_800_000); + expect(fake.calls.at(-1)).toBe(1_800_000); + + // 8h cumulative budget cap: head (3705s) + 13×30m = ~7h31m, then stop. + // 21 retries (attempts 0..20), then delayFor(21) → undefined → give up. + expect(fake.calls).toHaveLength(21); + const totalSlept = fake.calls.reduce((a, b) => a + b, 0); + expect(totalSlept).toBeLessThanOrEqual(RETRY_BUDGET_MS); + expect(totalSlept).toBe(3_705_000 + 13 * 1_800_000); // 27_105_000 + + // One provider-retry per sleep, plus a final error (give-up). + expect(events.filter((e) => e.type === "provider-retry")).toHaveLength(21); + expect(events.filter((e) => e.type === "error")).toHaveLength(1); + const errEvt = events.find((e) => e.type === "error"); + if (errEvt?.type === "error") { + expect(errEvt.message).toBe("overloaded"); + expect(errEvt.code).toBe("429"); + } + }); + + it("does NOT retry after content was emitted (safety invariant)", async () => { + // Provider yields text (content) THEN a retryable error. Because content + // was emitted, retrying is unsafe (would duplicate partial output). + let callCount = 0; + const provider: ProviderContract = { + id: "fake", + stream() { + callCount++; + return (async function* () { + yield { type: "text-delta", delta: "partial" } as ProviderEvent; + yield { + type: "error", + message: "overloaded", + code: "429", + retryable: true, + } as ProviderEvent; + })(); + }, + }; + const controller = new AbortController(); + const fake = createFakeSleep(controller); + + const { events, emit } = createCollectingEmit(); + + const result = await runTurn({ + provider, + messages: [userMessage], + tools: [], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit, + signal: controller.signal, + retry: { delayFor, sleep: fake.sleep }, + }); + + // No retries: stream called exactly once. + expect(callCount).toBe(1); + expect(fake.calls).toHaveLength(0); + // The error is emitted (give-up) and partial content preserved. + expect(result.finishReason).toBe("error"); + expect(events.filter((e) => e.type === "error")).toHaveLength(1); + expect(events.filter((e) => e.type === "provider-retry")).toHaveLength(0); + expect(events.filter((e) => e.type === "text-delta")).toHaveLength(1); + }); + + it("does NOT retry a non-retryable emitted error (retryable: false)", async () => { + const { provider, streamCalls } = createRetryingProvider({ + errorCount: 1, + error: { message: "bad request", code: "400", retryable: false }, + }); + const controller = new AbortController(); + const fake = createFakeSleep(controller); + + const { events, emit } = createCollectingEmit(); + + const result = await runTurn({ + provider, + messages: [userMessage], + tools: [], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit, + signal: controller.signal, + retry: { delayFor, sleep: fake.sleep }, + }); + + expect(streamCalls.value).toBe(1); // no retry + expect(fake.calls).toHaveLength(0); + expect(result.finishReason).toBe("error"); + expect(events.filter((e) => e.type === "error")).toHaveLength(1); + expect(events.filter((e) => e.type === "provider-retry")).toHaveLength(0); + }); + + it("does NOT retry a non-retryable emitted error (retryable absent)", async () => { + const { provider, streamCalls } = createRetryingProvider({ + errorCount: 1, + error: { message: "bad request", code: "400" }, // no retryable field + }); + const controller = new AbortController(); + const fake = createFakeSleep(controller); + + const { events, emit } = createCollectingEmit(); + + const result = await runTurn({ + provider, + messages: [userMessage], + tools: [], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit, + signal: controller.signal, + retry: { delayFor, sleep: fake.sleep }, + }); + + expect(streamCalls.value).toBe(1); // no retry + expect(fake.calls).toHaveLength(0); + expect(result.finishReason).toBe("error"); + expect(events.filter((e) => e.type === "error")).toHaveLength(1); + }); + + it("give-up emits the final error when budget is exhausted", async () => { + // Custom delayFor that allows exactly 1 retry then stops. + const shortDelayFor = (attempt: number): number | undefined => + attempt === 0 ? 100 : undefined; + const { provider } = createRetryingProvider({ + errorCount: Number.POSITIVE_INFINITY, + error: { message: "overloaded", code: "429", retryable: true }, + }); + const controller = new AbortController(); + const fake = createFakeSleep(controller); + + const { events, emit } = createCollectingEmit(); + + const result = await runTurn({ + provider, + messages: [userMessage], + tools: [], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit, + signal: controller.signal, + retry: { delayFor: shortDelayFor, sleep: fake.sleep }, + }); + + expect(result.finishReason).toBe("error"); + expect(fake.calls).toEqual([100]); // one retry, then give up + // One provider-retry (attempt 0), then the final error. + expect(events.filter((e) => e.type === "provider-retry")).toHaveLength(1); + const errs = events.filter((e) => e.type === "error"); + expect(errs).toHaveLength(1); + if (errs[0]?.type === "error") { + expect(errs[0].message).toBe("overloaded"); + expect(errs[0].code).toBe("429"); + } + }); + + it("abort during sleep seals the turn aborted", async () => { + const { provider } = createRetryingProvider({ + errorCount: Number.POSITIVE_INFINITY, + error: { message: "overloaded", code: "429", retryable: true }, + }); + const controller = new AbortController(); + const fake = createFakeSleep(controller); + fake.abortOnCall(2); // abort on the 2nd sleep + + const { events, emit } = createCollectingEmit(); + + const result = await runTurn({ + provider, + messages: [userMessage], + tools: [], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit, + signal: controller.signal, + retry: { delayFor, sleep: fake.sleep }, + }); + + expect(result.finishReason).toBe("aborted"); + // Two sleeps attempted; the 2nd aborted. + expect(fake.calls).toHaveLength(2); + // No terminal error emitted (it was an abort, not a give-up). + expect(events.filter((e) => e.type === "error")).toHaveLength(0); + // One provider-retry before the aborted sleep (attempt 0). + const retries = events.filter((e) => e.type === "provider-retry"); + expect(retries).toHaveLength(2); + // The done event carries reason "aborted". + const done = events.find((e) => e.type === "done"); + if (done?.type === "done") { + expect(done.reason).toBe("aborted"); + } + }); + + it("omitting retry keeps the pre-retry behavior (backward-compatible)", async () => { + // A retryable error with no retry configured → ends the step as today. + const { provider, streamCalls } = createRetryingProvider({ + errorCount: 1, + error: { message: "overloaded", code: "429", retryable: true }, + }); + + const { events, emit } = createCollectingEmit(); + + const result = await runTurn({ + provider, + messages: [userMessage], + tools: [], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit, + // no retry field + }); + + expect(streamCalls.value).toBe(1); // no retry + expect(result.finishReason).toBe("error"); + expect(events.filter((e) => e.type === "error")).toHaveLength(1); + expect(events.filter((e) => e.type === "provider-retry")).toHaveLength(0); + }); + + it("retries a THROWN error (retryable-by-default when pre-content)", async () => { + // A thrown error (no retryable flag) before content is retried. + let callCount = 0; + const provider: ProviderContract = { + id: "fake", + stream() { + callCount++; + return (async function* () { + if (callCount <= 2) { + throw new Error("network blip"); + } + yield { type: "text-delta", delta: "hi" } as ProviderEvent; + yield { type: "finish", reason: "stop" } as ProviderEvent; + })(); + }, + }; + const controller = new AbortController(); + const fake = createFakeSleep(controller); + + const { events, emit } = createCollectingEmit(); + + const result = await runTurn({ + provider, + messages: [userMessage], + tools: [], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit, + signal: controller.signal, + retry: { delayFor, sleep: fake.sleep }, + }); + + expect(callCount).toBe(3); // 2 throws retried, 3rd succeeds + expect(fake.calls).toEqual([5_000, 10_000]); + expect(result.finishReason).toBe("stop"); + expect(events.filter((e) => e.type === "provider-retry")).toHaveLength(2); + // Thrown errors have no code. + if (events[0]?.type === "provider-retry") { + expect(events[0].code).toBeUndefined(); + expect(events[0].message).toBe("network blip"); + } + expect(events.filter((e) => e.type === "error")).toHaveLength(0); + }); + + it("does NOT retry a thrown error after content was emitted", async () => { + let callCount = 0; + const provider: ProviderContract = { + id: "fake", + stream() { + callCount++; + return (async function* () { + yield { type: "text-delta", delta: "partial" } as ProviderEvent; + throw new Error("network blip"); + })(); + }, + }; + const controller = new AbortController(); + const fake = createFakeSleep(controller); + + const { events, emit } = createCollectingEmit(); + + const result = await runTurn({ + provider, + messages: [userMessage], + tools: [], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit, + signal: controller.signal, + retry: { delayFor, sleep: fake.sleep }, + }); + + expect(callCount).toBe(1); + expect(fake.calls).toHaveLength(0); + expect(result.finishReason).toBe("error"); + expect(events.filter((e) => e.type === "error")).toHaveLength(1); + expect(events.filter((e) => e.type === "text-delta")).toHaveLength(1); + }); + + it("provider-retry events interleave correctly: error → retry-event → sleep → retry", async () => { + // Verify ordering: each provider-retry event comes BEFORE its sleep, + // and the successful content comes only after the last retry. + const { provider } = createRetryingProvider({ + errorCount: 2, + error: { message: "overloaded", code: "429", retryable: true }, + success: [ + { type: "text-delta", delta: "ok" }, + { type: "finish", reason: "stop" }, + ], + }); + const controller = new AbortController(); + const fake = createFakeSleep(controller); + + const { events, emit } = createCollectingEmit(); + + await runTurn({ + provider, + messages: [userMessage], + tools: [], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit, + signal: controller.signal, + retry: { delayFor, sleep: fake.sleep }, + }); + + const types = events.map((e) => e.type); + // turn-start, provider-retry(0), provider-retry(1), text-delta, step-complete, done + expect(types[0]).toBe("turn-start"); + const firstRetryIdx = types.indexOf("provider-retry"); + const textIdx = types.indexOf("text-delta"); + expect(firstRetryIdx).toBeGreaterThan(0); + expect(textIdx).toBeGreaterThan(firstRetryIdx); + // Both retries precede the text. + const retryCount = types.filter((t) => t === "provider-retry").length; + expect(retryCount).toBe(2); + }); + }); }); diff --git a/packages/kernel/src/runtime/run-turn.ts b/packages/kernel/src/runtime/run-turn.ts index 273482f..3460033 100644 --- a/packages/kernel/src/runtime/run-turn.ts +++ b/packages/kernel/src/runtime/run-turn.ts @@ -1,30 +1,30 @@ import type { ChatMessage, Chunk, StepId } from "../contracts/conversation.js"; import type { Logger, Span } from "../contracts/logging.js"; import type { - ProviderContract, - ProviderEvent, - ProviderStreamOptions, - Usage, + ProviderContract, + ProviderEvent, + ProviderStreamOptions, + Usage, } from "../contracts/provider.js"; import type { - EventEmitter, - RetryStrategy, - RunTurnInput, - RunTurnResult, + EventEmitter, + RetryStrategy, + RunTurnInput, + RunTurnResult, } from "../contracts/runtime.js"; import type { ToolCall, ToolContract } from "../contracts/tool.js"; import { createStepDispatcher, type StepDispatcher } from "./dispatch.js"; import { - doneEvent, - errorEvent, - providerRetryEvent, - reasoningDeltaEvent, - stepCompleteEvent, - textDeltaEvent, - toolCallEvent, - toolResultEvent, - turnStartEvent, - usageEvent, + doneEvent, + errorEvent, + providerRetryEvent, + reasoningDeltaEvent, + stepCompleteEvent, + textDeltaEvent, + toolCallEvent, + toolResultEvent, + turnStartEvent, + usageEvent, } from "./events.js"; /** Max steps per turn. 0 = unlimited (the loop runs until the model stops @@ -32,69 +32,69 @@ import { export const MAX_STEPS = 0; function zeroUsage(): Usage { - return { inputTokens: 0, outputTokens: 0 }; + return { inputTokens: 0, outputTokens: 0 }; } function addUsage(a: Usage, b: Usage): Usage { - const inputTokens = a.inputTokens + b.inputTokens; - const outputTokens = a.outputTokens + b.outputTokens; - - if (a.cacheReadTokens !== undefined || b.cacheReadTokens !== undefined) { - const cacheReadTokens = (a.cacheReadTokens ?? 0) + (b.cacheReadTokens ?? 0); - if (a.cacheWriteTokens !== undefined || b.cacheWriteTokens !== undefined) { - return { - inputTokens, - outputTokens, - cacheReadTokens, - cacheWriteTokens: (a.cacheWriteTokens ?? 0) + (b.cacheWriteTokens ?? 0), - }; - } - return { inputTokens, outputTokens, cacheReadTokens }; - } - - if (a.cacheWriteTokens !== undefined || b.cacheWriteTokens !== undefined) { - return { - inputTokens, - outputTokens, - cacheWriteTokens: (a.cacheWriteTokens ?? 0) + (b.cacheWriteTokens ?? 0), - }; - } - - return { inputTokens, outputTokens }; + const inputTokens = a.inputTokens + b.inputTokens; + const outputTokens = a.outputTokens + b.outputTokens; + + if (a.cacheReadTokens !== undefined || b.cacheReadTokens !== undefined) { + const cacheReadTokens = (a.cacheReadTokens ?? 0) + (b.cacheReadTokens ?? 0); + if (a.cacheWriteTokens !== undefined || b.cacheWriteTokens !== undefined) { + return { + inputTokens, + outputTokens, + cacheReadTokens, + cacheWriteTokens: (a.cacheWriteTokens ?? 0) + (b.cacheWriteTokens ?? 0), + }; + } + return { inputTokens, outputTokens, cacheReadTokens }; + } + + if (a.cacheWriteTokens !== undefined || b.cacheWriteTokens !== undefined) { + return { + inputTokens, + outputTokens, + cacheWriteTokens: (a.cacheWriteTokens ?? 0) + (b.cacheWriteTokens ?? 0), + }; + } + + return { inputTokens, outputTokens }; } function usageAttrs(usage: Usage): Record<string, string | number | boolean | null> { - const attrs: Record<string, string | number | boolean | null> = { - "usage.inputTokens": usage.inputTokens, - "usage.outputTokens": usage.outputTokens, - }; - if (usage.cacheReadTokens !== undefined) { - attrs["usage.cacheReadTokens"] = usage.cacheReadTokens; - } - if (usage.cacheWriteTokens !== undefined) { - attrs["usage.cacheWriteTokens"] = usage.cacheWriteTokens; - } - return attrs; + const attrs: Record<string, string | number | boolean | null> = { + "usage.inputTokens": usage.inputTokens, + "usage.outputTokens": usage.outputTokens, + }; + if (usage.cacheReadTokens !== undefined) { + attrs["usage.cacheReadTokens"] = usage.cacheReadTokens; + } + if (usage.cacheWriteTokens !== undefined) { + attrs["usage.cacheWriteTokens"] = usage.cacheWriteTokens; + } + return attrs; } function appendTextDelta(chunks: Chunk[], delta: string): void { - const lastIdx = chunks.length - 1; - const last = chunks[lastIdx]; - if (last !== undefined && last.type === "text") { - chunks[lastIdx] = { type: "text", text: last.text + delta }; - } else { - chunks.push({ type: "text", text: delta }); - } + const lastIdx = chunks.length - 1; + const last = chunks[lastIdx]; + if (last !== undefined && last.type === "text") { + chunks[lastIdx] = { type: "text", text: last.text + delta }; + } else { + chunks.push({ type: "text", text: delta }); + } } function appendThinkingDelta(chunks: Chunk[], delta: string): void { - const lastIdx = chunks.length - 1; - const last = chunks[lastIdx]; - if (last !== undefined && last.type === "thinking") { - chunks[lastIdx] = { type: "thinking", text: last.text + delta }; - } else { - chunks.push({ type: "thinking", text: delta }); - } + const lastIdx = chunks.length - 1; + const last = chunks[lastIdx]; + if (last !== undefined && last.type === "thinking") { + chunks[lastIdx] = { type: "thinking", text: last.text + delta }; + } else { + chunks.push({ type: "thinking", text: delta }); + } } /** @@ -106,698 +106,698 @@ function appendThinkingDelta(chunks: Chunk[], delta: string): void { * orphaned `tool` messages in the next turn's history. */ function stripToolCallChunks(msg: ChatMessage): ChatMessage | undefined { - const stripped = msg.chunks.filter((c) => c.type !== "tool-call"); - return stripped.length > 0 ? { role: msg.role, chunks: stripped } : undefined; + const stripped = msg.chunks.filter((c) => c.type !== "tool-call"); + return stripped.length > 0 ? { role: msg.role, chunks: stripped } : undefined; } interface StepContext { - readonly provider: ProviderContract; - readonly messages: ChatMessage[]; - readonly tools: readonly ToolContract[]; - readonly toolMap: Map<string, ToolContract>; - readonly dispatch: RunTurnInput["dispatch"]; - readonly emit: EventEmitter; - readonly signal: AbortSignal; - readonly conversationId: string; - readonly turnId: string; - readonly stepId: StepId; - readonly logger: Logger; - readonly turnSpan: Span | undefined; - readonly toolSpans: Map<string, Span>; - readonly cwd: string | undefined; - readonly computerId: string | undefined; - readonly now: (() => number) | undefined; - /** Per-turn provider options (model, systemPrompt, …) threaded to stream(). */ - readonly providerOpts: ProviderStreamOptions | undefined; - /** Optional injected retry strategy (omit = no retry, backward-compatible). */ - readonly retry: RetryStrategy | undefined; + readonly provider: ProviderContract; + readonly messages: ChatMessage[]; + readonly tools: readonly ToolContract[]; + readonly toolMap: Map<string, ToolContract>; + readonly dispatch: RunTurnInput["dispatch"]; + readonly emit: EventEmitter; + readonly signal: AbortSignal; + readonly conversationId: string; + readonly turnId: string; + readonly stepId: StepId; + readonly logger: Logger; + readonly turnSpan: Span | undefined; + readonly toolSpans: Map<string, Span>; + readonly cwd: string | undefined; + readonly computerId: string | undefined; + readonly now: (() => number) | undefined; + /** Per-turn provider options (model, systemPrompt, …) threaded to stream(). */ + readonly providerOpts: ProviderStreamOptions | undefined; + /** Optional injected retry strategy (omit = no retry, backward-compatible). */ + readonly retry: RetryStrategy | undefined; } interface TimingState { - ttftSpan: Span | undefined; - decodeSpan: Span | undefined; - firstTokenSeen: boolean; - streamStartMs: number | undefined; - firstTokenMs: number | undefined; + ttftSpan: Span | undefined; + decodeSpan: Span | undefined; + firstTokenSeen: boolean; + streamStartMs: number | undefined; + firstTokenMs: number | undefined; } interface StepResult { - readonly assistantMessage: ChatMessage | undefined; - readonly toolCalls: ToolCall[]; - readonly toolMessages: ChatMessage[]; - readonly usage: Usage; - readonly finishReason: string; + readonly assistantMessage: ChatMessage | undefined; + readonly toolCalls: ToolCall[]; + readonly toolMessages: ChatMessage[]; + readonly usage: Usage; + readonly finishReason: string; } function processEvent( - event: ProviderEvent, - chunks: Chunk[], - toolCalls: ToolCall[], - dispatcher: StepDispatcher, - ctx: StepContext, - stepSpan: Span | undefined, - timing: TimingState, - toolDispatchTimes: Map<string, number>, + event: ProviderEvent, + chunks: Chunk[], + toolCalls: ToolCall[], + dispatcher: StepDispatcher, + ctx: StepContext, + stepSpan: Span | undefined, + timing: TimingState, + toolDispatchTimes: Map<string, number>, ): void { - switch (event.type) { - case "text-delta": - if (!timing.firstTokenSeen) { - timing.firstTokenSeen = true; - if (ctx.now !== undefined) { - timing.firstTokenMs = ctx.now(); - } - try { - timing.ttftSpan?.end({ attrs: { firstToken: true } }); - } catch { - // Swallow — D7. - } - timing.ttftSpan = undefined; - try { - timing.decodeSpan = stepSpan?.child("decode"); - } catch { - // Swallow — D7. - } - } - appendTextDelta(chunks, event.delta); - ctx.emit(textDeltaEvent(ctx.conversationId, ctx.turnId, event.delta)); - break; - case "reasoning-delta": - if (!timing.firstTokenSeen) { - timing.firstTokenSeen = true; - if (ctx.now !== undefined) { - timing.firstTokenMs = ctx.now(); - } - try { - timing.ttftSpan?.end({ attrs: { firstToken: true } }); - } catch { - // Swallow — D7. - } - timing.ttftSpan = undefined; - try { - timing.decodeSpan = stepSpan?.child("decode"); - } catch { - // Swallow — D7. - } - } - appendThinkingDelta(chunks, event.delta); - ctx.emit(reasoningDeltaEvent(ctx.conversationId, ctx.turnId, event.delta)); - break; - case "tool-call": { - const call: ToolCall = { - id: event.toolCallId, - name: event.toolName, - input: event.input, - }; - toolCalls.push(call); - chunks.push({ - type: "tool-call", - toolCallId: event.toolCallId, - toolName: event.toolName, - input: event.input, - stepId: ctx.stepId, - }); - ctx.emit( - toolCallEvent( - ctx.conversationId, - ctx.turnId, - ctx.stepId, - event.toolCallId, - event.toolName, - event.input, - ), - ); - - // Capture dispatch time for tool-call durationMs - if (ctx.now !== undefined) { - toolDispatchTimes.set(event.toolCallId, ctx.now()); - } - - // Open a tool-call span as a child of the step span (attrs: name, toolCallId) - try { - const tcSpan = - stepSpan !== undefined - ? stepSpan.child("tool-call", { - name: event.toolName, - toolCallId: event.toolCallId, - }) - : ctx.logger.span("tool-call", { - name: event.toolName, - toolCallId: event.toolCallId, - }); - ctx.toolSpans.set(event.toolCallId, tcSpan); - } catch { - // Swallow — D7: logging never breaks the turn. - } - - if (ctx.dispatch.eager) { - dispatcher.submit(call); - } - break; - } - case "usage": - ctx.emit(usageEvent(ctx.conversationId, ctx.turnId, event.usage, ctx.stepId)); - break; - case "finish": - break; - case "error": - // Handled by the retry loop in executeStep (not here): an error event - // is intercepted before processEvent so the step can decide whether to - // retry (suppressing the error) or give up (emit it). processEvent - // never receives an "error" event. - break; - } + switch (event.type) { + case "text-delta": + if (!timing.firstTokenSeen) { + timing.firstTokenSeen = true; + if (ctx.now !== undefined) { + timing.firstTokenMs = ctx.now(); + } + try { + timing.ttftSpan?.end({ attrs: { firstToken: true } }); + } catch { + // Swallow — D7. + } + timing.ttftSpan = undefined; + try { + timing.decodeSpan = stepSpan?.child("decode"); + } catch { + // Swallow — D7. + } + } + appendTextDelta(chunks, event.delta); + ctx.emit(textDeltaEvent(ctx.conversationId, ctx.turnId, event.delta)); + break; + case "reasoning-delta": + if (!timing.firstTokenSeen) { + timing.firstTokenSeen = true; + if (ctx.now !== undefined) { + timing.firstTokenMs = ctx.now(); + } + try { + timing.ttftSpan?.end({ attrs: { firstToken: true } }); + } catch { + // Swallow — D7. + } + timing.ttftSpan = undefined; + try { + timing.decodeSpan = stepSpan?.child("decode"); + } catch { + // Swallow — D7. + } + } + appendThinkingDelta(chunks, event.delta); + ctx.emit(reasoningDeltaEvent(ctx.conversationId, ctx.turnId, event.delta)); + break; + case "tool-call": { + const call: ToolCall = { + id: event.toolCallId, + name: event.toolName, + input: event.input, + }; + toolCalls.push(call); + chunks.push({ + type: "tool-call", + toolCallId: event.toolCallId, + toolName: event.toolName, + input: event.input, + stepId: ctx.stepId, + }); + ctx.emit( + toolCallEvent( + ctx.conversationId, + ctx.turnId, + ctx.stepId, + event.toolCallId, + event.toolName, + event.input, + ), + ); + + // Capture dispatch time for tool-call durationMs + if (ctx.now !== undefined) { + toolDispatchTimes.set(event.toolCallId, ctx.now()); + } + + // Open a tool-call span as a child of the step span (attrs: name, toolCallId) + try { + const tcSpan = + stepSpan !== undefined + ? stepSpan.child("tool-call", { + name: event.toolName, + toolCallId: event.toolCallId, + }) + : ctx.logger.span("tool-call", { + name: event.toolName, + toolCallId: event.toolCallId, + }); + ctx.toolSpans.set(event.toolCallId, tcSpan); + } catch { + // Swallow — D7: logging never breaks the turn. + } + + if (ctx.dispatch.eager) { + dispatcher.submit(call); + } + break; + } + case "usage": + ctx.emit(usageEvent(ctx.conversationId, ctx.turnId, event.usage, ctx.stepId)); + break; + case "finish": + break; + case "error": + // Handled by the retry loop in executeStep (not here): an error event + // is intercepted before processEvent so the step can decide whether to + // retry (suppressing the error) or give up (emit it). processEvent + // never receives an "error" event. + break; + } } async function executeStep(ctx: StepContext): Promise<StepResult> { - const chunks: Chunk[] = []; - const toolCalls: ToolCall[] = []; - const toolDispatchTimes = new Map<string, number>(); - let stepUsage = zeroUsage(); - let finishReason = "stop"; - - // Open a step span as a child of the turn span; capture the verbatim - // pre-mutation prompt via a "prompt" child span whose body holds the - // serialized messages+tools. - let stepSpan: Span | undefined; - try { - stepSpan = ctx.turnSpan !== undefined ? ctx.turnSpan.child("step") : ctx.logger.span("step"); - const promptBody = JSON.stringify({ messages: ctx.messages, tools: ctx.tools }); - const promptSpan = stepSpan.child( - "prompt", - { - messageCount: ctx.messages.length, - toolCount: ctx.tools.length, - }, - promptBody, - ); - promptSpan.end(); - } catch { - // Swallow — D7. - } - - const dispatcher = createStepDispatcher( - ctx.toolMap, - ctx.dispatch, - ctx.signal, - ctx.emit, - ctx.conversationId, - ctx.turnId, - ctx.toolSpans, - ctx.cwd, - ctx.computerId, - ); - - const timing: TimingState = { - ttftSpan: undefined, - decodeSpan: undefined, - firstTokenSeen: false, - streamStartMs: ctx.now !== undefined ? ctx.now() : undefined, - firstTokenMs: undefined, - }; - - // Open TTFT span when spans are enabled - try { - if (stepSpan !== undefined) { - timing.ttftSpan = stepSpan.child("ttft"); - } - } catch { - // Swallow — D7. - } - - // Retry loop: wrap provider.stream() consumption. Retries are ONLY - // attempted when no content was emitted yet this step (the safety - // invariant — never duplicate partial output). On a retryable error — - // either an EMITTED `error` ProviderEvent with `retryable === true`, OR a - // THROWN error (retryable-by-default when pre-content) — with !hadContent: - // ask retry.delayFor(attempt); if it returns a delay → emit a transient - // provider-retry AgentEvent, sleep via the injected retry.sleep (abortable), - // attempt++, re-call provider.stream(); if it returns undefined (budget - // exhausted) → give up. Non-retryable emitted errors (retryable === false or - // absent), errors after content, and the no-retry-configured case all fall - // through to "give up" — identical to the pre-retry behavior. - let hadContent = false; - let attempt = 0; - while (true) { - let errored = false; - let wasThrown = false; - let errorMessage: string | undefined; - let errorCode: string | undefined; - let errorRetryable: boolean | undefined; - let thrownErr: unknown; - - try { - const opts: ProviderStreamOptions = { - ...ctx.providerOpts, - ...(ctx.turnSpan !== undefined && stepSpan !== undefined ? { logger: stepSpan.log } : {}), - }; - const stream = ctx.provider.stream(ctx.messages, ctx.tools, opts); - for await (const event of stream) { - if (ctx.signal.aborted) break; - if (event.type === "error") { - // Intercept: hold for the retry decision — don't push a chunk - // or emit yet (a successful retry would leave a stale error). - errored = true; - errorMessage = event.message; - errorCode = event.code; - errorRetryable = event.retryable; - break; - } - if ( - event.type === "text-delta" || - event.type === "reasoning-delta" || - event.type === "tool-call" || - event.type === "usage" - ) { - hadContent = true; - } - processEvent( - event, - chunks, - toolCalls, - dispatcher, - ctx, - stepSpan, - timing, - toolDispatchTimes, - ); - if (event.type === "usage") { - stepUsage = addUsage(stepUsage, event.usage); - } - if (event.type === "finish") { - finishReason = event.reason; - } - } - } catch (err) { - errored = true; - wasThrown = true; - errorMessage = err instanceof Error ? err.message : String(err); - errorCode = undefined; - errorRetryable = undefined; - thrownErr = err; - } - - // Abort (during stream) → stop; the runTurn loop seals aborted. - if (ctx.signal.aborted) { - break; - } - - // No error → step succeeded. - if (!errored) { - break; - } - - // Retryable? A thrown error is retryable-by-default when pre-content; - // an emitted error is retryable ONLY when `retryable === true` (absent - // or false → not retried, per the contract). - const isRetryable = wasThrown ? true : errorRetryable === true; - if (ctx.retry !== undefined && !hadContent && isRetryable) { - const delay = ctx.retry.delayFor(attempt); - if (delay !== undefined) { - // Emit the transient provider-retry event BEFORE the sleep so the - // UI shows "⚠ retrying in Ns…" immediately. Not persisted as a - // chat message — it never pollutes the prompt. - ctx.emit( - providerRetryEvent( - ctx.conversationId, - ctx.turnId, - attempt, - delay, - errorMessage ?? "", - errorCode, - ), - ); - // Abortable sleep. If the signal fires during sleep, the shell's - // sleep rejects — we catch it and break so the turn seals aborted. - try { - await ctx.retry.sleep(delay, ctx.signal); - } catch { - // Abort during sleep (or unexpected sleep failure). - } - if (ctx.signal.aborted) { - break; - } - attempt++; - continue; - } - // delayFor returned undefined → budget exhausted → give up. - } - - // Give up: emit the suppressed error and end the step. This is the - // single emission point for a terminal provider error (non-retryable, - // post-content, budget-exhausted, or no-retry-configured). - const message = errorMessage ?? ""; - if (errorCode !== undefined) { - chunks.push({ type: "error", message, code: errorCode }); - } else { - chunks.push({ type: "error", message }); - } - ctx.emit(errorEvent(ctx.conversationId, ctx.turnId, message, errorCode)); - finishReason = "error"; - try { - stepSpan?.end({ err: thrownErr ?? new Error(message) }); - } catch { - // Swallow — D7. - } - stepSpan = undefined; - break; - } - - // Close timing spans: if no first token was seen, end ttft with firstToken: false - // If decode span is open, close it - try { - if (timing.ttftSpan !== undefined) { - timing.ttftSpan.end({ attrs: { firstToken: false } }); - timing.ttftSpan = undefined; - } - if (timing.decodeSpan !== undefined) { - timing.decodeSpan.end(); - timing.decodeSpan = undefined; - } - } catch { - // Swallow — D7. - } - - // Emit step-complete event with timing - const streamEndMs = ctx.now !== undefined ? ctx.now() : undefined; - if (timing.streamStartMs !== undefined && streamEndMs !== undefined) { - const genTotalMs = streamEndMs - timing.streamStartMs; - const stepTiming: { ttftMs?: number; decodeMs?: number; genTotalMs?: number } = { - genTotalMs, - }; - if (timing.firstTokenMs !== undefined) { - stepTiming.ttftMs = timing.firstTokenMs - timing.streamStartMs; - stepTiming.decodeMs = streamEndMs - timing.firstTokenMs; - } - ctx.emit(stepCompleteEvent(ctx.conversationId, ctx.turnId, ctx.stepId, stepTiming)); - } else { - ctx.emit(stepCompleteEvent(ctx.conversationId, ctx.turnId, ctx.stepId)); - } - - if (!ctx.dispatch.eager) { - for (const call of toolCalls) { - dispatcher.submit(call); - } - } - - const results = await dispatcher.drain(); - - // Close remaining tool-call spans - for (const call of toolCalls) { - const tcSpan = ctx.toolSpans.get(call.id); - if (tcSpan !== undefined) { - const result = results.get(call.id); - try { - tcSpan.end({ - attrs: { - isError: result?.isError ?? false, - contentLength: result?.content.length ?? 0, - }, - }); - } catch { - // Swallow — D7. - } - ctx.toolSpans.delete(call.id); - } - } - - const toolMessages: ChatMessage[] = []; - for (const call of toolCalls) { - const result = results.get(call.id); - if (result !== undefined) { - const isError = result.isError ?? false; - const dispatchTime = toolDispatchTimes.get(call.id); - const toolDurationMs = - ctx.now !== undefined && dispatchTime !== undefined ? ctx.now() - dispatchTime : undefined; - ctx.emit( - toolResultEvent( - ctx.conversationId, - ctx.turnId, - ctx.stepId, - call.id, - call.name, - result.content, - isError, - toolDurationMs, - ), - ); - toolMessages.push({ - role: "tool", - chunks: [ - { - type: "tool-result", - toolCallId: call.id, - toolName: call.name, - content: result.content, - isError, - stepId: ctx.stepId, - }, - ], - }); - } - } - - // Close step span (if not already closed by error) - if (stepSpan !== undefined) { - try { - stepSpan.end({ - attrs: { - finishReason, - ...usageAttrs(stepUsage), - }, - }); - } catch { - // Swallow — D7. - } - } - - const assistantMessage: ChatMessage | undefined = - chunks.length > 0 ? { role: "assistant", chunks } : undefined; - - return { assistantMessage, toolCalls, toolMessages, usage: stepUsage, finishReason }; + const chunks: Chunk[] = []; + const toolCalls: ToolCall[] = []; + const toolDispatchTimes = new Map<string, number>(); + let stepUsage = zeroUsage(); + let finishReason = "stop"; + + // Open a step span as a child of the turn span; capture the verbatim + // pre-mutation prompt via a "prompt" child span whose body holds the + // serialized messages+tools. + let stepSpan: Span | undefined; + try { + stepSpan = ctx.turnSpan !== undefined ? ctx.turnSpan.child("step") : ctx.logger.span("step"); + const promptBody = JSON.stringify({ messages: ctx.messages, tools: ctx.tools }); + const promptSpan = stepSpan.child( + "prompt", + { + messageCount: ctx.messages.length, + toolCount: ctx.tools.length, + }, + promptBody, + ); + promptSpan.end(); + } catch { + // Swallow — D7. + } + + const dispatcher = createStepDispatcher( + ctx.toolMap, + ctx.dispatch, + ctx.signal, + ctx.emit, + ctx.conversationId, + ctx.turnId, + ctx.toolSpans, + ctx.cwd, + ctx.computerId, + ); + + const timing: TimingState = { + ttftSpan: undefined, + decodeSpan: undefined, + firstTokenSeen: false, + streamStartMs: ctx.now !== undefined ? ctx.now() : undefined, + firstTokenMs: undefined, + }; + + // Open TTFT span when spans are enabled + try { + if (stepSpan !== undefined) { + timing.ttftSpan = stepSpan.child("ttft"); + } + } catch { + // Swallow — D7. + } + + // Retry loop: wrap provider.stream() consumption. Retries are ONLY + // attempted when no content was emitted yet this step (the safety + // invariant — never duplicate partial output). On a retryable error — + // either an EMITTED `error` ProviderEvent with `retryable === true`, OR a + // THROWN error (retryable-by-default when pre-content) — with !hadContent: + // ask retry.delayFor(attempt); if it returns a delay → emit a transient + // provider-retry AgentEvent, sleep via the injected retry.sleep (abortable), + // attempt++, re-call provider.stream(); if it returns undefined (budget + // exhausted) → give up. Non-retryable emitted errors (retryable === false or + // absent), errors after content, and the no-retry-configured case all fall + // through to "give up" — identical to the pre-retry behavior. + let hadContent = false; + let attempt = 0; + while (true) { + let errored = false; + let wasThrown = false; + let errorMessage: string | undefined; + let errorCode: string | undefined; + let errorRetryable: boolean | undefined; + let thrownErr: unknown; + + try { + const opts: ProviderStreamOptions = { + ...ctx.providerOpts, + ...(ctx.turnSpan !== undefined && stepSpan !== undefined ? { logger: stepSpan.log } : {}), + }; + const stream = ctx.provider.stream(ctx.messages, ctx.tools, opts); + for await (const event of stream) { + if (ctx.signal.aborted) break; + if (event.type === "error") { + // Intercept: hold for the retry decision — don't push a chunk + // or emit yet (a successful retry would leave a stale error). + errored = true; + errorMessage = event.message; + errorCode = event.code; + errorRetryable = event.retryable; + break; + } + if ( + event.type === "text-delta" || + event.type === "reasoning-delta" || + event.type === "tool-call" || + event.type === "usage" + ) { + hadContent = true; + } + processEvent( + event, + chunks, + toolCalls, + dispatcher, + ctx, + stepSpan, + timing, + toolDispatchTimes, + ); + if (event.type === "usage") { + stepUsage = addUsage(stepUsage, event.usage); + } + if (event.type === "finish") { + finishReason = event.reason; + } + } + } catch (err) { + errored = true; + wasThrown = true; + errorMessage = err instanceof Error ? err.message : String(err); + errorCode = undefined; + errorRetryable = undefined; + thrownErr = err; + } + + // Abort (during stream) → stop; the runTurn loop seals aborted. + if (ctx.signal.aborted) { + break; + } + + // No error → step succeeded. + if (!errored) { + break; + } + + // Retryable? A thrown error is retryable-by-default when pre-content; + // an emitted error is retryable ONLY when `retryable === true` (absent + // or false → not retried, per the contract). + const isRetryable = wasThrown ? true : errorRetryable === true; + if (ctx.retry !== undefined && !hadContent && isRetryable) { + const delay = ctx.retry.delayFor(attempt); + if (delay !== undefined) { + // Emit the transient provider-retry event BEFORE the sleep so the + // UI shows "⚠ retrying in Ns…" immediately. Not persisted as a + // chat message — it never pollutes the prompt. + ctx.emit( + providerRetryEvent( + ctx.conversationId, + ctx.turnId, + attempt, + delay, + errorMessage ?? "", + errorCode, + ), + ); + // Abortable sleep. If the signal fires during sleep, the shell's + // sleep rejects — we catch it and break so the turn seals aborted. + try { + await ctx.retry.sleep(delay, ctx.signal); + } catch { + // Abort during sleep (or unexpected sleep failure). + } + if (ctx.signal.aborted) { + break; + } + attempt++; + continue; + } + // delayFor returned undefined → budget exhausted → give up. + } + + // Give up: emit the suppressed error and end the step. This is the + // single emission point for a terminal provider error (non-retryable, + // post-content, budget-exhausted, or no-retry-configured). + const message = errorMessage ?? ""; + if (errorCode !== undefined) { + chunks.push({ type: "error", message, code: errorCode }); + } else { + chunks.push({ type: "error", message }); + } + ctx.emit(errorEvent(ctx.conversationId, ctx.turnId, message, errorCode)); + finishReason = "error"; + try { + stepSpan?.end({ err: thrownErr ?? new Error(message) }); + } catch { + // Swallow — D7. + } + stepSpan = undefined; + break; + } + + // Close timing spans: if no first token was seen, end ttft with firstToken: false + // If decode span is open, close it + try { + if (timing.ttftSpan !== undefined) { + timing.ttftSpan.end({ attrs: { firstToken: false } }); + timing.ttftSpan = undefined; + } + if (timing.decodeSpan !== undefined) { + timing.decodeSpan.end(); + timing.decodeSpan = undefined; + } + } catch { + // Swallow — D7. + } + + // Emit step-complete event with timing + const streamEndMs = ctx.now !== undefined ? ctx.now() : undefined; + if (timing.streamStartMs !== undefined && streamEndMs !== undefined) { + const genTotalMs = streamEndMs - timing.streamStartMs; + const stepTiming: { ttftMs?: number; decodeMs?: number; genTotalMs?: number } = { + genTotalMs, + }; + if (timing.firstTokenMs !== undefined) { + stepTiming.ttftMs = timing.firstTokenMs - timing.streamStartMs; + stepTiming.decodeMs = streamEndMs - timing.firstTokenMs; + } + ctx.emit(stepCompleteEvent(ctx.conversationId, ctx.turnId, ctx.stepId, stepTiming)); + } else { + ctx.emit(stepCompleteEvent(ctx.conversationId, ctx.turnId, ctx.stepId)); + } + + if (!ctx.dispatch.eager) { + for (const call of toolCalls) { + dispatcher.submit(call); + } + } + + const results = await dispatcher.drain(); + + // Close remaining tool-call spans + for (const call of toolCalls) { + const tcSpan = ctx.toolSpans.get(call.id); + if (tcSpan !== undefined) { + const result = results.get(call.id); + try { + tcSpan.end({ + attrs: { + isError: result?.isError ?? false, + contentLength: result?.content.length ?? 0, + }, + }); + } catch { + // Swallow — D7. + } + ctx.toolSpans.delete(call.id); + } + } + + const toolMessages: ChatMessage[] = []; + for (const call of toolCalls) { + const result = results.get(call.id); + if (result !== undefined) { + const isError = result.isError ?? false; + const dispatchTime = toolDispatchTimes.get(call.id); + const toolDurationMs = + ctx.now !== undefined && dispatchTime !== undefined ? ctx.now() - dispatchTime : undefined; + ctx.emit( + toolResultEvent( + ctx.conversationId, + ctx.turnId, + ctx.stepId, + call.id, + call.name, + result.content, + isError, + toolDurationMs, + ), + ); + toolMessages.push({ + role: "tool", + chunks: [ + { + type: "tool-result", + toolCallId: call.id, + toolName: call.name, + content: result.content, + isError, + stepId: ctx.stepId, + }, + ], + }); + } + } + + // Close step span (if not already closed by error) + if (stepSpan !== undefined) { + try { + stepSpan.end({ + attrs: { + finishReason, + ...usageAttrs(stepUsage), + }, + }); + } catch { + // Swallow — D7. + } + } + + const assistantMessage: ChatMessage | undefined = + chunks.length > 0 ? { role: "assistant", chunks } : undefined; + + return { assistantMessage, toolCalls, toolMessages, usage: stepUsage, finishReason }; } export async function runTurn(input: RunTurnInput): Promise<RunTurnResult> { - const messages: ChatMessage[] = [...input.messages]; - const resultMessages: ChatMessage[] = []; - let totalUsage = zeroUsage(); - let lastStepUsage: Usage | undefined; - let finishReason = "stop"; - - const toolMap = new Map<string, ToolContract>(); - for (const tool of input.tools) { - toolMap.set(tool.name, tool); - } - - const conversationId = input.conversationId; - const turnId = input.turnId; - const signal = input.signal ?? new AbortController().signal; - const logger = input.logger; - const now = input.now; - - // Record turn start time for durationMs on done - const turnStartMs = now !== undefined ? now() : undefined; - - // Open a turn span (attrs: conversationId, turnId, model) - let turnSpan: Span | undefined; - if (logger !== undefined) { - try { - turnSpan = logger.span("turn", { - conversationId, - turnId, - model: input.providerOpts?.model ?? input.provider.id, - }); - } catch { - // Swallow — D7. - } - } - - // Track open tool-call spans across steps so we can close them on abort - const toolSpans = new Map<string, Span>(); - - input.emit(turnStartEvent(conversationId, turnId)); - - try { - for (let step = 0; MAX_STEPS === 0 || step < MAX_STEPS; step++) { - if (signal.aborted) { - finishReason = "aborted"; - break; - } - - const stepId = `${turnId}#${step}` as StepId; - - const stepResult = await executeStep({ - provider: input.provider, - messages, - tools: input.tools, - toolMap, - dispatch: input.dispatch, - emit: input.emit, - signal, - conversationId, - turnId, - stepId, - logger: turnSpan?.log ?? logger ?? createNoopLogger(), - turnSpan, - toolSpans, - cwd: input.cwd, - computerId: input.computerId, - now, - providerOpts: input.providerOpts, - retry: input.retry, - }); - - totalUsage = addUsage(totalUsage, stepResult.usage); - lastStepUsage = stepResult.usage; - - // When the signal is aborted mid-step, the tool results are - // placeholders ({ content: "Aborted", isError: true }). If these - // are persisted and included in the next turn's message history, - // the provider sees a `tool` role message without a preceding - // `assistant` message carrying `tool_calls` → 400 error. - // - // To prevent this, when the signal is aborted we: - // 1. Strip tool-call chunks from the assistant message (keep - // text/thinking/error chunks so the partial response is - // preserved). - // 2. Omit tool-result messages entirely (they are not persisted, - // not added to resultMessages, and not passed to onStepComplete). - // - // This keeps the conversation history clean: the assistant's - // partial text is preserved, but no incomplete tool calls are - // left dangling. The `done` event still carries - // `reason: "aborted"`, so the turn seals cleanly. - const stepAborted = signal.aborted; - const assistantMessage = - stepAborted && stepResult.assistantMessage !== undefined - ? stripToolCallChunks(stepResult.assistantMessage) - : stepResult.assistantMessage; - const toolMessages = stepAborted ? [] : stepResult.toolMessages; - - if (assistantMessage !== undefined) { - messages.push(assistantMessage); - resultMessages.push(assistantMessage); - } - - for (const msg of toolMessages) { - messages.push(msg); - resultMessages.push(msg); - } - - // Incremental persistence: notify the caller that this step's - // messages are finalized. The caller can persist them immediately - // (assigning seq numbers during generation). The messages are the - // SAME objects in resultMessages — the caller must NOT double-persist. - if (input.onStepComplete !== undefined) { - const stepMessages: ChatMessage[] = []; - if (assistantMessage !== undefined) { - stepMessages.push(assistantMessage); - } - for (const msg of toolMessages) { - stepMessages.push(msg); - } - if (stepMessages.length > 0) { - await input.onStepComplete(stepMessages); - } - } - - if (stepAborted) { - finishReason = "aborted"; - break; - } - - if (stepResult.toolCalls.length === 0) { - finishReason = stepResult.finishReason; - break; - } - - if (MAX_STEPS > 0 && step === MAX_STEPS - 1) { - finishReason = "max-steps"; - // No next step → no tool-result boundary. Leave any pending - // steering messages for the caller (it owns the queue). - } else { - // Tool-result boundary: this step produced tool calls and we are - // about to call provider.stream again. Drain steering messages - // and append them after the tool results, before the next call. - // The kernel owns no queue and names no feature — it just calls - // the callback and appends. Emits nothing (caller emits the - // `steering` AgentEvent in its own wrapper). - const steering = input.drainSteering?.() ?? []; - for (const msg of steering) { - messages.push(msg); - } - } - } - } finally { - // Close any orphaned tool-call spans (e.g. abort mid-tool) - for (const [id, tcSpan] of toolSpans) { - try { - tcSpan.end({ attrs: { orphaned: true } }); - } catch { - // Swallow — D7. - } - toolSpans.delete(id); - } - - // Close the turn span - if (turnSpan !== undefined) { - try { - turnSpan.end({ - attrs: { - finishReason, - ...usageAttrs(totalUsage), - }, - }); - } catch { - // Swallow — D7. - } - } - } - - const turnDurationMs = - turnStartMs !== undefined && now !== undefined ? now() - turnStartMs : undefined; - const hasUsage = - totalUsage.inputTokens > 0 || - totalUsage.outputTokens > 0 || - totalUsage.cacheReadTokens !== undefined || - totalUsage.cacheWriteTokens !== undefined; - const contextSize = - hasUsage && lastStepUsage !== undefined - ? lastStepUsage.inputTokens + lastStepUsage.outputTokens - : undefined; - input.emit( - doneEvent( - conversationId, - turnId, - finishReason, - turnDurationMs, - hasUsage ? totalUsage : undefined, - contextSize, - ), - ); - - return { messages: resultMessages, usage: totalUsage, finishReason }; + const messages: ChatMessage[] = [...input.messages]; + const resultMessages: ChatMessage[] = []; + let totalUsage = zeroUsage(); + let lastStepUsage: Usage | undefined; + let finishReason = "stop"; + + const toolMap = new Map<string, ToolContract>(); + for (const tool of input.tools) { + toolMap.set(tool.name, tool); + } + + const conversationId = input.conversationId; + const turnId = input.turnId; + const signal = input.signal ?? new AbortController().signal; + const logger = input.logger; + const now = input.now; + + // Record turn start time for durationMs on done + const turnStartMs = now !== undefined ? now() : undefined; + + // Open a turn span (attrs: conversationId, turnId, model) + let turnSpan: Span | undefined; + if (logger !== undefined) { + try { + turnSpan = logger.span("turn", { + conversationId, + turnId, + model: input.providerOpts?.model ?? input.provider.id, + }); + } catch { + // Swallow — D7. + } + } + + // Track open tool-call spans across steps so we can close them on abort + const toolSpans = new Map<string, Span>(); + + input.emit(turnStartEvent(conversationId, turnId)); + + try { + for (let step = 0; MAX_STEPS === 0 || step < MAX_STEPS; step++) { + if (signal.aborted) { + finishReason = "aborted"; + break; + } + + const stepId = `${turnId}#${step}` as StepId; + + const stepResult = await executeStep({ + provider: input.provider, + messages, + tools: input.tools, + toolMap, + dispatch: input.dispatch, + emit: input.emit, + signal, + conversationId, + turnId, + stepId, + logger: turnSpan?.log ?? logger ?? createNoopLogger(), + turnSpan, + toolSpans, + cwd: input.cwd, + computerId: input.computerId, + now, + providerOpts: input.providerOpts, + retry: input.retry, + }); + + totalUsage = addUsage(totalUsage, stepResult.usage); + lastStepUsage = stepResult.usage; + + // When the signal is aborted mid-step, the tool results are + // placeholders ({ content: "Aborted", isError: true }). If these + // are persisted and included in the next turn's message history, + // the provider sees a `tool` role message without a preceding + // `assistant` message carrying `tool_calls` → 400 error. + // + // To prevent this, when the signal is aborted we: + // 1. Strip tool-call chunks from the assistant message (keep + // text/thinking/error chunks so the partial response is + // preserved). + // 2. Omit tool-result messages entirely (they are not persisted, + // not added to resultMessages, and not passed to onStepComplete). + // + // This keeps the conversation history clean: the assistant's + // partial text is preserved, but no incomplete tool calls are + // left dangling. The `done` event still carries + // `reason: "aborted"`, so the turn seals cleanly. + const stepAborted = signal.aborted; + const assistantMessage = + stepAborted && stepResult.assistantMessage !== undefined + ? stripToolCallChunks(stepResult.assistantMessage) + : stepResult.assistantMessage; + const toolMessages = stepAborted ? [] : stepResult.toolMessages; + + if (assistantMessage !== undefined) { + messages.push(assistantMessage); + resultMessages.push(assistantMessage); + } + + for (const msg of toolMessages) { + messages.push(msg); + resultMessages.push(msg); + } + + // Incremental persistence: notify the caller that this step's + // messages are finalized. The caller can persist them immediately + // (assigning seq numbers during generation). The messages are the + // SAME objects in resultMessages — the caller must NOT double-persist. + if (input.onStepComplete !== undefined) { + const stepMessages: ChatMessage[] = []; + if (assistantMessage !== undefined) { + stepMessages.push(assistantMessage); + } + for (const msg of toolMessages) { + stepMessages.push(msg); + } + if (stepMessages.length > 0) { + await input.onStepComplete(stepMessages); + } + } + + if (stepAborted) { + finishReason = "aborted"; + break; + } + + if (stepResult.toolCalls.length === 0) { + finishReason = stepResult.finishReason; + break; + } + + if (MAX_STEPS > 0 && step === MAX_STEPS - 1) { + finishReason = "max-steps"; + // No next step → no tool-result boundary. Leave any pending + // steering messages for the caller (it owns the queue). + } else { + // Tool-result boundary: this step produced tool calls and we are + // about to call provider.stream again. Drain steering messages + // and append them after the tool results, before the next call. + // The kernel owns no queue and names no feature — it just calls + // the callback and appends. Emits nothing (caller emits the + // `steering` AgentEvent in its own wrapper). + const steering = input.drainSteering?.() ?? []; + for (const msg of steering) { + messages.push(msg); + } + } + } + } finally { + // Close any orphaned tool-call spans (e.g. abort mid-tool) + for (const [id, tcSpan] of toolSpans) { + try { + tcSpan.end({ attrs: { orphaned: true } }); + } catch { + // Swallow — D7. + } + toolSpans.delete(id); + } + + // Close the turn span + if (turnSpan !== undefined) { + try { + turnSpan.end({ + attrs: { + finishReason, + ...usageAttrs(totalUsage), + }, + }); + } catch { + // Swallow — D7. + } + } + } + + const turnDurationMs = + turnStartMs !== undefined && now !== undefined ? now() - turnStartMs : undefined; + const hasUsage = + totalUsage.inputTokens > 0 || + totalUsage.outputTokens > 0 || + totalUsage.cacheReadTokens !== undefined || + totalUsage.cacheWriteTokens !== undefined; + const contextSize = + hasUsage && lastStepUsage !== undefined + ? lastStepUsage.inputTokens + lastStepUsage.outputTokens + : undefined; + input.emit( + doneEvent( + conversationId, + turnId, + finishReason, + turnDurationMs, + hasUsage ? totalUsage : undefined, + contextSize, + ), + ); + + return { messages: resultMessages, usage: totalUsage, finishReason }; } function createNoopLogger(): Logger { - return { - debug() {}, - info() {}, - warn() {}, - error() {}, - child() { - return createNoopLogger(); - }, - span() { - return { - id: "noop", - log: createNoopLogger(), - setAttributes() {}, - addLink() {}, - child() { - return this; - }, - end() {}, - }; - }, - }; + return { + debug() {}, + info() {}, + warn() {}, + error() {}, + child() { + return createNoopLogger(); + }, + span() { + return { + id: "noop", + log: createNoopLogger(), + setAttributes() {}, + addLink() {}, + child() { + return this; + }, + end() {}, + }; + }, + }; } |
