summaryrefslogtreecommitdiffhomepage
path: root/packages/kernel/src/runtime
diff options
context:
space:
mode:
authorAdam Malczewski <[email protected]>2026-06-06 21:29:52 +0900
committerAdam Malczewski <[email protected]>2026-06-06 21:29:52 +0900
commit2c5bc242a8a99e3b863c247f70b26f5883333677 (patch)
treece938fbef47d8dab44bfe5cce68ebfac7a91ca8a /packages/kernel/src/runtime
parentfedf9c2695476e9ee6f95776b0244acfc37f022f (diff)
downloaddispatch-2c5bc242a8a99e3b863c247f70b26f5883333677.tar.gz
dispatch-2c5bc242a8a99e3b863c247f70b26f5883333677.zip
feat(kernel-runtime,session-orchestrator): emit turn lifecycle events
Close a gap found live: neither transport emitted turn-start/done/turn-sealed (the wire defined them; nothing fired them). turn-sealed is the FE's cache-commit signal (frontend-design §6.3); done ends the stream. - kernel-runtime: runTurn emits turn-start first and done (with finishReason) last, on every exit path (stop/tool-calls/max-steps/error/aborted). - session-orchestrator: emits turn-sealed after conversationStore.append succeeds (the kernel touches no DB, so the post-persist seal is the orchestrator's). Not emitted if append throws. No contract change (all three wire types already existed). Verified live: HTTP /chat and WS chat both stream turn-start … done turn-sealed. typecheck clean, 494 vitest + 80 bun, biome clean.
Diffstat (limited to 'packages/kernel/src/runtime')
-rw-r--r--packages/kernel/src/runtime/events.ts8
-rw-r--r--packages/kernel/src/runtime/run-turn.test.ts209
-rw-r--r--packages/kernel/src/runtime/run-turn.ts6
3 files changed, 222 insertions, 1 deletions
diff --git a/packages/kernel/src/runtime/events.ts b/packages/kernel/src/runtime/events.ts
index 2a92008..a209b00 100644
--- a/packages/kernel/src/runtime/events.ts
+++ b/packages/kernel/src/runtime/events.ts
@@ -48,6 +48,14 @@ export function usageEvent(conversationId: string, turnId: string, usage: Usage)
return { type: "usage", conversationId, turnId, usage };
}
+export function turnStartEvent(conversationId: string, turnId: string): AgentEvent {
+ return { type: "turn-start", conversationId, turnId };
+}
+
+export function doneEvent(conversationId: string, turnId: string, reason: string): AgentEvent {
+ return { type: "done", conversationId, turnId, reason };
+}
+
export function errorEvent(
conversationId: string,
turnId: string,
diff --git a/packages/kernel/src/runtime/run-turn.test.ts b/packages/kernel/src/runtime/run-turn.test.ts
index 48a6fbc..488a77e 100644
--- a/packages/kernel/src/runtime/run-turn.test.ts
+++ b/packages/kernel/src/runtime/run-turn.test.ts
@@ -119,7 +119,14 @@ describe("runTurn", () => {
expect(result.usage).toEqual({ inputTokens: 10, outputTokens: 5 });
const eventTypes = events.map((e) => e.type);
- expect(eventTypes).toEqual(["text-delta", "text-delta", "reasoning-delta", "usage"]);
+ expect(eventTypes).toEqual([
+ "turn-start",
+ "text-delta",
+ "text-delta",
+ "reasoning-delta",
+ "usage",
+ "done",
+ ]);
});
it("turn with one tool call executes tool, feeds result back, then finishes", async () => {
@@ -1482,4 +1489,204 @@ describe("runTurn", () => {
}
});
});
+
+ describe("lifecycle events", () => {
+ it("emits turn-start as the first event with conversation + turn ids", async () => {
+ const provider = createFakeProvider([
+ [
+ { type: "text-delta", delta: "hi" },
+ { type: "finish", reason: "stop" },
+ ],
+ ]);
+
+ const { events, emit } = createCollectingEmit();
+
+ await runTurn({
+ provider,
+ messages: [userMessage],
+ tools: [],
+ dispatch: { maxConcurrent: 1, eager: false },
+ conversationId: "conv-42",
+ turnId: "turn-99",
+ emit,
+ });
+
+ expect(events[0]?.type).toBe("turn-start");
+ if (events[0]?.type === "turn-start") {
+ expect(events[0].conversationId).toBe("conv-42");
+ expect(events[0].turnId).toBe("turn-99");
+ }
+ });
+
+ it("emits a single done event last, carrying the finishReason", async () => {
+ const provider = createFakeProvider([
+ [
+ { type: "text-delta", delta: "Hello" },
+ { type: "usage", usage: { inputTokens: 5, outputTokens: 3 } },
+ { type: "finish", reason: "stop" },
+ ],
+ ]);
+
+ const { events, emit } = createCollectingEmit();
+
+ const result = await runTurn({
+ provider,
+ messages: [userMessage],
+ tools: [],
+ dispatch: { maxConcurrent: 1, eager: false },
+ conversationId: "conv-1",
+ turnId: "turn-1",
+ emit,
+ });
+
+ const lastEvent = events[events.length - 1];
+ expect(lastEvent?.type).toBe("done");
+ if (lastEvent?.type === "done") {
+ expect(lastEvent.reason).toBe(result.finishReason);
+ expect(lastEvent.conversationId).toBe("conv-1");
+ expect(lastEvent.turnId).toBe("turn-1");
+ }
+
+ const doneEvents = events.filter((e) => e.type === "done");
+ expect(doneEvents).toHaveLength(1);
+ });
+
+ it("emits done after a tool-call turn", async () => {
+ const tool = createFakeTool("echo", async (input) => ({
+ content: `echo: ${JSON.stringify(input)}`,
+ }));
+
+ const provider = createFakeProvider([
+ [
+ { type: "tool-call", toolCallId: "tc1", toolName: "echo", input: { x: 1 } },
+ { type: "finish", reason: "tool-calls" },
+ ],
+ [
+ { type: "text-delta", delta: "done" },
+ { type: "finish", reason: "stop" },
+ ],
+ ]);
+
+ const { events, emit } = createCollectingEmit();
+
+ const result = await runTurn({
+ provider,
+ messages: [userMessage],
+ tools: [tool],
+ dispatch: { maxConcurrent: 1, eager: false },
+ conversationId: "conv-1",
+ turnId: "turn-1",
+ emit,
+ });
+
+ const lastEvent = events[events.length - 1];
+ expect(lastEvent?.type).toBe("done");
+ if (lastEvent?.type === "done") {
+ expect(lastEvent.reason).toBe(result.finishReason);
+ }
+ });
+
+ it('still emits done with reason "aborted" when the turn is aborted via signal', async () => {
+ const ac = new AbortController();
+ ac.abort();
+
+ const provider = createFakeProvider([
+ [
+ { type: "text-delta", delta: "should not appear" },
+ { type: "finish", reason: "stop" },
+ ],
+ ]);
+
+ const { events, emit } = createCollectingEmit();
+
+ const result = await runTurn({
+ provider,
+ messages: [userMessage],
+ tools: [],
+ dispatch: { maxConcurrent: 1, eager: false },
+ conversationId: "conv-1",
+ turnId: "turn-1",
+ emit,
+ signal: ac.signal,
+ });
+
+ expect(result.finishReason).toBe("aborted");
+
+ const lastEvent = events[events.length - 1];
+ expect(lastEvent?.type).toBe("done");
+ if (lastEvent?.type === "done") {
+ expect(lastEvent.reason).toBe("aborted");
+ }
+ });
+
+ it('still emits done with reason "error" when the provider errors', async () => {
+ const provider: ProviderContract = {
+ id: "fake",
+ stream() {
+ return (async function* () {
+ yield { type: "text-delta", delta: "partial" } as ProviderEvent;
+ throw new Error("provider crashed");
+ })();
+ },
+ };
+
+ const { events, emit } = createCollectingEmit();
+
+ const result = await runTurn({
+ provider,
+ messages: [userMessage],
+ tools: [],
+ dispatch: { maxConcurrent: 1, eager: false },
+ conversationId: "conv-1",
+ turnId: "turn-1",
+ emit,
+ });
+
+ expect(result.finishReason).toBe("error");
+
+ const lastEvent = events[events.length - 1];
+ expect(lastEvent?.type).toBe("done");
+ if (lastEvent?.type === "done") {
+ expect(lastEvent.reason).toBe("error");
+ }
+ });
+
+ it("turn-start precedes every delta and done follows every delta", async () => {
+ const provider = createFakeProvider([
+ [
+ { type: "text-delta", delta: "Hello" },
+ { type: "reasoning-delta", delta: "thinking..." },
+ { type: "text-delta", delta: " world" },
+ { type: "usage", usage: { inputTokens: 5, outputTokens: 3 } },
+ { type: "finish", reason: "stop" },
+ ],
+ ]);
+
+ const { events, emit } = createCollectingEmit();
+
+ await runTurn({
+ provider,
+ messages: [userMessage],
+ tools: [],
+ dispatch: { maxConcurrent: 1, eager: false },
+ conversationId: "conv-1",
+ turnId: "turn-1",
+ emit,
+ });
+
+ const turnStartIdx = events.findIndex((e) => e.type === "turn-start");
+ const doneIdx = events.findIndex((e) => e.type === "done");
+
+ expect(turnStartIdx).toBe(0);
+ expect(doneIdx).toBe(events.length - 1);
+
+ for (let i = 0; i < events.length; i++) {
+ const e = events[i];
+ if (e?.type === "text-delta" || e?.type === "reasoning-delta") {
+ expect(i).toBeGreaterThan(turnStartIdx);
+ expect(i).toBeLessThan(doneIdx);
+ }
+ }
+ });
+ });
});
diff --git a/packages/kernel/src/runtime/run-turn.ts b/packages/kernel/src/runtime/run-turn.ts
index 6b07e24..1e98351 100644
--- a/packages/kernel/src/runtime/run-turn.ts
+++ b/packages/kernel/src/runtime/run-turn.ts
@@ -5,11 +5,13 @@ import type { EventEmitter, RunTurnInput, RunTurnResult } from "../contracts/run
import type { ToolCall, ToolContract } from "../contracts/tool.js";
import { createStepDispatcher, type StepDispatcher } from "./dispatch.js";
import {
+ doneEvent,
errorEvent,
reasoningDeltaEvent,
textDeltaEvent,
toolCallEvent,
toolResultEvent,
+ turnStartEvent,
usageEvent,
} from "./events.js";
@@ -346,6 +348,8 @@ export async function runTurn(input: RunTurnInput): Promise<RunTurnResult> {
// Track open tool-call spans across steps so we can close them on abort
const toolSpans = new Map<string, Span>();
+ input.emit(turnStartEvent(conversationId, turnId));
+
try {
for (let step = 0; step < MAX_STEPS; step++) {
if (signal.aborted) {
@@ -422,6 +426,8 @@ export async function runTurn(input: RunTurnInput): Promise<RunTurnResult> {
}
}
+ input.emit(doneEvent(conversationId, turnId, finishReason));
+
return { messages: resultMessages, usage: totalUsage, finishReason };
}