diff options
| author | Adam Malczewski <[email protected]> | 2026-06-06 21:29:52 +0900 |
|---|---|---|
| committer | Adam Malczewski <[email protected]> | 2026-06-06 21:29:52 +0900 |
| commit | 2c5bc242a8a99e3b863c247f70b26f5883333677 (patch) | |
| tree | ce938fbef47d8dab44bfe5cce68ebfac7a91ca8a /packages/kernel/src/runtime | |
| parent | fedf9c2695476e9ee6f95776b0244acfc37f022f (diff) | |
| download | dispatch-2c5bc242a8a99e3b863c247f70b26f5883333677.tar.gz dispatch-2c5bc242a8a99e3b863c247f70b26f5883333677.zip | |
feat(kernel-runtime,session-orchestrator): emit turn lifecycle events
Close a gap found live: neither transport emitted turn-start/done/turn-sealed
(the wire defined them; nothing fired them). turn-sealed is the FE's
cache-commit signal (frontend-design §6.3); done ends the stream.
- kernel-runtime: runTurn emits turn-start first and done (with finishReason)
last, on every exit path (stop/tool-calls/max-steps/error/aborted).
- session-orchestrator: emits turn-sealed after conversationStore.append
succeeds (the kernel touches no DB, so the post-persist seal is the
orchestrator's). Not emitted if append throws.
No contract change (all three wire types already existed). Verified live: HTTP
/chat and WS chat both stream turn-start … done turn-sealed.
typecheck clean, 494 vitest + 80 bun, biome clean.
Diffstat (limited to 'packages/kernel/src/runtime')
| -rw-r--r-- | packages/kernel/src/runtime/events.ts | 8 | ||||
| -rw-r--r-- | packages/kernel/src/runtime/run-turn.test.ts | 209 | ||||
| -rw-r--r-- | packages/kernel/src/runtime/run-turn.ts | 6 |
3 files changed, 222 insertions, 1 deletions
diff --git a/packages/kernel/src/runtime/events.ts b/packages/kernel/src/runtime/events.ts index 2a92008..a209b00 100644 --- a/packages/kernel/src/runtime/events.ts +++ b/packages/kernel/src/runtime/events.ts @@ -48,6 +48,14 @@ export function usageEvent(conversationId: string, turnId: string, usage: Usage) return { type: "usage", conversationId, turnId, usage }; } +export function turnStartEvent(conversationId: string, turnId: string): AgentEvent { + return { type: "turn-start", conversationId, turnId }; +} + +export function doneEvent(conversationId: string, turnId: string, reason: string): AgentEvent { + return { type: "done", conversationId, turnId, reason }; +} + export function errorEvent( conversationId: string, turnId: string, diff --git a/packages/kernel/src/runtime/run-turn.test.ts b/packages/kernel/src/runtime/run-turn.test.ts index 48a6fbc..488a77e 100644 --- a/packages/kernel/src/runtime/run-turn.test.ts +++ b/packages/kernel/src/runtime/run-turn.test.ts @@ -119,7 +119,14 @@ describe("runTurn", () => { expect(result.usage).toEqual({ inputTokens: 10, outputTokens: 5 }); const eventTypes = events.map((e) => e.type); - expect(eventTypes).toEqual(["text-delta", "text-delta", "reasoning-delta", "usage"]); + expect(eventTypes).toEqual([ + "turn-start", + "text-delta", + "text-delta", + "reasoning-delta", + "usage", + "done", + ]); }); it("turn with one tool call executes tool, feeds result back, then finishes", async () => { @@ -1482,4 +1489,204 @@ describe("runTurn", () => { } }); }); + + describe("lifecycle events", () => { + it("emits turn-start as the first event with conversation + turn ids", async () => { + const provider = createFakeProvider([ + [ + { type: "text-delta", delta: "hi" }, + { type: "finish", reason: "stop" }, + ], + ]); + + const { events, emit } = createCollectingEmit(); + + await runTurn({ + provider, + messages: [userMessage], + tools: [], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-42", + turnId: "turn-99", + emit, + }); + + expect(events[0]?.type).toBe("turn-start"); + if (events[0]?.type === "turn-start") { + expect(events[0].conversationId).toBe("conv-42"); + expect(events[0].turnId).toBe("turn-99"); + } + }); + + it("emits a single done event last, carrying the finishReason", async () => { + const provider = createFakeProvider([ + [ + { type: "text-delta", delta: "Hello" }, + { type: "usage", usage: { inputTokens: 5, outputTokens: 3 } }, + { type: "finish", reason: "stop" }, + ], + ]); + + const { events, emit } = createCollectingEmit(); + + const result = await runTurn({ + provider, + messages: [userMessage], + tools: [], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit, + }); + + const lastEvent = events[events.length - 1]; + expect(lastEvent?.type).toBe("done"); + if (lastEvent?.type === "done") { + expect(lastEvent.reason).toBe(result.finishReason); + expect(lastEvent.conversationId).toBe("conv-1"); + expect(lastEvent.turnId).toBe("turn-1"); + } + + const doneEvents = events.filter((e) => e.type === "done"); + expect(doneEvents).toHaveLength(1); + }); + + it("emits done after a tool-call turn", async () => { + const tool = createFakeTool("echo", async (input) => ({ + content: `echo: ${JSON.stringify(input)}`, + })); + + const provider = createFakeProvider([ + [ + { type: "tool-call", toolCallId: "tc1", toolName: "echo", input: { x: 1 } }, + { type: "finish", reason: "tool-calls" }, + ], + [ + { type: "text-delta", delta: "done" }, + { type: "finish", reason: "stop" }, + ], + ]); + + const { events, emit } = createCollectingEmit(); + + const result = await runTurn({ + provider, + messages: [userMessage], + tools: [tool], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit, + }); + + const lastEvent = events[events.length - 1]; + expect(lastEvent?.type).toBe("done"); + if (lastEvent?.type === "done") { + expect(lastEvent.reason).toBe(result.finishReason); + } + }); + + it('still emits done with reason "aborted" when the turn is aborted via signal', async () => { + const ac = new AbortController(); + ac.abort(); + + const provider = createFakeProvider([ + [ + { type: "text-delta", delta: "should not appear" }, + { type: "finish", reason: "stop" }, + ], + ]); + + const { events, emit } = createCollectingEmit(); + + const result = await runTurn({ + provider, + messages: [userMessage], + tools: [], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit, + signal: ac.signal, + }); + + expect(result.finishReason).toBe("aborted"); + + const lastEvent = events[events.length - 1]; + expect(lastEvent?.type).toBe("done"); + if (lastEvent?.type === "done") { + expect(lastEvent.reason).toBe("aborted"); + } + }); + + it('still emits done with reason "error" when the provider errors', async () => { + const provider: ProviderContract = { + id: "fake", + stream() { + return (async function* () { + yield { type: "text-delta", delta: "partial" } as ProviderEvent; + throw new Error("provider crashed"); + })(); + }, + }; + + const { events, emit } = createCollectingEmit(); + + const result = await runTurn({ + provider, + messages: [userMessage], + tools: [], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit, + }); + + expect(result.finishReason).toBe("error"); + + const lastEvent = events[events.length - 1]; + expect(lastEvent?.type).toBe("done"); + if (lastEvent?.type === "done") { + expect(lastEvent.reason).toBe("error"); + } + }); + + it("turn-start precedes every delta and done follows every delta", async () => { + const provider = createFakeProvider([ + [ + { type: "text-delta", delta: "Hello" }, + { type: "reasoning-delta", delta: "thinking..." }, + { type: "text-delta", delta: " world" }, + { type: "usage", usage: { inputTokens: 5, outputTokens: 3 } }, + { type: "finish", reason: "stop" }, + ], + ]); + + const { events, emit } = createCollectingEmit(); + + await runTurn({ + provider, + messages: [userMessage], + tools: [], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit, + }); + + const turnStartIdx = events.findIndex((e) => e.type === "turn-start"); + const doneIdx = events.findIndex((e) => e.type === "done"); + + expect(turnStartIdx).toBe(0); + expect(doneIdx).toBe(events.length - 1); + + for (let i = 0; i < events.length; i++) { + const e = events[i]; + if (e?.type === "text-delta" || e?.type === "reasoning-delta") { + expect(i).toBeGreaterThan(turnStartIdx); + expect(i).toBeLessThan(doneIdx); + } + } + }); + }); }); diff --git a/packages/kernel/src/runtime/run-turn.ts b/packages/kernel/src/runtime/run-turn.ts index 6b07e24..1e98351 100644 --- a/packages/kernel/src/runtime/run-turn.ts +++ b/packages/kernel/src/runtime/run-turn.ts @@ -5,11 +5,13 @@ import type { EventEmitter, RunTurnInput, RunTurnResult } from "../contracts/run import type { ToolCall, ToolContract } from "../contracts/tool.js"; import { createStepDispatcher, type StepDispatcher } from "./dispatch.js"; import { + doneEvent, errorEvent, reasoningDeltaEvent, textDeltaEvent, toolCallEvent, toolResultEvent, + turnStartEvent, usageEvent, } from "./events.js"; @@ -346,6 +348,8 @@ export async function runTurn(input: RunTurnInput): Promise<RunTurnResult> { // Track open tool-call spans across steps so we can close them on abort const toolSpans = new Map<string, Span>(); + input.emit(turnStartEvent(conversationId, turnId)); + try { for (let step = 0; step < MAX_STEPS; step++) { if (signal.aborted) { @@ -422,6 +426,8 @@ export async function runTurn(input: RunTurnInput): Promise<RunTurnResult> { } } + input.emit(doneEvent(conversationId, turnId, finishReason)); + return { messages: resultMessages, usage: totalUsage, finishReason }; } |
