summaryrefslogtreecommitdiffhomepage
path: root/packages/kernel/src/runtime
diff options
context:
space:
mode:
authorAdam Malczewski <[email protected]>2026-06-07 17:46:53 +0900
committerAdam Malczewski <[email protected]>2026-06-07 17:46:53 +0900
commit7c459c7d919d1e08a228e8abc56129be174d8abe (patch)
tree93011125c001945723ac9b9358c4ddd450f87f72 /packages/kernel/src/runtime
parent5746cf4e545cd5b0d7faf0595554f273f236f3a9 (diff)
downloaddispatch-7c459c7d919d1e08a228e8abc56129be174d8abe.tar.gz
dispatch-7c459c7d919d1e08a228e8abc56129be174d8abe.zip
feat(wire,kernel,session-orchestrator): live turn metrics on the stream
Expose the backend's authoritative token+timing metrics on the live AgentEvent stream (observability-only -> now also client-facing). All additive/optional. - [email protected]: new TurnStepCompleteEvent (type:step-complete) with per-step ttftMs/decodeMs/genTotalMs; usage += stepId; tool-result += durationMs (exec); done += durationMs (turn wall-clock) + usage (turn total). RunTurnInput += now?. [email protected] (re-export bump). - kernel-runtime: when now injected, measures + emits the above (reuses the ttft/decode first-token detection); omits timing gracefully without a clock. - session-orchestrator: adds now? to deps, threads into RunTurnInput; extension activate injects () => Date.now(). - transport/cli/host-bin: untouched (verbatim pass-through; additive fields). FE handoff: frontend-metrics-handoff.md. typecheck clean; 520 vitest + 89 bun; biome 0/0. Replay/persistence = deferred Pass 2 (documented in tasks.md).
Diffstat (limited to 'packages/kernel/src/runtime')
-rw-r--r--packages/kernel/src/runtime/events.ts80
-rw-r--r--packages/kernel/src/runtime/run-turn.test.ts330
-rw-r--r--packages/kernel/src/runtime/run-turn.ts65
3 files changed, 468 insertions, 7 deletions
diff --git a/packages/kernel/src/runtime/events.ts b/packages/kernel/src/runtime/events.ts
index deeb012..300e711 100644
--- a/packages/kernel/src/runtime/events.ts
+++ b/packages/kernel/src/runtime/events.ts
@@ -33,9 +33,10 @@ export function toolResultEvent(
toolName: string,
content: string,
isError: boolean,
+ durationMs?: number,
): AgentEvent {
- return {
- type: "tool-result",
+ const base = {
+ type: "tool-result" as const,
conversationId,
turnId,
stepId,
@@ -44,6 +45,10 @@ export function toolResultEvent(
content,
isError,
};
+ if (durationMs !== undefined) {
+ return { ...base, durationMs };
+ }
+ return base;
}
export function toolOutputEvent(
@@ -56,7 +61,15 @@ export function toolOutputEvent(
return { type: "tool-output", conversationId, turnId, toolCallId, data, stream };
}
-export function usageEvent(conversationId: string, turnId: string, usage: Usage): AgentEvent {
+export function usageEvent(
+ conversationId: string,
+ turnId: string,
+ usage: Usage,
+ stepId?: StepId,
+): AgentEvent {
+ if (stepId !== undefined) {
+ return { type: "usage", conversationId, turnId, usage, stepId };
+ }
return { type: "usage", conversationId, turnId, usage };
}
@@ -64,7 +77,66 @@ export function turnStartEvent(conversationId: string, turnId: string): AgentEve
return { type: "turn-start", conversationId, turnId };
}
-export function doneEvent(conversationId: string, turnId: string, reason: string): AgentEvent {
+export function stepCompleteEvent(
+ conversationId: string,
+ turnId: string,
+ stepId: StepId,
+ timing?: { ttftMs?: number; decodeMs?: number; genTotalMs?: number },
+): AgentEvent {
+ if (timing !== undefined) {
+ if (timing.ttftMs !== undefined) {
+ if (timing.decodeMs !== undefined && timing.genTotalMs !== undefined) {
+ return {
+ type: "step-complete",
+ conversationId,
+ turnId,
+ stepId,
+ ttftMs: timing.ttftMs,
+ decodeMs: timing.decodeMs,
+ genTotalMs: timing.genTotalMs,
+ };
+ }
+ if (timing.genTotalMs !== undefined) {
+ return {
+ type: "step-complete",
+ conversationId,
+ turnId,
+ stepId,
+ ttftMs: timing.ttftMs,
+ genTotalMs: timing.genTotalMs,
+ };
+ }
+ return { type: "step-complete", conversationId, turnId, stepId, ttftMs: timing.ttftMs };
+ }
+ if (timing.genTotalMs !== undefined) {
+ return {
+ type: "step-complete",
+ conversationId,
+ turnId,
+ stepId,
+ genTotalMs: timing.genTotalMs,
+ };
+ }
+ }
+ return { type: "step-complete", conversationId, turnId, stepId };
+}
+
+export function doneEvent(
+ conversationId: string,
+ turnId: string,
+ reason: string,
+ durationMs?: number,
+ usage?: Usage,
+): AgentEvent {
+ if (durationMs !== undefined && usage !== undefined) {
+ return { type: "done", conversationId, turnId, reason, durationMs, usage };
+ }
+ if (durationMs !== undefined) {
+ return { type: "done", conversationId, turnId, reason, durationMs };
+ }
+ if (usage !== undefined) {
+ return { type: "done", conversationId, turnId, reason, usage };
+ }
return { type: "done", conversationId, turnId, reason };
}
diff --git a/packages/kernel/src/runtime/run-turn.test.ts b/packages/kernel/src/runtime/run-turn.test.ts
index 089f65b..ce654d5 100644
--- a/packages/kernel/src/runtime/run-turn.test.ts
+++ b/packages/kernel/src/runtime/run-turn.test.ts
@@ -125,6 +125,7 @@ describe("runTurn", () => {
"text-delta",
"reasoning-delta",
"usage",
+ "step-complete",
"done",
]);
});
@@ -1970,4 +1971,333 @@ describe("runTurn", () => {
}
});
});
+
+ describe("timing events (now provided)", () => {
+ function createCounterNow(): { now: () => number; tick: (ms: number) => void } {
+ let current = 0;
+ return {
+ now: () => current,
+ tick: (ms: number) => {
+ current += ms;
+ },
+ };
+ }
+
+ it("emits step-complete per step with timing when now provided", async () => {
+ const clock = createCounterNow();
+ clock.tick(100); // turn starts at 100
+
+ const { events, emit } = createCollectingEmit();
+
+ // Advance clock during stream: first token at +50ms, stream ends at +200ms
+ let streamCallCount = 0;
+ const wrappedProvider: ProviderContract = {
+ id: "fake",
+ stream(_messages, _tools) {
+ const idx = streamCallCount++;
+ return (async function* () {
+ if (idx === 0) {
+ clock.tick(50); // stream starts
+ yield { type: "text-delta", delta: "Hello" } as ProviderEvent;
+ // first token seen at 150 (100+50)
+ clock.tick(100);
+ yield { type: "text-delta", delta: " world" } as ProviderEvent;
+ clock.tick(50);
+ yield {
+ type: "usage",
+ usage: { inputTokens: 10, outputTokens: 5 },
+ } as ProviderEvent;
+ yield { type: "finish", reason: "stop" } as ProviderEvent;
+ }
+ })();
+ },
+ };
+
+ await runTurn({
+ provider: wrappedProvider,
+ messages: [userMessage],
+ tools: [],
+ dispatch: { maxConcurrent: 1, eager: false },
+ conversationId: "conv-1",
+ turnId: "turn-1",
+ emit,
+ now: clock.now,
+ });
+
+ const stepCompleteEvts = events.filter((e) => e.type === "step-complete");
+ expect(stepCompleteEvts).toHaveLength(1);
+
+ const sc = stepCompleteEvts[0];
+ if (sc?.type === "step-complete") {
+ expect(sc.conversationId).toBe("conv-1");
+ expect(sc.turnId).toBe("turn-1");
+ expect(sc.stepId).toBeDefined();
+ expect(sc.genTotalMs).toBe(200); // 50+100+50
+ expect(sc.ttftMs).toBe(50); // stream start → first text-delta
+ expect(sc.decodeMs).toBe(150); // first token → stream end
+ const ttft = sc.ttftMs;
+ const decode = sc.decodeMs;
+ const genTotal = sc.genTotalMs;
+ if (ttft !== undefined && decode !== undefined && genTotal !== undefined) {
+ expect(genTotal).toBe(ttft + decode);
+ }
+ }
+ });
+
+ it("step-complete omits ttft/decode but keeps genTotalMs for a no-content step", async () => {
+ const clock = createCounterNow();
+ clock.tick(100); // turn starts at 100
+
+ const tool = createFakeTool("echo", async () => ({ content: "echoed" }));
+
+ let streamCallCount = 0;
+ const wrappedProvider: ProviderContract = {
+ id: "fake",
+ stream(_messages, _tools) {
+ const idx = streamCallCount++;
+ return (async function* () {
+ if (idx === 0) {
+ clock.tick(80); // stream starts at 180
+ yield {
+ type: "tool-call",
+ toolCallId: "tc1",
+ toolName: "echo",
+ input: {},
+ } as ProviderEvent;
+ clock.tick(20);
+ yield { type: "finish", reason: "tool-calls" } as ProviderEvent;
+ } else {
+ clock.tick(50);
+ yield { type: "text-delta", delta: "done" } as ProviderEvent;
+ clock.tick(50);
+ yield { type: "finish", reason: "stop" } as ProviderEvent;
+ }
+ })();
+ },
+ };
+
+ const { events, emit } = createCollectingEmit();
+
+ await runTurn({
+ provider: wrappedProvider,
+ messages: [userMessage],
+ tools: [tool],
+ dispatch: { maxConcurrent: 1, eager: false },
+ conversationId: "conv-1",
+ turnId: "turn-1",
+ emit,
+ now: clock.now,
+ });
+
+ const stepCompleteEvts = events.filter((e) => e.type === "step-complete");
+ expect(stepCompleteEvts).toHaveLength(2);
+
+ // First step: tool-call-only, no content token
+ const sc0 = stepCompleteEvts[0];
+ if (sc0?.type === "step-complete") {
+ expect(sc0.stepId).toBeDefined();
+ expect(sc0.genTotalMs).toBe(100); // 80+20
+ expect(sc0.ttftMs).toBeUndefined();
+ expect(sc0.decodeMs).toBeUndefined();
+ }
+
+ // Second step: has text-delta
+ const sc1 = stepCompleteEvts[1];
+ if (sc1?.type === "step-complete") {
+ expect(sc1.stepId).toBeDefined();
+ expect(sc1.genTotalMs).toBe(100); // 50+50
+ expect(sc1.ttftMs).toBe(50);
+ expect(sc1.decodeMs).toBe(50);
+ }
+ });
+
+ it("usage event carries stepId", async () => {
+ const provider = createFakeProvider([
+ [
+ { type: "text-delta", delta: "hi" },
+ { type: "usage", usage: { inputTokens: 5, outputTokens: 3 } },
+ { type: "finish", reason: "stop" },
+ ],
+ ]);
+
+ const { events, emit } = createCollectingEmit();
+
+ await runTurn({
+ provider,
+ messages: [userMessage],
+ tools: [],
+ dispatch: { maxConcurrent: 1, eager: false },
+ conversationId: "conv-1",
+ turnId: "turn-1",
+ emit,
+ });
+
+ const usageEvts = events.filter((e) => e.type === "usage");
+ expect(usageEvts).toHaveLength(1);
+ const ue = usageEvts[0];
+ if (ue?.type === "usage") {
+ expect(ue.stepId).toBeDefined();
+ }
+ });
+
+ it("tool-result carries durationMs (execution time) when now provided", async () => {
+ const clock = createCounterNow();
+ clock.tick(100); // turn starts at 100
+
+ const tool = createFakeTool("slow", async () => {
+ clock.tick(200); // tool takes 200ms to execute
+ return { content: "done" };
+ });
+
+ const provider = createFakeProvider([
+ [
+ { type: "tool-call", toolCallId: "tc1", toolName: "slow", input: {} },
+ { type: "finish", reason: "tool-calls" },
+ ],
+ [
+ { type: "text-delta", delta: "ok" },
+ { type: "finish", reason: "stop" },
+ ],
+ ]);
+
+ const { events, emit } = createCollectingEmit();
+
+ await runTurn({
+ provider,
+ messages: [userMessage],
+ tools: [tool],
+ dispatch: { maxConcurrent: 1, eager: false },
+ conversationId: "conv-1",
+ turnId: "turn-1",
+ emit,
+ now: clock.now,
+ });
+
+ const toolResultEvts = events.filter((e) => e.type === "tool-result");
+ expect(toolResultEvts).toHaveLength(1);
+ const tr = toolResultEvts[0];
+ if (tr?.type === "tool-result") {
+ expect(tr.durationMs).toBeDefined();
+ expect(tr.durationMs).toBe(200);
+ }
+ });
+
+ it("done carries durationMs and aggregate usage when now provided", async () => {
+ const clock = createCounterNow();
+ clock.tick(100); // turn starts at 100
+
+ const wrappedProvider: ProviderContract = {
+ id: "fake",
+ stream(_messages, _tools) {
+ return (async function* () {
+ clock.tick(80); // stream duration
+ yield { type: "text-delta", delta: "hi" } as ProviderEvent;
+ yield {
+ type: "usage",
+ usage: { inputTokens: 10, outputTokens: 5 },
+ } as ProviderEvent;
+ yield { type: "finish", reason: "stop" } as ProviderEvent;
+ })();
+ },
+ };
+
+ const { events, emit } = createCollectingEmit();
+
+ await runTurn({
+ provider: wrappedProvider,
+ messages: [userMessage],
+ tools: [],
+ dispatch: { maxConcurrent: 1, eager: false },
+ conversationId: "conv-1",
+ turnId: "turn-1",
+ emit,
+ now: clock.now,
+ });
+
+ const doneEvts = events.filter((e) => e.type === "done");
+ expect(doneEvts).toHaveLength(1);
+ const d = doneEvts[0];
+ if (d?.type === "done") {
+ expect(d.durationMs).toBeDefined();
+ expect(d.durationMs).toBeGreaterThan(0);
+ expect(d.usage).toBeDefined();
+ if (d.usage !== undefined) {
+ expect(d.usage.inputTokens).toBe(10);
+ expect(d.usage.outputTokens).toBe(5);
+ }
+ }
+ });
+
+ it("no now → timing fields absent", async () => {
+ const tool = createFakeTool("echo", async () => ({ content: "echoed" }));
+
+ const provider = createFakeProvider([
+ [
+ { type: "text-delta", delta: "hi" },
+ { type: "usage", usage: { inputTokens: 5, outputTokens: 3 } },
+ { type: "tool-call", toolCallId: "tc1", toolName: "echo", input: {} },
+ { type: "finish", reason: "tool-calls" },
+ ],
+ [
+ { type: "text-delta", delta: "done" },
+ { type: "usage", usage: { inputTokens: 10, outputTokens: 5 } },
+ { type: "finish", reason: "stop" },
+ ],
+ ]);
+
+ const { events, emit } = createCollectingEmit();
+
+ await runTurn({
+ provider,
+ messages: [userMessage],
+ tools: [tool],
+ dispatch: { maxConcurrent: 1, eager: false },
+ conversationId: "conv-1",
+ turnId: "turn-1",
+ emit,
+ // no now
+ });
+
+ // step-complete still emitted (with stepId, no timing)
+ const stepCompleteEvts = events.filter((e) => e.type === "step-complete");
+ expect(stepCompleteEvts).toHaveLength(2);
+ for (const sc of stepCompleteEvts) {
+ if (sc?.type === "step-complete") {
+ expect(sc.stepId).toBeDefined();
+ expect(sc.ttftMs).toBeUndefined();
+ expect(sc.decodeMs).toBeUndefined();
+ expect(sc.genTotalMs).toBeUndefined();
+ }
+ }
+
+ // usage still carries stepId
+ const usageEvts = events.filter((e) => e.type === "usage");
+ for (const ue of usageEvts) {
+ if (ue?.type === "usage") {
+ expect(ue.stepId).toBeDefined();
+ }
+ }
+
+ // no durationMs on tool-result
+ const toolResultEvts = events.filter((e) => e.type === "tool-result");
+ for (const tr of toolResultEvts) {
+ if (tr?.type === "tool-result") {
+ expect(tr.durationMs).toBeUndefined();
+ }
+ }
+
+ // no durationMs on done, but usage is present (independent of now)
+ const doneEvts = events.filter((e) => e.type === "done");
+ expect(doneEvts).toHaveLength(1);
+ const d = doneEvts[0];
+ if (d?.type === "done") {
+ expect(d.durationMs).toBeUndefined();
+ expect(d.usage).toBeDefined();
+ if (d.usage !== undefined) {
+ expect(d.usage.inputTokens).toBe(15);
+ expect(d.usage.outputTokens).toBe(8);
+ }
+ }
+ });
+ });
});
diff --git a/packages/kernel/src/runtime/run-turn.ts b/packages/kernel/src/runtime/run-turn.ts
index a8ee6c9..06069a2 100644
--- a/packages/kernel/src/runtime/run-turn.ts
+++ b/packages/kernel/src/runtime/run-turn.ts
@@ -8,6 +8,7 @@ import {
doneEvent,
errorEvent,
reasoningDeltaEvent,
+ stepCompleteEvent,
textDeltaEvent,
toolCallEvent,
toolResultEvent,
@@ -84,12 +85,15 @@ interface StepContext {
readonly turnSpan: Span | undefined;
readonly toolSpans: Map<string, Span>;
readonly cwd: string | undefined;
+ readonly now: (() => number) | undefined;
}
interface TimingState {
ttftSpan: Span | undefined;
decodeSpan: Span | undefined;
firstTokenSeen: boolean;
+ streamStartMs: number | undefined;
+ firstTokenMs: number | undefined;
}
interface StepResult {
@@ -108,11 +112,15 @@ function processEvent(
ctx: StepContext,
stepSpan: Span | undefined,
timing: TimingState,
+ toolDispatchTimes: Map<string, number>,
): void {
switch (event.type) {
case "text-delta":
if (!timing.firstTokenSeen) {
timing.firstTokenSeen = true;
+ if (ctx.now !== undefined) {
+ timing.firstTokenMs = ctx.now();
+ }
try {
timing.ttftSpan?.end({ attrs: { firstToken: true } });
} catch {
@@ -131,6 +139,9 @@ function processEvent(
case "reasoning-delta":
if (!timing.firstTokenSeen) {
timing.firstTokenSeen = true;
+ if (ctx.now !== undefined) {
+ timing.firstTokenMs = ctx.now();
+ }
try {
timing.ttftSpan?.end({ attrs: { firstToken: true } });
} catch {
@@ -171,6 +182,11 @@ function processEvent(
),
);
+ // Capture dispatch time for tool-call durationMs
+ if (ctx.now !== undefined) {
+ toolDispatchTimes.set(event.toolCallId, ctx.now());
+ }
+
// Open a tool-call span as a child of the step span (attrs: name, toolCallId)
try {
const tcSpan =
@@ -194,7 +210,7 @@ function processEvent(
break;
}
case "usage":
- ctx.emit(usageEvent(ctx.conversationId, ctx.turnId, event.usage));
+ ctx.emit(usageEvent(ctx.conversationId, ctx.turnId, event.usage, ctx.stepId));
break;
case "finish":
break;
@@ -212,6 +228,7 @@ function processEvent(
async function executeStep(ctx: StepContext): Promise<StepResult> {
const chunks: Chunk[] = [];
const toolCalls: ToolCall[] = [];
+ const toolDispatchTimes = new Map<string, number>();
let stepUsage = zeroUsage();
let finishReason = "stop";
@@ -250,6 +267,8 @@ async function executeStep(ctx: StepContext): Promise<StepResult> {
ttftSpan: undefined,
decodeSpan: undefined,
firstTokenSeen: false,
+ streamStartMs: ctx.now !== undefined ? ctx.now() : undefined,
+ firstTokenMs: undefined,
};
// Open TTFT span when spans are enabled
@@ -268,7 +287,7 @@ async function executeStep(ctx: StepContext): Promise<StepResult> {
const stream = ctx.provider.stream(ctx.messages, ctx.tools, opts);
for await (const event of stream) {
if (ctx.signal.aborted) break;
- processEvent(event, chunks, toolCalls, dispatcher, ctx, stepSpan, timing);
+ processEvent(event, chunks, toolCalls, dispatcher, ctx, stepSpan, timing, toolDispatchTimes);
if (event.type === "usage") {
stepUsage = addUsage(stepUsage, event.usage);
}
@@ -305,6 +324,22 @@ async function executeStep(ctx: StepContext): Promise<StepResult> {
// Swallow — D7.
}
+ // Emit step-complete event with timing
+ const streamEndMs = ctx.now !== undefined ? ctx.now() : undefined;
+ if (timing.streamStartMs !== undefined && streamEndMs !== undefined) {
+ const genTotalMs = streamEndMs - timing.streamStartMs;
+ const stepTiming: { ttftMs?: number; decodeMs?: number; genTotalMs?: number } = {
+ genTotalMs,
+ };
+ if (timing.firstTokenMs !== undefined) {
+ stepTiming.ttftMs = timing.firstTokenMs - timing.streamStartMs;
+ stepTiming.decodeMs = streamEndMs - timing.firstTokenMs;
+ }
+ ctx.emit(stepCompleteEvent(ctx.conversationId, ctx.turnId, ctx.stepId, stepTiming));
+ } else {
+ ctx.emit(stepCompleteEvent(ctx.conversationId, ctx.turnId, ctx.stepId));
+ }
+
if (!ctx.dispatch.eager) {
for (const call of toolCalls) {
dispatcher.submit(call);
@@ -337,6 +372,9 @@ async function executeStep(ctx: StepContext): Promise<StepResult> {
const result = results.get(call.id);
if (result !== undefined) {
const isError = result.isError ?? false;
+ const dispatchTime = toolDispatchTimes.get(call.id);
+ const toolDurationMs =
+ ctx.now !== undefined && dispatchTime !== undefined ? ctx.now() - dispatchTime : undefined;
ctx.emit(
toolResultEvent(
ctx.conversationId,
@@ -346,6 +384,7 @@ async function executeStep(ctx: StepContext): Promise<StepResult> {
call.name,
result.content,
isError,
+ toolDurationMs,
),
);
toolMessages.push({
@@ -400,6 +439,10 @@ export async function runTurn(input: RunTurnInput): Promise<RunTurnResult> {
const turnId = input.turnId;
const signal = input.signal ?? new AbortController().signal;
const logger = input.logger;
+ const now = input.now;
+
+ // Record turn start time for durationMs on done
+ const turnStartMs = now !== undefined ? now() : undefined;
// Open a turn span (attrs: conversationId, turnId, model)
let turnSpan: Span | undefined;
@@ -444,6 +487,7 @@ export async function runTurn(input: RunTurnInput): Promise<RunTurnResult> {
turnSpan,
toolSpans,
cwd: input.cwd,
+ now,
});
totalUsage = addUsage(totalUsage, stepResult.usage);
@@ -499,7 +543,22 @@ export async function runTurn(input: RunTurnInput): Promise<RunTurnResult> {
}
}
- input.emit(doneEvent(conversationId, turnId, finishReason));
+ const turnDurationMs =
+ turnStartMs !== undefined && now !== undefined ? now() - turnStartMs : undefined;
+ const hasUsage =
+ totalUsage.inputTokens > 0 ||
+ totalUsage.outputTokens > 0 ||
+ totalUsage.cacheReadTokens !== undefined ||
+ totalUsage.cacheWriteTokens !== undefined;
+ input.emit(
+ doneEvent(
+ conversationId,
+ turnId,
+ finishReason,
+ turnDurationMs,
+ hasUsage ? totalUsage : undefined,
+ ),
+ );
return { messages: resultMessages, usage: totalUsage, finishReason };
}