summaryrefslogtreecommitdiffhomepage
path: root/packages/kernel/src/runtime
diff options
context:
space:
mode:
authorAdam Malczewski <[email protected]>2026-06-07 16:07:35 +0900
committerAdam Malczewski <[email protected]>2026-06-07 16:07:35 +0900
commit904c6d7cc882ea6e092f03f9f487d80b75426440 (patch)
tree0b97107a859f8d347071c01a6907c778dd9cd05a /packages/kernel/src/runtime
parentefddee1edd2924725a4dd240894666ede97b67b9 (diff)
downloaddispatch-904c6d7cc882ea6e092f03f9f487d80b75426440.tar.gz
dispatch-904c6d7cc882ea6e092f03f9f487d80b75426440.zip
feat(wire,kernel,conversation-store): step grouping via stepId for batched tool calls
Expose a per-step grouping key so a client can render a model's batched/parallel tool calls (those emitted in one step) as one unit, on both the live stream and replayed history. Key = branded StepId, derived turnId#stepIndex (0-based). - [email protected]: required stepId on Turn{Tool,ToolResult}Event; optional stepId on Tool{Call,Result}Chunk (generation provenance on the chunk, not the StoredChunk envelope — StoredChunk unchanged). [email protected] (re-export bump). - kernel-runtime: mint stepId per step; stamp on tool chunks + tool events. - conversation-store: chunk-carried stepId round-trips append/load/loadSince for free; reconcile copies it onto synthesized (interrupted) results. - cli: stepId added to event test fixtures (renderer unchanged). typecheck clean; 509 vitest + 89 bun; biome 0/0. FE courier reply + reference snapshots regenerated in ../dispatch-web.
Diffstat (limited to 'packages/kernel/src/runtime')
-rw-r--r--packages/kernel/src/runtime/events.ts16
-rw-r--r--packages/kernel/src/runtime/run-turn.test.ts136
-rw-r--r--packages/kernel/src/runtime/run-turn.ts10
3 files changed, 159 insertions, 3 deletions
diff --git a/packages/kernel/src/runtime/events.ts b/packages/kernel/src/runtime/events.ts
index a209b00..deeb012 100644
--- a/packages/kernel/src/runtime/events.ts
+++ b/packages/kernel/src/runtime/events.ts
@@ -1,3 +1,4 @@
+import type { StepId } from "../contracts/conversation.js";
import type { AgentEvent } from "../contracts/events.js";
import type { Usage } from "../contracts/provider.js";
@@ -16,22 +17,33 @@ export function reasoningDeltaEvent(
export function toolCallEvent(
conversationId: string,
turnId: string,
+ stepId: StepId,
toolCallId: string,
toolName: string,
input: unknown,
): AgentEvent {
- return { type: "tool-call", conversationId, turnId, toolCallId, toolName, input };
+ return { type: "tool-call", conversationId, turnId, stepId, toolCallId, toolName, input };
}
export function toolResultEvent(
conversationId: string,
turnId: string,
+ stepId: StepId,
toolCallId: string,
toolName: string,
content: string,
isError: boolean,
): AgentEvent {
- return { type: "tool-result", conversationId, turnId, toolCallId, toolName, content, isError };
+ return {
+ type: "tool-result",
+ conversationId,
+ turnId,
+ stepId,
+ toolCallId,
+ toolName,
+ content,
+ isError,
+ };
}
export function toolOutputEvent(
diff --git a/packages/kernel/src/runtime/run-turn.test.ts b/packages/kernel/src/runtime/run-turn.test.ts
index 488a77e..42a846b 100644
--- a/packages/kernel/src/runtime/run-turn.test.ts
+++ b/packages/kernel/src/runtime/run-turn.test.ts
@@ -1689,4 +1689,140 @@ describe("runTurn", () => {
}
});
});
+
+ describe("stepId", () => {
+ it("tool-call and tool-result events carry stepId", async () => {
+ const tool = createFakeTool("echo", async () => ({ content: "echoed" }));
+
+ const provider = createFakeProvider([
+ [
+ { type: "tool-call", toolCallId: "tc1", toolName: "echo", input: {} },
+ { type: "finish", reason: "tool-calls" },
+ ],
+ [
+ { type: "text-delta", delta: "done" },
+ { type: "finish", reason: "stop" },
+ ],
+ ]);
+
+ const { events, emit } = createCollectingEmit();
+
+ await runTurn({
+ provider,
+ messages: [userMessage],
+ tools: [tool],
+ dispatch: { maxConcurrent: 1, eager: false },
+ conversationId: "conv-1",
+ turnId: "turn-1",
+ emit,
+ });
+
+ const toolCallEvt = events.find((e) => e.type === "tool-call");
+ const toolResultEvt = events.find((e) => e.type === "tool-result");
+
+ expect(toolCallEvt).toBeDefined();
+ expect(toolResultEvt).toBeDefined();
+
+ if (toolCallEvt?.type === "tool-call" && toolResultEvt?.type === "tool-result") {
+ expect(toolCallEvt.stepId).toBeDefined();
+ expect(toolResultEvt.stepId).toBeDefined();
+ expect(toolCallEvt.stepId).toBe(toolResultEvt.stepId);
+ }
+ });
+
+ it("tool calls in the SAME step share one stepId; a later step gets a different one", async () => {
+ const toolA = createFakeTool("a", async () => ({ content: "a-result" }));
+ const toolB = createFakeTool("b", async () => ({ content: "b-result" }));
+
+ const provider = createFakeProvider([
+ [
+ { type: "tool-call", toolCallId: "tc1", toolName: "a", input: {} },
+ { type: "tool-call", toolCallId: "tc2", toolName: "b", input: {} },
+ { type: "finish", reason: "tool-calls" },
+ ],
+ [
+ { type: "tool-call", toolCallId: "tc3", toolName: "a", input: {} },
+ { type: "finish", reason: "tool-calls" },
+ ],
+ [
+ { type: "text-delta", delta: "done" },
+ { type: "finish", reason: "stop" },
+ ],
+ ]);
+
+ const { events, emit } = createCollectingEmit();
+
+ await runTurn({
+ provider,
+ messages: [userMessage],
+ tools: [toolA, toolB],
+ dispatch: { maxConcurrent: 1, eager: false },
+ conversationId: "conv-1",
+ turnId: "turn-1",
+ emit,
+ });
+
+ const toolCallEvts = events.filter((e) => e.type === "tool-call");
+ expect(toolCallEvts.length).toBeGreaterThanOrEqual(2);
+
+ const step0Calls = toolCallEvts.filter(
+ (e) => e.type === "tool-call" && (e.toolCallId === "tc1" || e.toolCallId === "tc2"),
+ );
+ const step1Call = toolCallEvts.find((e) => e.type === "tool-call" && e.toolCallId === "tc3");
+
+ expect(step0Calls).toHaveLength(2);
+ if (step0Calls[0]?.type === "tool-call" && step0Calls[1]?.type === "tool-call") {
+ expect(step0Calls[0].stepId).toBe(step0Calls[1].stepId);
+ }
+
+ if (step1Call?.type === "tool-call" && step0Calls[0]?.type === "tool-call") {
+ expect(step1Call.stepId).not.toBe(step0Calls[0].stepId);
+ }
+ });
+
+ it("tool chunks in the result carry stepId", async () => {
+ const tool = createFakeTool("echo", async () => ({ content: "echoed" }));
+
+ const provider = createFakeProvider([
+ [
+ { type: "tool-call", toolCallId: "tc1", toolName: "echo", input: {} },
+ { type: "finish", reason: "tool-calls" },
+ ],
+ [
+ { type: "text-delta", delta: "done" },
+ { type: "finish", reason: "stop" },
+ ],
+ ]);
+
+ const result = await runTurn({
+ provider,
+ messages: [userMessage],
+ tools: [tool],
+ dispatch: { maxConcurrent: 1, eager: false },
+ conversationId: "conv-1",
+ turnId: "turn-1",
+ emit: () => {},
+ });
+
+ const toolCallMsg = result.messages.find(
+ (m) => m.role === "assistant" && m.chunks.some((c) => c.type === "tool-call"),
+ );
+ const toolResultMsg = result.messages.find((m) => m.role === "tool");
+
+ expect(toolCallMsg).toBeDefined();
+ expect(toolResultMsg).toBeDefined();
+
+ const tcChunk = toolCallMsg?.chunks.find((c) => c.type === "tool-call");
+ const trChunk = toolResultMsg?.chunks[0];
+
+ expect(tcChunk?.type).toBe("tool-call");
+ expect(trChunk?.type).toBe("tool-result");
+
+ if (tcChunk?.type === "tool-call" && trChunk?.type === "tool-result") {
+ expect(tcChunk.stepId).toBeDefined();
+ expect(trChunk.stepId).toBeDefined();
+ expect(tcChunk.stepId).toBe(trChunk.stepId);
+ }
+ });
+ });
});
diff --git a/packages/kernel/src/runtime/run-turn.ts b/packages/kernel/src/runtime/run-turn.ts
index 1e98351..b722f3f 100644
--- a/packages/kernel/src/runtime/run-turn.ts
+++ b/packages/kernel/src/runtime/run-turn.ts
@@ -1,4 +1,4 @@
-import type { ChatMessage, Chunk } from "../contracts/conversation.js";
+import type { ChatMessage, Chunk, StepId } from "../contracts/conversation.js";
import type { Logger, Span } from "../contracts/logging.js";
import type { ProviderContract, ProviderEvent, Usage } from "../contracts/provider.js";
import type { EventEmitter, RunTurnInput, RunTurnResult } from "../contracts/runtime.js";
@@ -79,6 +79,7 @@ interface StepContext {
readonly signal: AbortSignal;
readonly conversationId: string;
readonly turnId: string;
+ readonly stepId: StepId;
readonly logger: Logger;
readonly turnSpan: Span | undefined;
readonly toolSpans: Map<string, Span>;
@@ -122,11 +123,13 @@ function processEvent(
toolCallId: event.toolCallId,
toolName: event.toolName,
input: event.input,
+ stepId: ctx.stepId,
});
ctx.emit(
toolCallEvent(
ctx.conversationId,
ctx.turnId,
+ ctx.stepId,
event.toolCallId,
event.toolName,
event.input,
@@ -273,6 +276,7 @@ async function executeStep(ctx: StepContext): Promise<StepResult> {
toolResultEvent(
ctx.conversationId,
ctx.turnId,
+ ctx.stepId,
call.id,
call.name,
result.content,
@@ -288,6 +292,7 @@ async function executeStep(ctx: StepContext): Promise<StepResult> {
toolName: call.name,
content: result.content,
isError,
+ stepId: ctx.stepId,
},
],
});
@@ -357,6 +362,8 @@ export async function runTurn(input: RunTurnInput): Promise<RunTurnResult> {
break;
}
+ const stepId = `${turnId}#${step}` as StepId;
+
const stepResult = await executeStep({
provider: input.provider,
messages,
@@ -367,6 +374,7 @@ export async function runTurn(input: RunTurnInput): Promise<RunTurnResult> {
signal,
conversationId,
turnId,
+ stepId,
logger: turnSpan?.log ?? logger ?? createNoopLogger(),
turnSpan,
toolSpans,