diff options
| author | Adam Malczewski <[email protected]> | 2026-06-07 16:07:35 +0900 |
|---|---|---|
| committer | Adam Malczewski <[email protected]> | 2026-06-07 16:07:35 +0900 |
| commit | 904c6d7cc882ea6e092f03f9f487d80b75426440 (patch) | |
| tree | 0b97107a859f8d347071c01a6907c778dd9cd05a /packages/kernel/src/runtime | |
| parent | efddee1edd2924725a4dd240894666ede97b67b9 (diff) | |
| download | dispatch-904c6d7cc882ea6e092f03f9f487d80b75426440.tar.gz dispatch-904c6d7cc882ea6e092f03f9f487d80b75426440.zip | |
feat(wire,kernel,conversation-store): step grouping via stepId for batched tool calls
Expose a per-step grouping key so a client can render a model's batched/parallel
tool calls (those emitted in one step) as one unit, on both the live stream and
replayed history. Key = branded StepId, derived turnId#stepIndex (0-based).
- [email protected]: required stepId on Turn{Tool,ToolResult}Event; optional stepId on
Tool{Call,Result}Chunk (generation provenance on the chunk, not the StoredChunk
envelope — StoredChunk unchanged). [email protected] (re-export bump).
- kernel-runtime: mint stepId per step; stamp on tool chunks + tool events.
- conversation-store: chunk-carried stepId round-trips append/load/loadSince for
free; reconcile copies it onto synthesized (interrupted) results.
- cli: stepId added to event test fixtures (renderer unchanged).
typecheck clean; 509 vitest + 89 bun; biome 0/0. FE courier reply + reference
snapshots regenerated in ../dispatch-web.
Diffstat (limited to 'packages/kernel/src/runtime')
| -rw-r--r-- | packages/kernel/src/runtime/events.ts | 16 | ||||
| -rw-r--r-- | packages/kernel/src/runtime/run-turn.test.ts | 136 | ||||
| -rw-r--r-- | packages/kernel/src/runtime/run-turn.ts | 10 |
3 files changed, 159 insertions, 3 deletions
diff --git a/packages/kernel/src/runtime/events.ts b/packages/kernel/src/runtime/events.ts index a209b00..deeb012 100644 --- a/packages/kernel/src/runtime/events.ts +++ b/packages/kernel/src/runtime/events.ts @@ -1,3 +1,4 @@ +import type { StepId } from "../contracts/conversation.js"; import type { AgentEvent } from "../contracts/events.js"; import type { Usage } from "../contracts/provider.js"; @@ -16,22 +17,33 @@ export function reasoningDeltaEvent( export function toolCallEvent( conversationId: string, turnId: string, + stepId: StepId, toolCallId: string, toolName: string, input: unknown, ): AgentEvent { - return { type: "tool-call", conversationId, turnId, toolCallId, toolName, input }; + return { type: "tool-call", conversationId, turnId, stepId, toolCallId, toolName, input }; } export function toolResultEvent( conversationId: string, turnId: string, + stepId: StepId, toolCallId: string, toolName: string, content: string, isError: boolean, ): AgentEvent { - return { type: "tool-result", conversationId, turnId, toolCallId, toolName, content, isError }; + return { + type: "tool-result", + conversationId, + turnId, + stepId, + toolCallId, + toolName, + content, + isError, + }; } export function toolOutputEvent( diff --git a/packages/kernel/src/runtime/run-turn.test.ts b/packages/kernel/src/runtime/run-turn.test.ts index 488a77e..42a846b 100644 --- a/packages/kernel/src/runtime/run-turn.test.ts +++ b/packages/kernel/src/runtime/run-turn.test.ts @@ -1689,4 +1689,140 @@ describe("runTurn", () => { } }); }); + + describe("stepId", () => { + it("tool-call and tool-result events carry stepId", async () => { + const tool = createFakeTool("echo", async () => ({ content: "echoed" })); + + const provider = createFakeProvider([ + [ + { type: "tool-call", toolCallId: "tc1", toolName: "echo", input: {} }, + { type: "finish", reason: "tool-calls" }, + ], + [ + { type: "text-delta", delta: "done" }, + { type: "finish", reason: "stop" }, + ], + ]); + + const { events, emit } = createCollectingEmit(); + + await runTurn({ + provider, + messages: [userMessage], + tools: [tool], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit, + }); + + const toolCallEvt = events.find((e) => e.type === "tool-call"); + const toolResultEvt = events.find((e) => e.type === "tool-result"); + + expect(toolCallEvt).toBeDefined(); + expect(toolResultEvt).toBeDefined(); + + if (toolCallEvt?.type === "tool-call" && toolResultEvt?.type === "tool-result") { + expect(toolCallEvt.stepId).toBeDefined(); + expect(toolResultEvt.stepId).toBeDefined(); + expect(toolCallEvt.stepId).toBe(toolResultEvt.stepId); + } + }); + + it("tool calls in the SAME step share one stepId; a later step gets a different one", async () => { + const toolA = createFakeTool("a", async () => ({ content: "a-result" })); + const toolB = createFakeTool("b", async () => ({ content: "b-result" })); + + const provider = createFakeProvider([ + [ + { type: "tool-call", toolCallId: "tc1", toolName: "a", input: {} }, + { type: "tool-call", toolCallId: "tc2", toolName: "b", input: {} }, + { type: "finish", reason: "tool-calls" }, + ], + [ + { type: "tool-call", toolCallId: "tc3", toolName: "a", input: {} }, + { type: "finish", reason: "tool-calls" }, + ], + [ + { type: "text-delta", delta: "done" }, + { type: "finish", reason: "stop" }, + ], + ]); + + const { events, emit } = createCollectingEmit(); + + await runTurn({ + provider, + messages: [userMessage], + tools: [toolA, toolB], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit, + }); + + const toolCallEvts = events.filter((e) => e.type === "tool-call"); + expect(toolCallEvts.length).toBeGreaterThanOrEqual(2); + + const step0Calls = toolCallEvts.filter( + (e) => e.type === "tool-call" && (e.toolCallId === "tc1" || e.toolCallId === "tc2"), + ); + const step1Call = toolCallEvts.find((e) => e.type === "tool-call" && e.toolCallId === "tc3"); + + expect(step0Calls).toHaveLength(2); + if (step0Calls[0]?.type === "tool-call" && step0Calls[1]?.type === "tool-call") { + expect(step0Calls[0].stepId).toBe(step0Calls[1].stepId); + } + + if (step1Call?.type === "tool-call" && step0Calls[0]?.type === "tool-call") { + expect(step1Call.stepId).not.toBe(step0Calls[0].stepId); + } + }); + + it("tool chunks in the result carry stepId", async () => { + const tool = createFakeTool("echo", async () => ({ content: "echoed" })); + + const provider = createFakeProvider([ + [ + { type: "tool-call", toolCallId: "tc1", toolName: "echo", input: {} }, + { type: "finish", reason: "tool-calls" }, + ], + [ + { type: "text-delta", delta: "done" }, + { type: "finish", reason: "stop" }, + ], + ]); + + const result = await runTurn({ + provider, + messages: [userMessage], + tools: [tool], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit: () => {}, + }); + + const toolCallMsg = result.messages.find( + (m) => m.role === "assistant" && m.chunks.some((c) => c.type === "tool-call"), + ); + const toolResultMsg = result.messages.find((m) => m.role === "tool"); + + expect(toolCallMsg).toBeDefined(); + expect(toolResultMsg).toBeDefined(); + + const tcChunk = toolCallMsg?.chunks.find((c) => c.type === "tool-call"); + const trChunk = toolResultMsg?.chunks[0]; + + expect(tcChunk?.type).toBe("tool-call"); + expect(trChunk?.type).toBe("tool-result"); + + if (tcChunk?.type === "tool-call" && trChunk?.type === "tool-result") { + expect(tcChunk.stepId).toBeDefined(); + expect(trChunk.stepId).toBeDefined(); + expect(tcChunk.stepId).toBe(trChunk.stepId); + } + }); + }); }); diff --git a/packages/kernel/src/runtime/run-turn.ts b/packages/kernel/src/runtime/run-turn.ts index 1e98351..b722f3f 100644 --- a/packages/kernel/src/runtime/run-turn.ts +++ b/packages/kernel/src/runtime/run-turn.ts @@ -1,4 +1,4 @@ -import type { ChatMessage, Chunk } from "../contracts/conversation.js"; +import type { ChatMessage, Chunk, StepId } from "../contracts/conversation.js"; import type { Logger, Span } from "../contracts/logging.js"; import type { ProviderContract, ProviderEvent, Usage } from "../contracts/provider.js"; import type { EventEmitter, RunTurnInput, RunTurnResult } from "../contracts/runtime.js"; @@ -79,6 +79,7 @@ interface StepContext { readonly signal: AbortSignal; readonly conversationId: string; readonly turnId: string; + readonly stepId: StepId; readonly logger: Logger; readonly turnSpan: Span | undefined; readonly toolSpans: Map<string, Span>; @@ -122,11 +123,13 @@ function processEvent( toolCallId: event.toolCallId, toolName: event.toolName, input: event.input, + stepId: ctx.stepId, }); ctx.emit( toolCallEvent( ctx.conversationId, ctx.turnId, + ctx.stepId, event.toolCallId, event.toolName, event.input, @@ -273,6 +276,7 @@ async function executeStep(ctx: StepContext): Promise<StepResult> { toolResultEvent( ctx.conversationId, ctx.turnId, + ctx.stepId, call.id, call.name, result.content, @@ -288,6 +292,7 @@ async function executeStep(ctx: StepContext): Promise<StepResult> { toolName: call.name, content: result.content, isError, + stepId: ctx.stepId, }, ], }); @@ -357,6 +362,8 @@ export async function runTurn(input: RunTurnInput): Promise<RunTurnResult> { break; } + const stepId = `${turnId}#${step}` as StepId; + const stepResult = await executeStep({ provider: input.provider, messages, @@ -367,6 +374,7 @@ export async function runTurn(input: RunTurnInput): Promise<RunTurnResult> { signal, conversationId, turnId, + stepId, logger: turnSpan?.log ?? logger ?? createNoopLogger(), turnSpan, toolSpans, |
