diff options
| author | Adam Malczewski <[email protected]> | 2026-06-07 18:41:27 +0900 |
|---|---|---|
| committer | Adam Malczewski <[email protected]> | 2026-06-07 18:41:27 +0900 |
| commit | 48c6d85c3cc5a57a729f14068e2346b17ed62088 (patch) | |
| tree | ec56590653f399f4a5feae0245652eba8f352ad5 /src/core/telemetry | |
| parent | 2e79dd122e5664353e02e0d33715ae8c1041a379 (diff) | |
| download | dispatch-web-48c6d85c3cc5a57a729f14068e2346b17ed62088.tar.gz dispatch-web-48c6d85c3cc5a57a729f14068e2346b17ed62088.zip | |
feat(chat): live turn metrics — telemetry reducer + rendering
Consume wire/transport-contract 0.3.0 (step-complete event + timing
fields on usage/tool-result/done). Pure core/telemetry module:
foldMetricEvent (reducer) + derived selectors (stepTps, turnTps, etc).
TelemetryState is pure data, no active-turn tracking — consumers pass
turnId to selectors. ChatStore wires foldMetricEvent into handleDelta
and exposes telemetry + currentTurnId. ChatView shows step-metrics
footer (time/TPS/tokens) on assistant text bubbles and durationMs badge
on tool cards. New TurnSummary component renders turn-level stats
(wall-clock, tokens, steps, TPS) in a DaisyUI stats block. Extended
live-probe to verify telemetry events against bin/up (pending backend
restart). 336 tests, typecheck 0, biome clean, build ok.
Diffstat (limited to 'src/core/telemetry')
| -rw-r--r-- | src/core/telemetry/index.ts | 14 | ||||
| -rw-r--r-- | src/core/telemetry/reducer.test.ts | 252 | ||||
| -rw-r--r-- | src/core/telemetry/reducer.ts | 122 | ||||
| -rw-r--r-- | src/core/telemetry/selectors.ts | 95 | ||||
| -rw-r--r-- | src/core/telemetry/types.ts | 35 |
5 files changed, 518 insertions, 0 deletions
diff --git a/src/core/telemetry/index.ts b/src/core/telemetry/index.ts new file mode 100644 index 0000000..a528b0d --- /dev/null +++ b/src/core/telemetry/index.ts @@ -0,0 +1,14 @@ +export { foldMetricEvent, initialState } from "./reducer"; +export { + stepCount, + stepMetrics, + stepToolDuration, + stepTps, + totalDecodeMs, + totalInputTokens, + totalOutputTokens, + turnMetrics, + turnTps, + turnTtft, +} from "./selectors"; +export type { StepMetrics, TelemetryState, TurnMetrics } from "./types"; diff --git a/src/core/telemetry/reducer.test.ts b/src/core/telemetry/reducer.test.ts new file mode 100644 index 0000000..119bf96 --- /dev/null +++ b/src/core/telemetry/reducer.test.ts @@ -0,0 +1,252 @@ +import type { StepId, Usage } from "@dispatch/wire"; +import { describe, expect, it } from "vitest"; +import { foldMetricEvent, initialState } from "./reducer"; +import { + stepCount, + stepMetrics, + stepToolDuration, + stepTps, + totalDecodeMs, + totalInputTokens, + totalOutputTokens, + turnMetrics, + turnTps, + turnTtft, +} from "./selectors"; + +const sid = (s: string) => s as StepId; + +const usage = (turnId: string, stepId: string, u: Usage) => ({ + type: "usage" as const, + conversationId: "c1", + turnId, + stepId: sid(stepId), + usage: u, +}); + +const stepComplete = ( + turnId: string, + stepId: string, + timing: { ttftMs?: number; decodeMs?: number; genTotalMs?: number }, +) => ({ + type: "step-complete" as const, + conversationId: "c1", + turnId, + stepId: sid(stepId), + ...timing, +}); + +describe("foldMetricEvent", () => { + it("turn-start initializes an empty turn", () => { + const s = foldMetricEvent(initialState(), { + type: "turn-start", + conversationId: "c1", + turnId: "t1", + }); + expect(s.turns.get("t1")?.steps).toEqual([]); + }); + + it("step-complete populates timing on a new step", () => { + let s = initialState(); + s = foldMetricEvent(s, { type: "turn-start", conversationId: "c1", turnId: "t1" }); + s = foldMetricEvent( + s, + stepComplete("t1", "s0", { ttftMs: 300, decodeMs: 800, genTotalMs: 1100 }), + ); + + const step = stepMetrics(s, "t1", 0); + expect(step?.ttftMs).toBe(300); + expect(step?.decodeMs).toBe(800); + expect(step?.genTotalMs).toBe(1100); + }); + + it("usage merges tokens into a step (joined by stepId)", () => { + let s = initialState(); + s = foldMetricEvent(s, { type: "turn-start", conversationId: "c1", turnId: "t1" }); + s = foldMetricEvent(s, stepComplete("t1", "s0", { genTotalMs: 500 })); + s = foldMetricEvent(s, usage("t1", "s0", { inputTokens: 100, outputTokens: 50 })); + + const step = stepMetrics(s, "t1", 0); + expect(step?.usage?.inputTokens).toBe(100); + expect(step?.usage?.outputTokens).toBe(50); + expect(step?.genTotalMs).toBe(500); // timing preserved + }); + + it("usage without stepId is ignored", () => { + let s = initialState(); + s = foldMetricEvent(s, { type: "turn-start", conversationId: "c1", turnId: "t1" }); + s = foldMetricEvent(s, { + type: "usage", + conversationId: "c1", + turnId: "t1", + usage: { inputTokens: 100, outputTokens: 50 }, + // no stepId + }); + expect(s.turns.get("t1")?.steps).toEqual([]); + }); + + it("tool-result accumulates durationMs into its step", () => { + let s = initialState(); + s = foldMetricEvent(s, { type: "turn-start", conversationId: "c1", turnId: "t1" }); + s = foldMetricEvent(s, stepComplete("t1", "s0", {})); + s = foldMetricEvent(s, { + type: "tool-result", + conversationId: "c1", + turnId: "t1", + stepId: sid("s0"), + toolCallId: "tc1", + toolName: "bash", + content: "", + isError: false, + durationMs: 120, + }); + s = foldMetricEvent(s, { + type: "tool-result", + conversationId: "c1", + turnId: "t1", + stepId: sid("s0"), + toolCallId: "tc2", + toolName: "bash", + content: "", + isError: false, + durationMs: 80, + }); + + const step = stepMetrics(s, "t1", 0); + expect(step?.toolDurationMs).toBe(200); + }); + + it("done records turn wall-clock and aggregate usage", () => { + let s = initialState(); + s = foldMetricEvent(s, { type: "turn-start", conversationId: "c1", turnId: "t1" }); + s = foldMetricEvent(s, { + type: "done", + conversationId: "c1", + turnId: "t1", + reason: "complete", + durationMs: 4200, + usage: { inputTokens: 800, outputTokens: 200 }, + }); + + const turn = turnMetrics(s, "t1"); + expect(turn?.wallMs).toBe(4200); + expect(turn?.doneUsage?.outputTokens).toBe(200); + }); + + it("events for an unknown turn are handled gracefully (step-complete, usage)", () => { + const s = initialState(); + // step-complete for a turn we haven't started — creates the turn. + const s2 = foldMetricEvent(s, stepComplete("t1", "s0", { ttftMs: 100 })); + expect(s2.turns.get("t1")?.steps[0]?.ttftMs).toBe(100); + }); + + it("multiple steps accumulate in order", () => { + let s = initialState(); + s = foldMetricEvent(s, { type: "turn-start", conversationId: "c1", turnId: "t1" }); + s = foldMetricEvent(s, stepComplete("t1", "s0", { genTotalMs: 100 })); + s = foldMetricEvent(s, stepComplete("t1", "s1", { genTotalMs: 200 })); + + expect(stepCount(s, "t1")).toBe(2); + expect(stepMetrics(s, "t1", 0)?.genTotalMs).toBe(100); + expect(stepMetrics(s, "t1", 1)?.genTotalMs).toBe(200); + }); + + it("non-metric events are no-ops", () => { + let s = initialState(); + s = foldMetricEvent(s, { type: "turn-start", conversationId: "c1", turnId: "t1" }); + s = foldMetricEvent(s, { + type: "text-delta", + conversationId: "c1", + turnId: "t1", + delta: "hi", + }); + s = foldMetricEvent(s, { + type: "turn-sealed", + conversationId: "c1", + turnId: "t1", + }); + expect(s.turns.get("t1")?.steps).toEqual([]); + }); +}); + +describe("selectors — derived metrics", () => { + function populatedState() { + let s = initialState(); + s = foldMetricEvent(s, { type: "turn-start", conversationId: "c1", turnId: "t1" }); + s = foldMetricEvent( + s, + stepComplete("t1", "s0", { ttftMs: 300, decodeMs: 700, genTotalMs: 1000 }), + ); + s = foldMetricEvent(s, usage("t1", "s0", { inputTokens: 500, outputTokens: 100 })); + s = foldMetricEvent( + s, + stepComplete("t1", "s1", { ttftMs: 200, decodeMs: 500, genTotalMs: 700 }), + ); + s = foldMetricEvent(s, usage("t1", "s1", { inputTokens: 600, outputTokens: 80 })); + s = foldMetricEvent(s, { + type: "done", + conversationId: "c1", + turnId: "t1", + reason: "complete", + durationMs: 3500, + usage: { inputTokens: 1100, outputTokens: 180 }, + }); + return s; + } + + it("stepTps = outputTokens / (decodeMs / 1000)", () => { + const s = populatedState(); + const step = stepMetrics(s, "t1", 0)!; + expect(stepTps(step)).toBeCloseTo(100 / 0.7, 2); + }); + + it("turnTtft returns first step's ttftMs", () => { + expect(turnTtft(populatedState(), "t1")).toBe(300); + }); + + it("totalDecodeMs sums all steps' decodeMs", () => { + expect(totalDecodeMs(populatedState(), "t1")).toBe(1200); + }); + + it("turnTps = outputTokens / (totalDecodeMs / 1000)", () => { + const s = populatedState(); + expect(turnTps(s, "t1")).toBeCloseTo(180 / 1.2, 2); + }); + + it("totalOutputTokens prefers done.usage over step sum", () => { + const s = populatedState(); + expect(totalOutputTokens(s, "t1")).toBe(180); // from done.usage + }); + + it("totalInputTokens prefers done.usage over step sum", () => { + const s = populatedState(); + expect(totalInputTokens(s, "t1")).toBe(1100); + }); + + it("stepToolDuration returns sum only when > 0", () => { + const withTools = foldMetricEvent( + foldMetricEvent(initialState(), { type: "turn-start", conversationId: "c1", turnId: "t1" }), + { + type: "tool-result", + conversationId: "c1", + turnId: "t1", + stepId: sid("s0"), + toolCallId: "tc1", + toolName: "bash", + content: "", + isError: false, + durationMs: 50, + }, + ); + const step = stepMetrics(withTools, "t1", 0)!; + expect(stepToolDuration(step)).toBe(50); + expect(stepToolDuration({ stepId: sid("s0") })).toBeUndefined(); + }); + + it("returns undefined for absent fields gracefully", () => { + const s = initialState(); + expect(turnMetrics(s, "missing")).toBeUndefined(); + expect(turnTtft(s, "missing")).toBeUndefined(); + expect(turnTps(s, "missing")).toBeUndefined(); + }); +}); diff --git a/src/core/telemetry/reducer.ts b/src/core/telemetry/reducer.ts new file mode 100644 index 0000000..4083231 --- /dev/null +++ b/src/core/telemetry/reducer.ts @@ -0,0 +1,122 @@ +import type { AgentEvent, StepId, Usage } from "@dispatch/wire"; +import type { StepMetrics, TelemetryState, TurnMetrics } from "./types"; + +/** The initial empty telemetry state. */ +export function initialState(): TelemetryState { + return { turns: new Map() }; +} + +function mergeStep(existing: StepMetrics, patch: StepMetrics): StepMetrics { + const merged: StepMetrics = { ...existing }; + if (patch.ttftMs !== undefined) (merged as { ttftMs?: number }).ttftMs = patch.ttftMs; + if (patch.decodeMs !== undefined) (merged as { decodeMs?: number }).decodeMs = patch.decodeMs; + if (patch.genTotalMs !== undefined) + (merged as { genTotalMs?: number }).genTotalMs = patch.genTotalMs; + if (patch.usage !== undefined) { + (merged as { usage?: Usage }).usage = { ...existing.usage, ...patch.usage }; + } + if (patch.toolDurationMs !== undefined) { + (merged as { toolDurationMs?: number }).toolDurationMs = + (existing.toolDurationMs ?? 0) + patch.toolDurationMs; + } + return merged; +} + +function upsertStep( + steps: readonly StepMetrics[], + stepId: StepId, + patch: StepMetrics, +): readonly StepMetrics[] { + const idx = steps.findIndex((s) => s.stepId === stepId); + if (idx === -1) { + return [...steps, patch]; + } + return [...steps.slice(0, idx), mergeStep(steps[idx]!, patch), ...steps.slice(idx + 1)]; +} + +function setTurn( + turns: ReadonlyMap<string, TurnMetrics>, + turnId: string, + turn: TurnMetrics, +): ReadonlyMap<string, TurnMetrics> { + const next = new Map(turns); + next.set(turnId, turn); + return next; +} + +/** + * Fold one live AgentEvent into the telemetry state. + * + * - `turn-start` records the active turnId. + * - `step-complete` creates/updates the step's timing metrics. + * - `usage` merges token counts into the step (joined by `stepId`). + * - `tool-result` accumulates `durationMs` into the step. + * - `done` records turn-level wall-clock + token totals. + * - All other event types are no-ops (content events belong to the transcript). + * + * Pure: input → output, no DOM, no side effects. + */ +export function foldMetricEvent(state: TelemetryState, event: AgentEvent): TelemetryState { + switch (event.type) { + case "turn-start": { + return { + ...state, + turns: setTurn(state.turns, event.turnId, { steps: [] }), + }; + } + + case "step-complete": { + const turnId = event.turnId; + const existing = state.turns.get(turnId); + const patch: StepMetrics = { stepId: event.stepId }; + if (event.ttftMs !== undefined) (patch as { ttftMs?: number }).ttftMs = event.ttftMs; + if (event.decodeMs !== undefined) (patch as { decodeMs?: number }).decodeMs = event.decodeMs; + if (event.genTotalMs !== undefined) + (patch as { genTotalMs?: number }).genTotalMs = event.genTotalMs; + const steps = + existing !== undefined ? upsertStep(existing.steps, event.stepId, patch) : [patch]; + return { + ...state, + turns: setTurn(state.turns, turnId, { ...existing, steps } as TurnMetrics), + }; + } + + case "usage": { + if (event.stepId === undefined) return state; + const turnId = event.turnId; + const existing = state.turns.get(turnId); + const patch: StepMetrics = { stepId: event.stepId, usage: event.usage }; + const steps = + existing !== undefined ? upsertStep(existing.steps, event.stepId, patch) : [patch]; + return { + ...state, + turns: setTurn(state.turns, turnId, { ...existing, steps } as TurnMetrics), + }; + } + + case "tool-result": { + if (event.durationMs === undefined) return state; + const turnId = event.turnId; + const existing = state.turns.get(turnId); + if (existing === undefined) return state; + const patch: StepMetrics = { stepId: event.stepId, toolDurationMs: event.durationMs }; + const steps = upsertStep(existing.steps, event.stepId, patch); + return { ...state, turns: setTurn(state.turns, turnId, { ...existing, steps }) }; + } + + case "done": { + const turnId = event.turnId; + const existing = state.turns.get(turnId); + const updated: TurnMetrics = { + ...(existing ?? { steps: [] }), + }; + if (event.durationMs !== undefined) + (updated as { wallMs?: number }).wallMs = event.durationMs; + if (event.usage !== undefined) (updated as { doneUsage?: Usage }).doneUsage = event.usage; + return { ...state, turns: setTurn(state.turns, turnId, updated) }; + } + + default: + return state; + } +} diff --git a/src/core/telemetry/selectors.ts b/src/core/telemetry/selectors.ts new file mode 100644 index 0000000..ecf1794 --- /dev/null +++ b/src/core/telemetry/selectors.ts @@ -0,0 +1,95 @@ +import type { Usage } from "@dispatch/wire"; +import type { StepMetrics, TelemetryState, TurnMetrics } from "./types"; + +/** Get the metrics for a specific step within a turn. */ +export function stepMetrics( + state: TelemetryState, + turnId: string, + stepIndex: number, +): StepMetrics | undefined { + return state.turns.get(turnId)?.steps[stepIndex]; +} + +/** Get the metrics for a turn. */ +export function turnMetrics(state: TelemetryState, turnId: string): TurnMetrics | undefined { + return state.turns.get(turnId); +} + +/** The number of steps in a turn. */ +export function stepCount(state: TelemetryState, turnId: string): number { + return state.turns.get(turnId)?.steps.length ?? 0; +} + +/** TTFT of the first step in a turn (the turn-visible first-token latency). */ +export function turnTtft(state: TelemetryState, turnId: string): number | undefined { + return state.turns.get(turnId)?.steps[0]?.ttftMs; +} + +/** Sum of all steps' decode times in a turn. */ +export function totalDecodeMs(state: TelemetryState, turnId: string): number | undefined { + const steps = state.turns.get(turnId)?.steps; + if (steps === undefined || steps.length === 0) return undefined; + let total = 0; + let found = false; + for (const s of steps) { + if (s.decodeMs !== undefined) { + total += s.decodeMs; + found = true; + } + } + return found ? total : undefined; +} + +/** Aggregate output tokens across all steps in a turn. */ +export function totalOutputTokens(state: TelemetryState, turnId: string): number | undefined { + const turn = state.turns.get(turnId); + if (turn === undefined) return undefined; + if (turn.doneUsage !== undefined) return turn.doneUsage.outputTokens; + let total = 0; + let found = false; + for (const s of turn.steps) { + if (s.usage?.outputTokens !== undefined) { + total += s.usage.outputTokens; + found = true; + } + } + return found ? total : undefined; +} + +/** Aggregate input tokens across all steps in a turn. */ +export function totalInputTokens(state: TelemetryState, turnId: string): number | undefined { + const turn = state.turns.get(turnId); + if (turn === undefined) return undefined; + if (turn.doneUsage !== undefined) return turn.doneUsage.inputTokens; + let total = 0; + let found = false; + for (const s of turn.steps) { + if (s.usage?.inputTokens !== undefined) { + total += s.usage.inputTokens; + found = true; + } + } + return found ? total : undefined; +} + +/** Derived TPS for a step: outputTokens / (decodeMs / 1000). */ +export function stepTps(step: StepMetrics): number | undefined { + if (step.usage?.outputTokens === undefined || step.decodeMs === undefined) return undefined; + if (step.decodeMs === 0) return undefined; + return step.usage.outputTokens / (step.decodeMs / 1000); +} + +/** Derived aggregate TPS for a turn. */ +export function turnTps(state: TelemetryState, turnId: string): number | undefined { + const outTokens = totalOutputTokens(state, turnId); + const decode = totalDecodeMs(state, turnId); + if (outTokens === undefined || decode === undefined || decode === 0) return undefined; + return outTokens / (decode / 1000); +} + +/** Sum of tool execution durations within a step. */ +export function stepToolDuration(step: StepMetrics): number | undefined { + return step.toolDurationMs !== undefined && step.toolDurationMs > 0 + ? step.toolDurationMs + : undefined; +} diff --git a/src/core/telemetry/types.ts b/src/core/telemetry/types.ts new file mode 100644 index 0000000..395ec93 --- /dev/null +++ b/src/core/telemetry/types.ts @@ -0,0 +1,35 @@ +import type { StepId, Usage } from "@dispatch/wire"; + +/** + * Per-step metrics, accumulated from `step-complete` + `usage` events. + * All fields optional — absent when the backend had no clock or the step + * produced no text/reasoning token. + */ +export interface StepMetrics { + readonly stepId: StepId; + readonly ttftMs?: number; + readonly decodeMs?: number; + readonly genTotalMs?: number; + readonly usage?: Usage; + readonly toolDurationMs?: number; // sum of tool-result.durationMs in this step +} + +/** + * Per-turn metrics, accumulated from `done` events + per-step aggregation. + */ +export interface TurnMetrics { + readonly wallMs?: number; + readonly doneUsage?: Usage; + readonly steps: readonly StepMetrics[]; +} + +/** + * Pure telemetry state — lives alongside but separate from TranscriptState. + * Accumulates live-only metric events; never persisted (history has no metrics). + * No "active turn" tracking — the consumer (store) passes the relevant turnId + * to the selectors. Pure: events flow in, derived values flow out. + */ +export interface TelemetryState { + /** turnId → TurnMetrics. Multiple turns accumulate (tab switching). */ + readonly turns: ReadonlyMap<string, TurnMetrics>; +} |
