diff options
Diffstat (limited to 'src/core')
| -rw-r--r-- | src/core/metrics/format.test.ts | 199 | ||||
| -rw-r--r-- | src/core/metrics/format.ts | 69 | ||||
| -rw-r--r-- | src/core/metrics/index.ts | 17 | ||||
| -rw-r--r-- | src/core/metrics/place.test.ts | 469 | ||||
| -rw-r--r-- | src/core/metrics/place.ts | 151 | ||||
| -rw-r--r-- | src/core/metrics/reducer.test.ts | 368 | ||||
| -rw-r--r-- | src/core/metrics/reducer.ts | 239 | ||||
| -rw-r--r-- | src/core/metrics/types.ts | 68 |
8 files changed, 1580 insertions, 0 deletions
diff --git a/src/core/metrics/format.test.ts b/src/core/metrics/format.test.ts new file mode 100644 index 0000000..9881e50 --- /dev/null +++ b/src/core/metrics/format.test.ts @@ -0,0 +1,199 @@ +import type { StepId, StepMetrics, TurnMetrics } from "@dispatch/wire"; +import { describe, expect, it } from "vitest"; +import { computeTps, viewStepMetrics, viewTurnMetrics } from "./format"; + +describe("computeTps", () => { + it("null when elapsed missing", () => { + expect(computeTps(100, undefined)).toBeNull(); + }); + + it("null when elapsed is zero", () => { + expect(computeTps(100, 0)).toBeNull(); + }); + + it("null when elapsed is negative", () => { + expect(computeTps(100, -100)).toBeNull(); + }); + + it("computes tokens per second", () => { + expect(computeTps(1000, 2000)).toBe(500); + }); + + it("computes fractional tps", () => { + expect(computeTps(100, 3000)).toBeCloseTo(33.33, 1); + }); +}); + +describe("viewStepMetrics", () => { + it("formats tokens with thousands separator, tps, and durations", () => { + const step: StepMetrics = { + stepId: "s1" as StepId, + usage: { inputTokens: 1234, outputTokens: 567 }, + ttftMs: 820, + decodeMs: 1200, + genTotalMs: 2020, + }; + const view = viewStepMetrics(step, 0); + expect(view.label).toBe("step 1"); + expect(view.tokensLabel).toBe("1,801 tok"); + expect(view.tps).toBe("473 tok/s"); + expect(view.ttft).toBe("820ms"); + expect(view.decode).toBe("1.2s"); + expect(view.genTotal).toBe("2.0s"); + }); + + it("handles missing timing fields", () => { + const step: StepMetrics = { + stepId: "s1" as StepId, + usage: { inputTokens: 100, outputTokens: 50 }, + }; + const view = viewStepMetrics(step, 0); + expect(view.tps).toBeNull(); + expect(view.ttft).toBeNull(); + expect(view.decode).toBeNull(); + expect(view.genTotal).toBeNull(); + }); + + it("formats duration < 1s as ms", () => { + const step: StepMetrics = { + stepId: "s1" as StepId, + usage: { inputTokens: 10, outputTokens: 5 }, + ttftMs: 42, + }; + const view = 
viewStepMetrics(step, 0); + expect(view.ttft).toBe("42ms"); + }); + + it("formats duration >= 1s as seconds", () => { + const step: StepMetrics = { + stepId: "s1" as StepId, + usage: { inputTokens: 10, outputTokens: 5 }, + genTotalMs: 3200, + }; + const view = viewStepMetrics(step, 0); + expect(view.genTotal).toBe("3.2s"); + }); + + it("uses step index for label", () => { + const step: StepMetrics = { + stepId: "s1" as StepId, + usage: { inputTokens: 10, outputTokens: 5 }, + }; + expect(viewStepMetrics(step, 2).label).toBe("step 3"); + }); + + it("tps uses decodeMs (not genTotalMs)", () => { + const step: StepMetrics = { + stepId: "s1" as StepId, + usage: { inputTokens: 100, outputTokens: 50 }, + decodeMs: 500, + genTotalMs: 800, + }; + const view = viewStepMetrics(step, 0); + // 50 / (500/1000) = 100 tok/s, NOT 50/(800/1000)=62.5 + expect(view.tps).toBe("100 tok/s"); + }); + + it("tps falls back to genTotalMs when decodeMs absent", () => { + const step: StepMetrics = { + stepId: "s1" as StepId, + usage: { inputTokens: 100, outputTokens: 50 }, + genTotalMs: 800, + }; + const view = viewStepMetrics(step, 0); + // 50 / (800/1000) = 62.5 → rounds to 63 + expect(view.tps).toBe("63 tok/s"); + }); +}); + +describe("viewTurnMetrics", () => { + it("formats total tokens and breakdown", () => { + const turn: TurnMetrics = { + turnId: "t1", + usage: { inputTokens: 1000, outputTokens: 234 }, + durationMs: 5000, + steps: [ + { + stepId: "s1" as StepId, + usage: { inputTokens: 1000, outputTokens: 234 }, + decodeMs: 3000, + genTotalMs: 4000, + }, + ], + }; + const view = viewTurnMetrics(turn); + expect(view.tokensLabel).toBe("1,234 tok"); + expect(view.breakdown).toBe("1,000 in / 234 out"); + expect(view.tps).toBe("78 tok/s"); + expect(view.duration).toBe("5.0s"); + }); + + it("breakdown includes cache only when present", () => { + const turn: TurnMetrics = { + turnId: "t1", + usage: { inputTokens: 1000, outputTokens: 234, cacheReadTokens: 500 }, + steps: [], + }; + const view = 
viewTurnMetrics(turn); + expect(view.breakdown).toBe("1,000 in / 234 out / 500 cache"); + }); + + it("breakdown omits cache when not present", () => { + const turn: TurnMetrics = { + turnId: "t1", + usage: { inputTokens: 100, outputTokens: 50 }, + steps: [], + }; + const view = viewTurnMetrics(turn); + expect(view.breakdown).toBe("100 in / 50 out"); + }); + + it("tps is null when no step has decodeMs or genTotalMs", () => { + const turn: TurnMetrics = { + turnId: "t1", + usage: { inputTokens: 100, outputTokens: 50 }, + steps: [ + { + stepId: "s1" as StepId, + usage: { inputTokens: 100, outputTokens: 50 }, + }, + ], + }; + const view = viewTurnMetrics(turn); + expect(view.tps).toBeNull(); + }); + + it("duration is null when durationMs absent", () => { + const turn: TurnMetrics = { + turnId: "t1", + usage: { inputTokens: 100, outputTokens: 50 }, + steps: [], + }; + const view = viewTurnMetrics(turn); + expect(view.duration).toBeNull(); + }); + + it("sums decodeMs across steps (fallback genTotalMs per step) for tps", () => { + const turn: TurnMetrics = { + turnId: "t1", + usage: { inputTokens: 300, outputTokens: 150 }, + steps: [ + { + stepId: "s1" as StepId, + usage: { inputTokens: 100, outputTokens: 50 }, + decodeMs: 800, + genTotalMs: 1000, + }, + { + stepId: "s2" as StepId, + usage: { inputTokens: 200, outputTokens: 100 }, + genTotalMs: 2000, + }, + ], + }; + const view = viewTurnMetrics(turn); + // step1 uses decodeMs=800, step2 falls back to genTotalMs=2000 → total=2800ms + // 150 / (2800/1000) = 53.57 → rounds to 54 + expect(view.tps).toBe("54 tok/s"); + }); +}); diff --git a/src/core/metrics/format.ts b/src/core/metrics/format.ts new file mode 100644 index 0000000..3a4078c --- /dev/null +++ b/src/core/metrics/format.ts @@ -0,0 +1,69 @@ +import type { StepMetrics, TurnMetrics, Usage } from "@dispatch/wire"; +import type { StepMetricsView, TurnMetricsView } from "./types"; + +function formatTokens(n: number): string { + return n.toLocaleString("en-US"); +} + 
+function formatDuration(ms: number | undefined): string | null { + if (ms === undefined || ms <= 0) return null; + if (ms < 1000) return `${Math.round(ms)}ms`; + return `${(ms / 1000).toFixed(1)}s`; +} + +function formatTps(tps: number | null): string | null { + if (tps === null) return null; + if (tps < 10) return `${tps.toFixed(1)} tok/s`; + return `${Math.round(tps)} tok/s`; +} + +/** Compute tokens-per-second. Returns null when elapsed time is absent or zero. */ +export function computeTps(outputTokens: number, elapsedMs: number | undefined): number | null { + if (elapsedMs === undefined || elapsedMs <= 0) return null; + return outputTokens / (elapsedMs / 1000); +} + +function totalTokens(u: Usage): number { + return u.inputTokens + u.outputTokens; +} + +function formatBreakdown(u: Usage): string { + let s = `${formatTokens(u.inputTokens)} in / ${formatTokens(u.outputTokens)} out`; + if (u.cacheReadTokens !== undefined && u.cacheReadTokens > 0) { + s += ` / ${formatTokens(u.cacheReadTokens)} cache`; + } + return s; +} + +/** Build a formatted view of a single step's metrics. */ +export function viewStepMetrics(step: StepMetrics, index: number): StepMetricsView { + const total = totalTokens(step.usage); + const tps = computeTps(step.usage.outputTokens, step.decodeMs ?? step.genTotalMs); + return { + label: `step ${index + 1}`, + tokensLabel: `${formatTokens(total)} tok`, + tps: formatTps(tps), + ttft: formatDuration(step.ttftMs), + decode: formatDuration(step.decodeMs), + genTotal: formatDuration(step.genTotalMs), + }; +} + +/** Build a formatted view of a turn's aggregate metrics. */ +export function viewTurnMetrics(turn: TurnMetrics): TurnMetricsView { + const total = totalTokens(turn.usage); + let totalGenMs: number | undefined; + for (const step of turn.steps) { + const stepMs = step.decodeMs ?? step.genTotalMs; + if (stepMs !== undefined) { + totalGenMs = (totalGenMs ?? 
0) + stepMs; + } + } + const tps = computeTps(turn.usage.outputTokens, totalGenMs); + return { + tokensLabel: `${formatTokens(total)} tok`, + breakdown: formatBreakdown(turn.usage), + tps: formatTps(tps), + duration: formatDuration(turn.durationMs), + }; +} diff --git a/src/core/metrics/index.ts b/src/core/metrics/index.ts new file mode 100644 index 0000000..72d825d --- /dev/null +++ b/src/core/metrics/index.ts @@ -0,0 +1,17 @@ +export { computeTps, viewStepMetrics, viewTurnMetrics } from "./format"; +export { interleaveTurnMetrics } from "./place"; +export { + applyDurableMetrics, + foldMetricsEvent, + initialMetricsState, + selectOrderedTurnMetrics, +} from "./reducer"; +export type { + MetricsRow, + MetricsState, + StepMetrics, + StepMetricsView, + TurnMetrics, + TurnMetricsEntry, + TurnMetricsView, +} from "./types"; diff --git a/src/core/metrics/place.test.ts b/src/core/metrics/place.test.ts new file mode 100644 index 0000000..b6cb877 --- /dev/null +++ b/src/core/metrics/place.test.ts @@ -0,0 +1,469 @@ +import type { StepId, StepMetrics, TurnMetrics } from "@dispatch/wire"; +import { describe, expect, it } from "vitest"; +import type { RenderGroup } from "../chunks"; +import { interleaveTurnMetrics } from "./place"; +import type { TurnMetricsEntry } from "./types"; + +function userGroup(seq: number, text: string): RenderGroup { + return { + kind: "single", + chunk: { + seq, + role: "user", + chunk: { type: "text", text }, + provisional: false, + }, + }; +} + +function assistantGroup(seq: number, text: string): RenderGroup { + return { + kind: "single", + chunk: { + seq, + role: "assistant", + chunk: { type: "text", text }, + provisional: false, + }, + }; +} + +function toolCallGroup(seq: number, stepId: string, toolCallId: string): RenderGroup { + return { + kind: "single", + chunk: { + seq, + role: "assistant", + chunk: { + type: "tool-call", + toolCallId, + toolName: "test", + input: {}, + stepId: stepId as StepId, + }, + provisional: false, + }, + }; +} + 
+function toolResultGroup(seq: number, stepId: string, toolCallId: string): RenderGroup { + return { + kind: "single", + chunk: { + seq, + role: "tool", + chunk: { + type: "tool-result", + toolCallId, + toolName: "test", + content: "", + isError: false, + stepId: stepId as StepId, + }, + provisional: false, + }, + }; +} + +function toolBatchGroup(stepId: string, toolCallIds: string[]): RenderGroup { + return { + kind: "tool-batch", + stepId, + entries: toolCallIds.map((id) => ({ + call: { + type: "tool-call" as const, + toolCallId: id, + toolName: "test", + input: {}, + stepId: stepId as StepId, + }, + result: null, + })), + provisional: false, + }; +} + +function makeStep(stepId: string, inputTokens: number, outputTokens: number): StepMetrics { + return { + stepId: stepId as StepId, + usage: { inputTokens, outputTokens }, + }; +} + +function makeTurn( + turnId: string, + inputTokens: number, + outputTokens: number, + steps: StepMetrics[] = [], +): TurnMetrics { + return { + turnId, + usage: { inputTokens, outputTokens }, + steps, + }; +} + +function makeEntry( + turnId: string, + inputTokens: number, + outputTokens: number, + steps: StepMetrics[] = [], +): TurnMetricsEntry { + return { + turnId, + steps, + total: makeTurn(turnId, inputTokens, outputTokens, steps), + }; +} + +function makeProgressiveEntry(turnId: string, steps: StepMetrics[]): TurnMetricsEntry { + return { + turnId, + steps, + total: null, + }; +} + +function expectGroupAt( + rows: readonly { readonly kind: string }[], + index: number, + expected: RenderGroup, +): void { + const row = rows[index]; + expect(row?.kind).toBe("group"); + expect((row as { readonly group: RenderGroup } | undefined)?.group).toBe(expected); +} + +function expectStepMetricsAt( + rows: readonly { readonly kind: string }[], + index: number, + expectedStepId: string, + expectedIndex: number, +): void { + const row = rows[index]; + expect(row?.kind).toBe("step-metrics"); + const sm = row as { readonly step: StepMetrics; 
readonly index: number } | undefined; + expect(sm?.step.stepId).toBe(expectedStepId); + expect(sm?.index).toBe(expectedIndex); +} + +function expectTurnMetricsAt( + rows: readonly { readonly kind: string }[], + index: number, + expectedTurnId: string, +): void { + const row = rows[index]; + expect(row?.kind).toBe("turn-metrics"); + expect((row as { readonly turn: TurnMetrics } | undefined)?.turn.turnId).toBe(expectedTurnId); +} + +describe("interleaveTurnMetrics", () => { + it("no metrics: rows are all groups, unchanged order", () => { + const g1 = userGroup(1, "q"); + const g2 = assistantGroup(2, "a"); + const rows = interleaveTurnMetrics([g1, g2], []); + expect(rows).toHaveLength(2); + expectGroupAt(rows, 0, g1); + expectGroupAt(rows, 1, g2); + }); + + it("head-aligned: segment i gets entries[i]", () => { + const g1 = userGroup(1, "q1"); + const g2 = assistantGroup(2, "a1"); + const g3 = userGroup(3, "q2"); + const g4 = assistantGroup(4, "a2"); + const step1 = makeStep("s1", 100, 50); + const step2 = makeStep("s2", 200, 80); + const rows = interleaveTurnMetrics( + [g1, g2, g3, g4], + [makeEntry("t1", 100, 50, [step1]), makeEntry("t2", 200, 80, [step2])], + ); + + expect(rows).toHaveLength(8); + expectGroupAt(rows, 0, g1); + expectGroupAt(rows, 1, g2); + expectStepMetricsAt(rows, 2, "s1", 0); + expectTurnMetricsAt(rows, 3, "t1"); + expectGroupAt(rows, 4, g3); + expectGroupAt(rows, 5, g4); + expectStepMetricsAt(rows, 6, "s2", 0); + expectTurnMetricsAt(rows, 7, "t2"); + }); + + it("a trailing segment with no entry (in-flight turn) renders no metrics", () => { + const g1 = userGroup(1, "q1"); + const g2 = assistantGroup(2, "a1"); + const g3 = userGroup(3, "q2"); + const g4 = assistantGroup(4, "a2"); + const step = makeStep("s1", 100, 50); + const rows = interleaveTurnMetrics([g1, g2, g3, g4], [makeEntry("t1", 100, 50, [step])]); + + expect(rows).toHaveLength(6); + expectGroupAt(rows, 0, g1); + expectGroupAt(rows, 1, g2); + expectStepMetricsAt(rows, 2, "s1", 0); + 
expectTurnMetricsAt(rows, 3, "t1"); + expectGroupAt(rows, 4, g3); + expectGroupAt(rows, 5, g4); + }); + + it("single text-only turn: step row + turn-metrics both at tail", () => { + const g1 = userGroup(1, "q1"); + const g2 = assistantGroup(2, "a1"); + const step = makeStep("s1", 100, 50); + const turn = makeEntry("t1", 100, 50, [step]); + const rows = interleaveTurnMetrics([g1, g2], [turn]); + + expect(rows).toHaveLength(4); + expectGroupAt(rows, 0, g1); + expectGroupAt(rows, 1, g2); + expectStepMetricsAt(rows, 2, "s1", 0); + expectTurnMetricsAt(rows, 3, "t1"); + }); + + it("tool step anchors inline after its tool-batch group", () => { + const g1 = userGroup(1, "q1"); + const g2 = toolBatchGroup("t#0", ["c1", "c2"]); + const g3 = assistantGroup(3, "a1"); + const step0 = makeStep("t#0", 100, 50); + const step1 = makeStep("t#1", 200, 80); + const turn = makeEntry("t1", 300, 130, [step0, step1]); + const rows = interleaveTurnMetrics([g1, g2, g3], [turn]); + + expect(rows).toHaveLength(6); + expectGroupAt(rows, 0, g1); + expectGroupAt(rows, 1, g2); + expectStepMetricsAt(rows, 2, "t#0", 0); + expectGroupAt(rows, 3, g3); + expectStepMetricsAt(rows, 4, "t#1", 1); + expectTurnMetricsAt(rows, 5, "t1"); + }); + + it("single tool-call group anchors its step", () => { + const g1 = userGroup(1, "q1"); + const g2 = toolCallGroup(2, "s1", "c1"); + const g3 = assistantGroup(3, "a1"); + const step = makeStep("s1", 100, 50); + const turn = makeEntry("t1", 100, 50, [step]); + const rows = interleaveTurnMetrics([g1, g2, g3], [turn]); + + expect(rows).toHaveLength(5); + expectGroupAt(rows, 0, g1); + expectGroupAt(rows, 1, g2); + expectStepMetricsAt(rows, 2, "s1", 0); + expectGroupAt(rows, 3, g3); + expectTurnMetricsAt(rows, 4, "t1"); + }); + + it("single tool-result group anchors its step", () => { + const g1 = userGroup(1, "q1"); + const g2 = toolResultGroup(2, "s1", "c1"); + const g3 = assistantGroup(3, "a1"); + const step = makeStep("s1", 100, 50); + const turn = makeEntry("t1", 
100, 50, [step]); + const rows = interleaveTurnMetrics([g1, g2, g3], [turn]); + + expect(rows).toHaveLength(5); + expectGroupAt(rows, 0, g1); + expectGroupAt(rows, 1, g2); + expectStepMetricsAt(rows, 2, "s1", 0); + expectGroupAt(rows, 3, g3); + expectTurnMetricsAt(rows, 4, "t1"); + }); + + it("multi-step: each tool step inline, final step + total at tail", () => { + const g1 = userGroup(1, "q1"); + const g2 = toolBatchGroup("t#0", ["c1"]); + const g3 = assistantGroup(2, "thinking"); + const g4 = toolBatchGroup("t#1", ["c2", "c3"]); + const g5 = assistantGroup(3, "a1"); + const step0 = makeStep("t#0", 100, 50); + const step1 = makeStep("t#1", 200, 80); + const step2 = makeStep("t#2", 50, 20); + const turn = makeEntry("t1", 350, 150, [step0, step1, step2]); + const rows = interleaveTurnMetrics([g1, g2, g3, g4, g5], [turn]); + + expect(rows).toHaveLength(9); + expectGroupAt(rows, 0, g1); + expectGroupAt(rows, 1, g2); + expectStepMetricsAt(rows, 2, "t#0", 0); + expectGroupAt(rows, 3, g3); + expectGroupAt(rows, 4, g4); + expectStepMetricsAt(rows, 5, "t#1", 1); + expectGroupAt(rows, 6, g5); + expectStepMetricsAt(rows, 7, "t#2", 2); + expectTurnMetricsAt(rows, 8, "t1"); + }); + + it("multiple turns head-aligned with inline steps", () => { + const g1 = userGroup(1, "q1"); + const g2 = toolBatchGroup("s1", ["c1"]); + const g3 = assistantGroup(2, "a1"); + const g4 = userGroup(3, "q2"); + const g5 = assistantGroup(4, "a2"); + const step1 = makeStep("s1", 100, 50); + const step2 = makeStep("s2", 200, 80); + const rows = interleaveTurnMetrics( + [g1, g2, g3, g4, g5], + [makeEntry("t1", 100, 50, [step1]), makeEntry("t2", 200, 80, [step2])], + ); + + expect(rows).toHaveLength(9); + expectGroupAt(rows, 0, g1); + expectGroupAt(rows, 1, g2); + expectStepMetricsAt(rows, 2, "s1", 0); + expectGroupAt(rows, 3, g3); + expectTurnMetricsAt(rows, 4, "t1"); + expectGroupAt(rows, 5, g4); + expectGroupAt(rows, 6, g5); + expectStepMetricsAt(rows, 7, "s2", 0); + expectTurnMetricsAt(rows, 8, 
"t2"); + }); + + it("unanchored step (stepId not in groups) falls back to tail before turn-metrics", () => { + const g1 = userGroup(1, "q1"); + const g2 = assistantGroup(2, "a1"); + const step0 = makeStep("orphan", 100, 50); + const turn = makeEntry("t1", 100, 50, [step0]); + const rows = interleaveTurnMetrics([g1, g2], [turn]); + + expect(rows).toHaveLength(4); + expectGroupAt(rows, 0, g1); + expectGroupAt(rows, 1, g2); + expectStepMetricsAt(rows, 2, "orphan", 0); + expectTurnMetricsAt(rows, 3, "t1"); + }); + + it("fewer metrics than segments: trailing segments are bare", () => { + const g1 = userGroup(1, "q1"); + const g2 = assistantGroup(2, "a1"); + const g3 = userGroup(3, "q2"); + const g4 = assistantGroup(4, "a2"); + const g5 = userGroup(5, "q3"); + const g6 = assistantGroup(6, "a3"); + const step = makeStep("s1", 300, 120); + const rows = interleaveTurnMetrics( + [g1, g2, g3, g4, g5, g6], + [makeEntry("t1", 300, 120, [step])], + ); + + expect(rows).toHaveLength(8); + expectGroupAt(rows, 0, g1); + expectGroupAt(rows, 1, g2); + expectStepMetricsAt(rows, 2, "s1", 0); + expectTurnMetricsAt(rows, 3, "t1"); + expectGroupAt(rows, 4, g3); + expectGroupAt(rows, 5, g4); + expectGroupAt(rows, 6, g5); + expectGroupAt(rows, 7, g6); + }); + + it("in-flight turn (no durationMs) still produces step + turn rows", () => { + const g1 = userGroup(1, "q1"); + const g2 = assistantGroup(2, "a1"); + const step = makeStep("s1", 100, 50); + const turn: TurnMetricsEntry = { + turnId: "t1", + steps: [step], + total: { + turnId: "t1", + usage: { inputTokens: 100, outputTokens: 50 }, + steps: [step], + }, + }; + const rows = interleaveTurnMetrics([g1, g2], [turn]); + + expect(rows).toHaveLength(4); + expectStepMetricsAt(rows, 2, "s1", 0); + expectTurnMetricsAt(rows, 3, "t1"); + const metricsRow = rows[3] as { readonly turn: TurnMetrics } | undefined; + expect(metricsRow?.turn.durationMs).toBeUndefined(); + }); + + it("leading non-turn groups emit as plain group rows", () => { + const g0 = 
assistantGroup(1, "system msg"); + const g1 = userGroup(2, "q1"); + const g2 = assistantGroup(3, "a1"); + const step = makeStep("s1", 100, 50); + const rows = interleaveTurnMetrics([g0, g1, g2], [makeEntry("t1", 100, 50, [step])]); + + expect(rows).toHaveLength(5); + expectGroupAt(rows, 0, g0); + expect(rows[1]?.kind).toBe("group"); + expect(rows[2]?.kind).toBe("group"); + expectStepMetricsAt(rows, 3, "s1", 0); + expectTurnMetricsAt(rows, 4, "t1"); + }); + + it("more metrics than segments: only T entries placed (extra ignored)", () => { + const g1 = userGroup(1, "q1"); + const g2 = assistantGroup(2, "a1"); + const step1 = makeStep("s1", 100, 50); + const step2 = makeStep("s2", 200, 80); + const rows = interleaveTurnMetrics( + [g1, g2], + [makeEntry("t1", 100, 50, [step1]), makeEntry("t2", 200, 80, [step2])], + ); + + expect(rows).toHaveLength(4); + expectStepMetricsAt(rows, 2, "s1", 0); + expectTurnMetricsAt(rows, 3, "t1"); + }); + + it("turn with no steps emits only turn-metrics (no step-metrics)", () => { + const g1 = userGroup(1, "q1"); + const g2 = assistantGroup(2, "a1"); + const rows = interleaveTurnMetrics([g1, g2], [makeEntry("t1", 100, 50)]); + + expect(rows).toHaveLength(3); + expectGroupAt(rows, 0, g1); + expectGroupAt(rows, 1, g2); + expectTurnMetricsAt(rows, 2, "t1"); + }); + + it("progressive: entry with steps but total=null emits step rows and NO turn-metrics row", () => { + const g1 = userGroup(1, "q1"); + const g2 = toolBatchGroup("s1", ["c1"]); + const g3 = assistantGroup(2, "a1"); + const step1 = makeStep("s1", 100, 50); + const entry = makeProgressiveEntry("t1", [step1]); + const rows = interleaveTurnMetrics([g1, g2, g3], [entry]); + + expect(rows).toHaveLength(4); + expectGroupAt(rows, 0, g1); + expectGroupAt(rows, 1, g2); + expectStepMetricsAt(rows, 2, "s1", 0); + expectGroupAt(rows, 3, g3); + }); + + it("entry with total emits step rows + a turn-metrics row", () => { + const g1 = userGroup(1, "q1"); + const g2 = toolBatchGroup("s1", ["c1"]); 
+ const g3 = assistantGroup(2, "a1"); + const step1 = makeStep("s1", 100, 50); + const entry = makeEntry("t1", 100, 50, [step1]); + const rows = interleaveTurnMetrics([g1, g2, g3], [entry]); + + expect(rows).toHaveLength(5); + expectGroupAt(rows, 0, g1); + expectGroupAt(rows, 1, g2); + expectStepMetricsAt(rows, 2, "s1", 0); + expectGroupAt(rows, 3, g3); + expectTurnMetricsAt(rows, 4, "t1"); + }); + + it("progressive multi-step: unanchored steps at tail, no turn-metrics", () => { + const g1 = userGroup(1, "q1"); + const g2 = assistantGroup(2, "a1"); + const step0 = makeStep("s1", 100, 50); + const step1 = makeStep("s2", 200, 80); + const entry = makeProgressiveEntry("t1", [step0, step1]); + const rows = interleaveTurnMetrics([g1, g2], [entry]); + + expect(rows).toHaveLength(4); + expectGroupAt(rows, 0, g1); + expectGroupAt(rows, 1, g2); + expectStepMetricsAt(rows, 2, "s1", 0); + expectStepMetricsAt(rows, 3, "s2", 1); + }); +}); diff --git a/src/core/metrics/place.ts b/src/core/metrics/place.ts new file mode 100644 index 0000000..2481a16 --- /dev/null +++ b/src/core/metrics/place.ts @@ -0,0 +1,151 @@ +import type { RenderGroup } from "../chunks"; +import type { MetricsRow, TurnMetricsEntry } from "./types"; + +function groupStepId(g: RenderGroup): string | undefined { + if (g.kind === "tool-batch") return g.stepId; + const c = g.chunk.chunk; + return c.type === "tool-call" || c.type === "tool-result" ? c.stepId : undefined; +} + +/** + * Interleave turn metrics into the rendered transcript. + * + * Splits groups into per-turn segments: a new segment begins at each `single` + * group with `group.chunk.role === "user"`. Head-aligns: segment `i` receives + * `entries[i]` (the first `min(K, T)` segments get the first `min(K, T)` entries). 
+ * + * Within a segment that has an aligned turn entry, each completed step's metrics + * are placed INLINE right after the last group bearing that step's `stepId` (tool-call/ + * tool-result chunks and tool-batch groups carry `stepId`). Steps whose `stepId` does + * not appear in any group ("unanchored") fall back to the segment tail, before the + * turn-metrics row (if present). + * + * A `turn-metrics` row is emitted ONLY when `entry.total !== null` (i.e. the turn + * is finalized via `done` or durable data). A still-generating turn emits its + * completed step rows but NO turn-total row. + * + * Head-alignment is stable: the durable `/metrics` endpoint returns every + * SEALED turn in turn order (a contiguous prefix from turn 0), and we append + * only the just-finished live turn — so `entries[i]` is turn `i`, and existing + * turns never move when a new turn is appended. + */ +export function interleaveTurnMetrics( + groups: readonly RenderGroup[], + entries: readonly TurnMetricsEntry[], +): readonly MetricsRow[] { + if (entries.length === 0) { + return groups.map((g) => ({ kind: "group" as const, group: g })); + } + + const segmentStarts: number[] = []; + for (let i = 0; i < groups.length; i++) { + const g = groups[i]; + if (g !== undefined && g.kind === "single" && g.chunk.role === "user") { + segmentStarts.push(i); + } + } + + const T = segmentStarts.length; + + if (T === 0) { + return groups.map((g) => ({ kind: "group" as const, group: g })); + } + + const K = entries.length; + const matched = Math.min(K, T); + + // Head-alignment: segment i ↔ entries[i] for i in [0, matched). + // A trailing segment with no corresponding entry renders no metrics. + const segmentEntries = new Map<number, TurnMetricsEntry>(); + for (let i = 0; i < matched; i++) { + const entry = entries[i]; + if (entry !== undefined) { + segmentEntries.set(i, entry); + } + } + + const rows: MetricsRow[] = []; + + const firstUserIdx = segmentStarts[0] ?? 
0; + for (let i = 0; i < firstUserIdx; i++) { + const g = groups[i]; + if (g !== undefined) { + rows.push({ kind: "group", group: g }); + } + } + + for (let seg = 0; seg < T; seg++) { + const start = segmentStarts[seg] ?? 0; + const end = seg + 1 < T ? (segmentStarts[seg + 1] ?? groups.length) : groups.length; + + const entry = segmentEntries.get(seg); + + if (entry === undefined) { + for (let i = start; i < end; i++) { + const g = groups[i]; + if (g !== undefined) { + rows.push({ kind: "group", group: g }); + } + } + continue; + } + + // Build anchor map: for each stepId, the LAST group index in this segment. + const anchorByStepId = new Map<string, number>(); + for (let i = start; i < end; i++) { + const g = groups[i]; + if (g === undefined) continue; + const sid = groupStepId(g); + if (sid !== undefined) { + anchorByStepId.set(sid, i); + } + } + + // Classify each step as anchored (at a group index) or unanchored. + const anchored: Map<number, { stepIndex: number; step: (typeof entry.steps)[number] }[]> = + new Map(); + const unanchored: { stepIndex: number; step: (typeof entry.steps)[number] }[] = []; + + for (let i = 0; i < entry.steps.length; i++) { + const step = entry.steps[i]; + if (step === undefined) continue; + const anchorGroupIdx = anchorByStepId.get(step.stepId); + if (anchorGroupIdx !== undefined) { + let arr = anchored.get(anchorGroupIdx); + if (arr === undefined) { + arr = []; + anchored.set(anchorGroupIdx, arr); + } + arr.push({ stepIndex: i, step }); + } else { + unanchored.push({ stepIndex: i, step }); + } + } + + // Emit groups; after each anchored group, emit its step-metrics rows. 
+ for (let i = start; i < end; i++) { + const g = groups[i]; + if (g !== undefined) { + rows.push({ kind: "group", group: g }); + } + const stepsHere = anchored.get(i); + if (stepsHere !== undefined) { + stepsHere.sort((a, b) => a.stepIndex - b.stepIndex); + for (const { step, stepIndex } of stepsHere) { + rows.push({ kind: "step-metrics", step, index: stepIndex }); + } + } + } + + // Segment tail: unanchored steps, then turn-metrics (only when total is present). + unanchored.sort((a, b) => a.stepIndex - b.stepIndex); + for (const { step, stepIndex } of unanchored) { + rows.push({ kind: "step-metrics", step, index: stepIndex }); + } + if (entry.total !== null) { + rows.push({ kind: "turn-metrics", turn: entry.total }); + } + } + + return rows; +} diff --git a/src/core/metrics/reducer.test.ts b/src/core/metrics/reducer.test.ts new file mode 100644 index 0000000..16c88b3 --- /dev/null +++ b/src/core/metrics/reducer.test.ts @@ -0,0 +1,368 @@ +import type { StepId, TurnDoneEvent, TurnStepCompleteEvent, TurnUsageEvent } from "@dispatch/wire"; +import { describe, expect, it } from "vitest"; +import { + applyDurableMetrics, + foldMetricsEvent, + initialMetricsState, + selectOrderedTurnMetrics, +} from "./reducer"; + +const usageEvent = ( + turnId: string, + inputTokens: number, + outputTokens: number, + stepId?: string, +): TurnUsageEvent => { + const base = { + type: "usage" as const, + conversationId: "c1", + turnId, + usage: { inputTokens, outputTokens }, + }; + if (stepId !== undefined) { + return { ...base, stepId: stepId as StepId }; + } + return base; +}; + +const stepCompleteEvent = ( + turnId: string, + stepId: string, + timing: { ttftMs?: number; decodeMs?: number; genTotalMs?: number } = {}, +): TurnStepCompleteEvent => ({ + type: "step-complete", + conversationId: "c1", + turnId, + stepId: stepId as StepId, + ...timing, +}); + +const doneEvent = ( + turnId: string, + extra: { durationMs?: number; usage?: { inputTokens: number; outputTokens: number } } = {}, 
+): TurnDoneEvent => ({ + type: "done", + conversationId: "c1", + turnId, + reason: "stop", + ...extra, +}); + +describe("initialMetricsState", () => { + it("starts empty", () => { + const s = initialMetricsState(); + expect(s.live.size).toBe(0); + expect(s.liveOrder).toEqual([]); + expect(s.durable.size).toBe(0); + expect(s.durableOrder).toEqual([]); + }); +}); + +describe("foldMetricsEvent", () => { + it("folds per-step usage by stepId into a turn", () => { + let s = initialMetricsState(); + s = foldMetricsEvent(s, usageEvent("t1", 100, 50, "s1")); + s = foldMetricsEvent(s, stepCompleteEvent("t1", "s1")); + s = foldMetricsEvent(s, usageEvent("t1", 200, 80, "s2")); + s = foldMetricsEvent(s, stepCompleteEvent("t1", "s2")); + s = foldMetricsEvent(s, doneEvent("t1")); + + const ordered = selectOrderedTurnMetrics(s); + expect(ordered).toHaveLength(1); + expect(ordered[0]?.turnId).toBe("t1"); + expect(ordered[0]?.steps).toHaveLength(2); + expect(ordered[0]?.steps[0]?.stepId).toBe("s1"); + expect(ordered[0]?.steps[0]?.usage).toEqual({ inputTokens: 100, outputTokens: 50 }); + expect(ordered[0]?.steps[1]?.stepId).toBe("s2"); + expect(ordered[0]?.steps[1]?.usage).toEqual({ inputTokens: 200, outputTokens: 80 }); + }); + + it("folds step-complete timing and merges with same-step usage", () => { + let s = initialMetricsState(); + s = foldMetricsEvent(s, usageEvent("t1", 100, 50, "s1")); + s = foldMetricsEvent( + s, + stepCompleteEvent("t1", "s1", { ttftMs: 200, decodeMs: 800, genTotalMs: 1000 }), + ); + s = foldMetricsEvent(s, doneEvent("t1")); + + const ordered = selectOrderedTurnMetrics(s); + expect(ordered).toHaveLength(1); + const step = ordered[0]?.steps[0]; + expect(step?.usage).toEqual({ inputTokens: 100, outputTokens: 50 }); + expect(step?.ttftMs).toBe(200); + expect(step?.decodeMs).toBe(800); + expect(step?.genTotalMs).toBe(1000); + }); + + it("step-complete before usage defaults usage to zeros", () => { + let s = initialMetricsState(); + s = foldMetricsEvent(s, 
stepCompleteEvent("t1", "s1", { genTotalMs: 500 })); + s = foldMetricsEvent(s, doneEvent("t1")); + + const ordered = selectOrderedTurnMetrics(s); + const step = ordered[0]?.steps[0]; + expect(step?.usage).toEqual({ inputTokens: 0, outputTokens: 0 }); + expect(step?.genTotalMs).toBe(500); + }); + + it("done sets durationMs and aggregate usage", () => { + let s = initialMetricsState(); + s = foldMetricsEvent(s, usageEvent("t1", 100, 50, "s1")); + s = foldMetricsEvent(s, stepCompleteEvent("t1", "s1")); + s = foldMetricsEvent( + s, + doneEvent("t1", { + durationMs: 5000, + usage: { inputTokens: 300, outputTokens: 150 }, + }), + ); + + const ordered = selectOrderedTurnMetrics(s); + expect(ordered[0]?.total?.durationMs).toBe(5000); + expect(ordered[0]?.total?.usage).toEqual({ inputTokens: 300, outputTokens: 150 }); + }); + + it("aggregate usage sums steps when done.usage absent", () => { + let s = initialMetricsState(); + s = foldMetricsEvent(s, usageEvent("t1", 100, 50, "s1")); + s = foldMetricsEvent(s, stepCompleteEvent("t1", "s1")); + s = foldMetricsEvent(s, usageEvent("t1", 200, 80, "s2")); + s = foldMetricsEvent(s, stepCompleteEvent("t1", "s2")); + s = foldMetricsEvent(s, doneEvent("t1")); + + const ordered = selectOrderedTurnMetrics(s); + expect(ordered[0]?.total?.usage).toEqual({ inputTokens: 300, outputTokens: 130 }); + }); + + it("aggregate usage includes cache only when a step had cache", () => { + let s = initialMetricsState(); + s = foldMetricsEvent(s, { + type: "usage", + conversationId: "c1", + turnId: "t1", + stepId: "s1" as StepId, + usage: { inputTokens: 100, outputTokens: 50, cacheReadTokens: 30 }, + }); + s = foldMetricsEvent(s, stepCompleteEvent("t1", "s1")); + s = foldMetricsEvent(s, usageEvent("t1", 200, 80, "s2")); + s = foldMetricsEvent(s, stepCompleteEvent("t1", "s2")); + s = foldMetricsEvent(s, doneEvent("t1")); + + const ordered = selectOrderedTurnMetrics(s); + expect(ordered[0]?.total?.usage.cacheReadTokens).toBe(30); + 
expect(ordered[0]?.total?.usage.cacheWriteTokens).toBeUndefined(); + }); + + it("tolerates missing clock (no genTotalMs/ttft/decode)", () => { + let s = initialMetricsState(); + s = foldMetricsEvent(s, usageEvent("t1", 100, 50, "s1")); + s = foldMetricsEvent(s, stepCompleteEvent("t1", "s1")); + s = foldMetricsEvent(s, doneEvent("t1")); + + const ordered = selectOrderedTurnMetrics(s); + const step = ordered[0]?.steps[0]; + expect(step?.ttftMs).toBeUndefined(); + expect(step?.decodeMs).toBeUndefined(); + expect(step?.genTotalMs).toBeUndefined(); + expect(ordered[0]?.total?.durationMs).toBeUndefined(); + }); + + it("usage without stepId does not create a turn", () => { + let s = initialMetricsState(); + s = foldMetricsEvent(s, usageEvent("t1", 100, 50)); + + const ordered = selectOrderedTurnMetrics(s); + expect(ordered).toHaveLength(0); + }); + + it("ignores non-metrics events", () => { + const s = initialMetricsState(); + const next = foldMetricsEvent(s, { + type: "status", + conversationId: "c1", + status: "running", + }); + expect(next).toBe(s); + }); + + it("preserves first-seen order of steps", () => { + let s = initialMetricsState(); + s = foldMetricsEvent(s, usageEvent("t1", 10, 5, "s2")); + s = foldMetricsEvent(s, stepCompleteEvent("t1", "s2")); + s = foldMetricsEvent(s, usageEvent("t1", 20, 8, "s1")); + s = foldMetricsEvent(s, stepCompleteEvent("t1", "s1")); + s = foldMetricsEvent(s, doneEvent("t1")); + + const ordered = selectOrderedTurnMetrics(s); + expect(ordered[0]?.steps[0]?.stepId).toBe("s2"); + expect(ordered[0]?.steps[1]?.stepId).toBe("s1"); + }); + + it("preserves first-seen order of turns", () => { + let s = initialMetricsState(); + s = foldMetricsEvent(s, usageEvent("t2", 10, 5, "s1")); + s = foldMetricsEvent(s, usageEvent("t1", 20, 8, "s1")); + s = foldMetricsEvent(s, doneEvent("t2")); + s = foldMetricsEvent(s, doneEvent("t1")); + + const ordered = selectOrderedTurnMetrics(s); + expect(ordered[0]?.turnId).toBe("t2"); + 
expect(ordered[1]?.turnId).toBe("t1"); + }); +}); + +describe("selectOrderedTurnMetrics", () => { + it("durable wins over live by turnId, live-done appended last", () => { + let s = initialMetricsState(); + + s = foldMetricsEvent(s, usageEvent("t1", 100, 50, "s1")); + s = foldMetricsEvent(s, usageEvent("t2", 200, 80, "s1")); + s = foldMetricsEvent(s, stepCompleteEvent("t2", "s1")); + s = foldMetricsEvent(s, doneEvent("t2")); + + s = applyDurableMetrics(s, [ + { + turnId: "t1", + usage: { inputTokens: 999, outputTokens: 999 }, + durationMs: 3000, + steps: [ + { + stepId: "s1" as StepId, + usage: { inputTokens: 999, outputTokens: 999 }, + genTotalMs: 3000, + }, + ], + }, + ]); + + const ordered = selectOrderedTurnMetrics(s); + expect(ordered).toHaveLength(2); + expect(ordered[0]?.turnId).toBe("t1"); + expect(ordered[0]?.total?.usage.inputTokens).toBe(999); + expect(ordered[0]?.total?.durationMs).toBe(3000); + expect(ordered[1]?.turnId).toBe("t2"); + expect(ordered[1]?.total?.durationMs).toBeUndefined(); + }); + + it("empty state returns empty", () => { + const s = initialMetricsState(); + expect(selectOrderedTurnMetrics(s)).toEqual([]); + }); + + it("selectOrderedTurnMetrics: in-flight turn exposes only completed steps and total=null", () => { + let s = initialMetricsState(); + s = foldMetricsEvent(s, usageEvent("t1", 100, 50, "s1")); + s = foldMetricsEvent(s, stepCompleteEvent("t1", "s1", { genTotalMs: 1000 })); + s = foldMetricsEvent(s, usageEvent("t1", 200, 80, "s2")); + + const ordered = selectOrderedTurnMetrics(s); + expect(ordered).toHaveLength(1); + expect(ordered[0]?.turnId).toBe("t1"); + expect(ordered[0]?.steps).toHaveLength(1); + expect(ordered[0]?.steps[0]?.stepId).toBe("s1"); + expect(ordered[0]?.total).toBeNull(); + }); + + it("selectOrderedTurnMetrics: a turn with no complete step and not done is omitted", () => { + let s = initialMetricsState(); + s = foldMetricsEvent(s, usageEvent("t1", 100, 50, "s1")); + s = foldMetricsEvent(s, usageEvent("t1", 200, 
80, "s2")); + + const ordered = selectOrderedTurnMetrics(s); + expect(ordered).toHaveLength(0); + }); + + it("selectOrderedTurnMetrics: after done, total is present", () => { + let s = initialMetricsState(); + s = foldMetricsEvent(s, usageEvent("t1", 100, 50, "s1")); + s = foldMetricsEvent(s, stepCompleteEvent("t1", "s1", { genTotalMs: 1000 })); + s = foldMetricsEvent(s, doneEvent("t1", { durationMs: 2000 })); + + const ordered = selectOrderedTurnMetrics(s); + expect(ordered).toHaveLength(1); + expect(ordered[0]?.turnId).toBe("t1"); + expect(ordered[0]?.total?.durationMs).toBe(2000); + expect(ordered[0]?.steps).toHaveLength(1); + }); + + it("step-complete marks the step complete", () => { + let s = initialMetricsState(); + s = foldMetricsEvent(s, usageEvent("t1", 100, 50, "s1")); + s = foldMetricsEvent(s, stepCompleteEvent("t1", "s1", { genTotalMs: 500 })); + + const ordered = selectOrderedTurnMetrics(s); + expect(ordered).toHaveLength(1); + expect(ordered[0]?.steps).toHaveLength(1); + expect(ordered[0]?.steps[0]?.stepId).toBe("s1"); + expect(ordered[0]?.steps[0]?.genTotalMs).toBe(500); + }); + + it("selectOrderedTurnMetrics: durable turn → steps + total present", () => { + let s = initialMetricsState(); + s = applyDurableMetrics(s, [ + { + turnId: "t1", + usage: { inputTokens: 300, outputTokens: 150 }, + durationMs: 5000, + steps: [ + { + stepId: "s1" as StepId, + usage: { inputTokens: 100, outputTokens: 50 }, + genTotalMs: 1000, + }, + { + stepId: "s2" as StepId, + usage: { inputTokens: 200, outputTokens: 100 }, + genTotalMs: 2000, + }, + ], + }, + ]); + + const ordered = selectOrderedTurnMetrics(s); + expect(ordered).toHaveLength(1); + expect(ordered[0]?.turnId).toBe("t1"); + expect(ordered[0]?.steps).toHaveLength(2); + expect(ordered[0]?.steps[0]?.stepId).toBe("s1"); + expect(ordered[0]?.steps[1]?.stepId).toBe("s2"); + expect(ordered[0]?.total?.usage.inputTokens).toBe(300); + expect(ordered[0]?.total?.durationMs).toBe(5000); + }); +}); + 
describe("applyDurableMetrics", () => {
  it("stores durable turns in order", () => {
    let s = initialMetricsState();
    s = applyDurableMetrics(s, [
      { turnId: "t1", usage: { inputTokens: 10, outputTokens: 5 }, steps: [] },
      { turnId: "t2", usage: { inputTokens: 20, outputTokens: 8 }, steps: [] },
    ]);
    expect(s.durableOrder).toEqual(["t1", "t2"]);
    expect(s.durable.size).toBe(2);
  });

  it("is idempotent for same turnId", () => {
    let s = initialMetricsState();
    const turn = {
      turnId: "t1",
      usage: { inputTokens: 10, outputTokens: 5 },
      steps: [],
    };
    s = applyDurableMetrics(s, [turn]);
    s = applyDurableMetrics(s, [turn]);
    expect(s.durableOrder).toEqual(["t1"]);
    expect(s.durable.size).toBe(1);
  });

  it("overwrites durable turn data for same turnId", () => {
    let s = initialMetricsState();
    s = applyDurableMetrics(s, [
      { turnId: "t1", usage: { inputTokens: 10, outputTokens: 5 }, steps: [] },
    ]);
    s = applyDurableMetrics(s, [
      { turnId: "t1", usage: { inputTokens: 99, outputTokens: 99 }, steps: [] },
    ]);
    expect(s.durable.get("t1")?.usage.inputTokens).toBe(99);
  });
});

// ---- diff --git a/src/core/metrics/reducer.ts b/src/core/metrics/reducer.ts
// ---- (new file mode 100644, index 0000000..d36dba1)

import type { AgentEvent, StepId, StepMetrics, TurnMetrics, Usage } from "@dispatch/wire";
import type { BuildingStep, LiveTurn, MetricsState, TurnMetricsEntry } from "./types";

/**
 * Sum the usage of every step that has reported usage.
 *
 * Steps without usage are skipped. Cache counters are accumulated only from
 * steps whose value is strictly positive, and each cache field is present on
 * the result only when its accumulated total is positive (i.e. at least one
 * step contributed to it).
 */
function sumStepUsages(steps: readonly BuildingStep[]): Usage {
  let inputTokens = 0;
  let outputTokens = 0;
  let cacheReadTokens = 0;
  let cacheWriteTokens = 0;

  for (const { usage } of steps) {
    if (usage === undefined) continue;
    inputTokens += usage.inputTokens;
    outputTokens += usage.outputTokens;

    const read = usage.cacheReadTokens ?? 0;
    if (read > 0) cacheReadTokens += read;

    const write = usage.cacheWriteTokens ?? 0;
    if (write > 0) cacheWriteTokens += write;
  }

  // Only positive values are added above, so "total > 0" is equivalent to
  // "some step reported this counter".
  return {
    inputTokens,
    outputTokens,
    ...(cacheReadTokens > 0 ? { cacheReadTokens } : {}),
    ...(cacheWriteTokens > 0 ? { cacheWriteTokens } : {}),
  };
}

/**
 * Convert a step under construction into its wire-level `StepMetrics`.
 *
 * Missing usage defaults to zero input/output tokens; timing fields are
 * emitted only when they are defined on the building step.
 */
function buildingStepToMetrics(bs: BuildingStep): StepMetrics {
  return {
    stepId: bs.stepId as StepId,
    usage: bs.usage ?? { inputTokens: 0, outputTokens: 0 },
    ...(bs.ttftMs !== undefined ? { ttftMs: bs.ttftMs } : {}),
    ...(bs.decodeMs !== undefined ? { decodeMs: bs.decodeMs } : {}),
    ...(bs.genTotalMs !== undefined ? { genTotalMs: bs.genTotalMs } : {}),
  };
}

/**
 * Look up a step that `stepOrder` says must exist.
 *
 * @throws Error when `stepMap` has no entry for `id`.
 */
function getStep(lt: LiveTurn, id: string): BuildingStep {
  const step = lt.stepMap.get(id);
  if (step === undefined) throw new Error(`Missing step ${id} in live turn`);
  return step;
}

/**
 * Build the turn-level `TurnMetrics` for a live turn.
 *
 * All steps in `stepOrder` are included (complete or not). Usage is the
 * aggregate carried by the `done` event when present, otherwise the sum of
 * the steps' usage. `durationMs` is emitted only when defined.
 */
function liveTurnToMetrics(lt: LiveTurn): TurnMetrics {
  const buildingSteps = lt.stepOrder.map((id) => getStep(lt, id));
  return {
    turnId: lt.turnId,
    usage: lt.doneUsage ?? sumStepUsages(buildingSteps),
    steps: buildingSteps.map((bs) => buildingStepToMetrics(bs)),
    ...(lt.durationMs !== undefined ? { durationMs: lt.durationMs } : {}),
  };
}

/** Return the live turn for `turnId`, or a fresh empty one (state is not touched). */
function liveTurnOrFresh(state: MetricsState, turnId: string): LiveTurn {
  return (
    state.live.get(turnId) ?? {
      turnId,
      done: false,
      durationMs: undefined,
      doneUsage: undefined,
      stepMap: new Map(),
      stepOrder: [],
    }
  );
}

/**
 * Return a new state with `turn` stored under its `turnId`.
 *
 * The live map is copied once; `liveOrder` gains the id only the first time
 * the turn is stored, so first-seen order is preserved.
 */
function storeLiveTurn(state: MetricsState, turn: LiveTurn): MetricsState {
  const live = new Map(state.live);
  live.set(turn.turnId, turn);
  if (state.live.has(turn.turnId)) {
    return { ...state, live };
  }
  return { ...state, live, liveOrder: [...state.liveOrder, turn.turnId] };
}

/**
 * Insert or merge a step into a live turn, returning a new `LiveTurn`.
 *
 * Fields present in `update` win; otherwise the existing step's value is
 * kept. A step seen for the first time is appended to `stepOrder` and is
 * incomplete unless `update.complete` says otherwise.
 */
function upsertStep(lt: LiveTurn, stepId: string, update: Partial<BuildingStep>): LiveTurn {
  const existing = lt.stepMap.get(stepId);
  const next: BuildingStep = {
    stepId,
    usage: update.usage ?? existing?.usage,
    ttftMs: update.ttftMs ?? existing?.ttftMs,
    decodeMs: update.decodeMs ?? existing?.decodeMs,
    genTotalMs: update.genTotalMs ?? existing?.genTotalMs,
    complete: update.complete ?? existing?.complete ?? false,
  };

  const stepMap = new Map(lt.stepMap);
  stepMap.set(stepId, next);

  return existing === undefined
    ? { ...lt, stepMap, stepOrder: [...lt.stepOrder, stepId] }
    : { ...lt, stepMap };
}

/** The initial empty metrics state. */
export function initialMetricsState(): MetricsState {
  return {
    live: new Map(),
    liveOrder: [],
    durable: new Map(),
    durableOrder: [],
  };
}

/**
 * Fold one live AgentEvent into the metrics state.
 *
 * - `usage` with `stepId`: upsert that step's usage.
 * - `usage` without `stepId`: ignored (state returned as-is).
 * - `step-complete`: upsert that step's timing and mark it complete; a step
 *   that never received usage reports zeros when converted to metrics.
 * - `done`: mark the turn done and record `durationMs` / aggregate `usage`
 *   when the event carries them.
 * - All other event types: return the same state object unchanged.
 *
 * The input state is never mutated; a new state object is returned whenever
 * something is recorded.
 */
export function foldMetricsEvent(state: MetricsState, event: AgentEvent): MetricsState {
  switch (event.type) {
    case "usage": {
      if (event.stepId === undefined) return state;
      const turn = liveTurnOrFresh(state, event.turnId);
      return storeLiveTurn(state, upsertStep(turn, event.stepId, { usage: event.usage }));
    }

    case "step-complete": {
      const turn = liveTurnOrFresh(state, event.turnId);
      return storeLiveTurn(
        state,
        upsertStep(turn, event.stepId, {
          ttftMs: event.ttftMs,
          decodeMs: event.decodeMs,
          genTotalMs: event.genTotalMs,
          complete: true,
        }),
      );
    }

    case "done": {
      const turn = liveTurnOrFresh(state, event.turnId);
      return storeLiveTurn(state, {
        ...turn,
        done: true,
        durationMs: event.durationMs ?? turn.durationMs,
        doneUsage: event.usage ?? turn.doneUsage,
      });
    }

    default:
      return state;
  }
}

/**
 * Store durable (sealed) metrics from the backend. These win over live data
 * for any shared `turnId`.
 *
 * A `turnId` seen for the first time is appended to `durableOrder`; a repeat
 * overwrites the stored turn and keeps its original position. Live entries
 * for the same `turnId` are left in place (`selectOrderedTurnMetrics` skips
 * them).
 */
export function applyDurableMetrics(
  state: MetricsState,
  turns: readonly TurnMetrics[],
): MetricsState {
  const durable = new Map(state.durable);
  const durableOrder = [...state.durableOrder];

  for (const turn of turns) {
    if (!durable.has(turn.turnId)) durableOrder.push(turn.turnId);
    durable.set(turn.turnId, turn);
  }

  return { ...state, durable, durableOrder };
}

/**
 * Select the merged ordered list of turn metrics entries.
 * Durable turns come first (in their order), then any live turns whose
 * `turnId` is not in durable (in live first-seen order).
 *
 * Each entry contains the completed steps so far and an optional total
 * (null until the turn is finalized via `done` or durable data).
 * Live turns with no completed steps and not done are omitted.
 */
export function selectOrderedTurnMetrics(state: MetricsState): readonly TurnMetricsEntry[] {
  const entries: TurnMetricsEntry[] = [];
  const sealedIds = new Set<string>();

  for (const turnId of state.durableOrder) {
    const sealed = state.durable.get(turnId);
    if (sealed === undefined) continue;
    entries.push({ turnId, steps: sealed.steps, total: sealed });
    sealedIds.add(turnId);
  }

  for (const turnId of state.liveOrder) {
    if (sealedIds.has(turnId)) continue;
    const turn = state.live.get(turnId);
    if (turn === undefined) continue;

    const completeSteps = turn.stepOrder
      .map((id) => turn.stepMap.get(id))
      .filter((s): s is BuildingStep => s?.complete === true)
      .map((s) => buildingStepToMetrics(s));

    // Nothing to show yet: no finished step and the turn has not ended.
    if (completeSteps.length === 0 && !turn.done) continue;

    entries.push({
      turnId,
      steps: completeSteps,
      total: turn.done ? liveTurnToMetrics(turn) : null,
    });
  }

  return entries;
}

// ---- diff --git a/src/core/metrics/types.ts b/src/core/metrics/types.ts
// ---- (new file mode 100644, index 0000000..2b26e8d)

import type { StepMetrics, TurnMetrics, Usage } from "@dispatch/wire";
import type { RenderGroup } from "../chunks";

export type { StepMetrics, TurnMetrics };

/** A step being built from live events (may be incomplete). */
export interface BuildingStep {
  readonly stepId: string;
  readonly usage: Usage | undefined;
  readonly ttftMs: number | undefined;
  readonly decodeMs: number | undefined;
  readonly genTotalMs: number | undefined;
  /** True once a `step-complete` event has been folded for this step. */
  readonly complete: boolean;
}

/**
 * A turn being built from live events. It stays in the live map after its
 * `done` event; `done` records whether that event has been seen.
 */
export interface LiveTurn {
  readonly turnId: string;
  readonly done: boolean;
  readonly durationMs: number | undefined;
  /** Aggregate usage carried by the `done` event, when it had one. */
  readonly doneUsage: Usage | undefined;
  readonly stepMap: ReadonlyMap<string, BuildingStep>;
  /** Step ids in first-seen order. */
  readonly stepOrder: readonly string[];
}

/**
 * Reducer state for per-turn / per-step token + timing metrics.
 *
 * - `live`: turns built from live events, keyed by `turnId`;
 *   `liveOrder` lists their ids in first-seen order.
 * - `durable`: sealed turns keyed by `turnId`;
 *   `durableOrder` lists their ids in the order they first arrived.
 */
export interface MetricsState {
  readonly live: ReadonlyMap<string, LiveTurn>;
  readonly liveOrder: readonly string[];
  readonly durable: ReadonlyMap<string, TurnMetrics>;
  readonly durableOrder: readonly string[];
}

/** Per-turn placement entry: completed steps so far + optional turn total. */
export interface TurnMetricsEntry {
  readonly turnId: string;
  readonly steps: readonly StepMetrics[];
  readonly total: TurnMetrics | null;
}

/** A row in the interleaved transcript: a render group, per-step metrics, or turn metrics. */
export type MetricsRow =
  | { readonly kind: "group"; readonly group: RenderGroup }
  | { readonly kind: "step-metrics"; readonly step: StepMetrics; readonly index: number }
  | { readonly kind: "turn-metrics"; readonly turn: TurnMetrics };

/** Formatted per-step view for display. */
export interface StepMetricsView {
  readonly label: string;
  readonly tokensLabel: string;
  readonly tps: string | null;
  readonly ttft: string | null;
  readonly decode: string | null;
  readonly genTotal: string | null;
}

/** Formatted per-turn view for display. */
export interface TurnMetricsView {
  readonly tokensLabel: string;
  readonly breakdown: string;
  readonly tps: string | null;
  readonly duration: string | null;
}
