summaryrefslogtreecommitdiffhomepage
path: root/src/core/telemetry
diff options
context:
space:
mode:
authorAdam Malczewski <[email protected]>2026-06-07 18:41:27 +0900
committerAdam Malczewski <[email protected]>2026-06-07 18:41:27 +0900
commit48c6d85c3cc5a57a729f14068e2346b17ed62088 (patch)
treeec56590653f399f4a5feae0245652eba8f352ad5 /src/core/telemetry
parent2e79dd122e5664353e02e0d33715ae8c1041a379 (diff)
downloaddispatch-web-48c6d85c3cc5a57a729f14068e2346b17ed62088.tar.gz
dispatch-web-48c6d85c3cc5a57a729f14068e2346b17ed62088.zip
feat(chat): live turn metrics — telemetry reducer + rendering
Consume wire/transport-contract 0.3.0 (step-complete event + timing fields on usage/tool-result/done). Pure core/telemetry module: foldMetricEvent (reducer) + derived selectors (stepTps, turnTps, etc). TelemetryState is pure data, no active-turn tracking — consumers pass turnId to selectors. ChatStore wires foldMetricEvent into handleDelta and exposes telemetry + currentTurnId. ChatView shows step-metrics footer (time/TPS/tokens) on assistant text bubbles and durationMs badge on tool cards. New TurnSummary component renders turn-level stats (wall-clock, tokens, steps, TPS) in a DaisyUI stats block. Extended live-probe to verify telemetry events against bin/up (pending backend restart). 336 tests, typecheck 0, biome clean, build ok.
Diffstat (limited to 'src/core/telemetry')
-rw-r--r--src/core/telemetry/index.ts14
-rw-r--r--src/core/telemetry/reducer.test.ts252
-rw-r--r--src/core/telemetry/reducer.ts122
-rw-r--r--src/core/telemetry/selectors.ts95
-rw-r--r--src/core/telemetry/types.ts35
5 files changed, 518 insertions, 0 deletions
diff --git a/src/core/telemetry/index.ts b/src/core/telemetry/index.ts
new file mode 100644
index 0000000..a528b0d
--- /dev/null
+++ b/src/core/telemetry/index.ts
@@ -0,0 +1,14 @@
+export { foldMetricEvent, initialState } from "./reducer";
+export {
+ stepCount,
+ stepMetrics,
+ stepToolDuration,
+ stepTps,
+ totalDecodeMs,
+ totalInputTokens,
+ totalOutputTokens,
+ turnMetrics,
+ turnTps,
+ turnTtft,
+} from "./selectors";
+export type { StepMetrics, TelemetryState, TurnMetrics } from "./types";
diff --git a/src/core/telemetry/reducer.test.ts b/src/core/telemetry/reducer.test.ts
new file mode 100644
index 0000000..119bf96
--- /dev/null
+++ b/src/core/telemetry/reducer.test.ts
@@ -0,0 +1,252 @@
+import type { StepId, Usage } from "@dispatch/wire";
+import { describe, expect, it } from "vitest";
+import { foldMetricEvent, initialState } from "./reducer";
+import {
+ stepCount,
+ stepMetrics,
+ stepToolDuration,
+ stepTps,
+ totalDecodeMs,
+ totalInputTokens,
+ totalOutputTokens,
+ turnMetrics,
+ turnTps,
+ turnTtft,
+} from "./selectors";
+
+const sid = (s: string) => s as StepId;
+
+const usage = (turnId: string, stepId: string, u: Usage) => ({
+ type: "usage" as const,
+ conversationId: "c1",
+ turnId,
+ stepId: sid(stepId),
+ usage: u,
+});
+
+const stepComplete = (
+ turnId: string,
+ stepId: string,
+ timing: { ttftMs?: number; decodeMs?: number; genTotalMs?: number },
+) => ({
+ type: "step-complete" as const,
+ conversationId: "c1",
+ turnId,
+ stepId: sid(stepId),
+ ...timing,
+});
+
+describe("foldMetricEvent", () => {
+ it("turn-start initializes an empty turn", () => {
+ const s = foldMetricEvent(initialState(), {
+ type: "turn-start",
+ conversationId: "c1",
+ turnId: "t1",
+ });
+ expect(s.turns.get("t1")?.steps).toEqual([]);
+ });
+
+ it("step-complete populates timing on a new step", () => {
+ let s = initialState();
+ s = foldMetricEvent(s, { type: "turn-start", conversationId: "c1", turnId: "t1" });
+ s = foldMetricEvent(
+ s,
+ stepComplete("t1", "s0", { ttftMs: 300, decodeMs: 800, genTotalMs: 1100 }),
+ );
+
+ const step = stepMetrics(s, "t1", 0);
+ expect(step?.ttftMs).toBe(300);
+ expect(step?.decodeMs).toBe(800);
+ expect(step?.genTotalMs).toBe(1100);
+ });
+
+ it("usage merges tokens into a step (joined by stepId)", () => {
+ let s = initialState();
+ s = foldMetricEvent(s, { type: "turn-start", conversationId: "c1", turnId: "t1" });
+ s = foldMetricEvent(s, stepComplete("t1", "s0", { genTotalMs: 500 }));
+ s = foldMetricEvent(s, usage("t1", "s0", { inputTokens: 100, outputTokens: 50 }));
+
+ const step = stepMetrics(s, "t1", 0);
+ expect(step?.usage?.inputTokens).toBe(100);
+ expect(step?.usage?.outputTokens).toBe(50);
+ expect(step?.genTotalMs).toBe(500); // timing preserved
+ });
+
+ it("usage without stepId is ignored", () => {
+ let s = initialState();
+ s = foldMetricEvent(s, { type: "turn-start", conversationId: "c1", turnId: "t1" });
+ s = foldMetricEvent(s, {
+ type: "usage",
+ conversationId: "c1",
+ turnId: "t1",
+ usage: { inputTokens: 100, outputTokens: 50 },
+ // no stepId
+ });
+ expect(s.turns.get("t1")?.steps).toEqual([]);
+ });
+
+ it("tool-result accumulates durationMs into its step", () => {
+ let s = initialState();
+ s = foldMetricEvent(s, { type: "turn-start", conversationId: "c1", turnId: "t1" });
+ s = foldMetricEvent(s, stepComplete("t1", "s0", {}));
+ s = foldMetricEvent(s, {
+ type: "tool-result",
+ conversationId: "c1",
+ turnId: "t1",
+ stepId: sid("s0"),
+ toolCallId: "tc1",
+ toolName: "bash",
+ content: "",
+ isError: false,
+ durationMs: 120,
+ });
+ s = foldMetricEvent(s, {
+ type: "tool-result",
+ conversationId: "c1",
+ turnId: "t1",
+ stepId: sid("s0"),
+ toolCallId: "tc2",
+ toolName: "bash",
+ content: "",
+ isError: false,
+ durationMs: 80,
+ });
+
+ const step = stepMetrics(s, "t1", 0);
+ expect(step?.toolDurationMs).toBe(200);
+ });
+
+ it("done records turn wall-clock and aggregate usage", () => {
+ let s = initialState();
+ s = foldMetricEvent(s, { type: "turn-start", conversationId: "c1", turnId: "t1" });
+ s = foldMetricEvent(s, {
+ type: "done",
+ conversationId: "c1",
+ turnId: "t1",
+ reason: "complete",
+ durationMs: 4200,
+ usage: { inputTokens: 800, outputTokens: 200 },
+ });
+
+ const turn = turnMetrics(s, "t1");
+ expect(turn?.wallMs).toBe(4200);
+ expect(turn?.doneUsage?.outputTokens).toBe(200);
+ });
+
+ it("events for an unknown turn are handled gracefully (step-complete, usage)", () => {
+ const s = initialState();
+ // step-complete for a turn we haven't started — creates the turn.
+ const s2 = foldMetricEvent(s, stepComplete("t1", "s0", { ttftMs: 100 }));
+ expect(s2.turns.get("t1")?.steps[0]?.ttftMs).toBe(100);
+ });
+
+ it("multiple steps accumulate in order", () => {
+ let s = initialState();
+ s = foldMetricEvent(s, { type: "turn-start", conversationId: "c1", turnId: "t1" });
+ s = foldMetricEvent(s, stepComplete("t1", "s0", { genTotalMs: 100 }));
+ s = foldMetricEvent(s, stepComplete("t1", "s1", { genTotalMs: 200 }));
+
+ expect(stepCount(s, "t1")).toBe(2);
+ expect(stepMetrics(s, "t1", 0)?.genTotalMs).toBe(100);
+ expect(stepMetrics(s, "t1", 1)?.genTotalMs).toBe(200);
+ });
+
+ it("non-metric events are no-ops", () => {
+ let s = initialState();
+ s = foldMetricEvent(s, { type: "turn-start", conversationId: "c1", turnId: "t1" });
+ s = foldMetricEvent(s, {
+ type: "text-delta",
+ conversationId: "c1",
+ turnId: "t1",
+ delta: "hi",
+ });
+ s = foldMetricEvent(s, {
+ type: "turn-sealed",
+ conversationId: "c1",
+ turnId: "t1",
+ });
+ expect(s.turns.get("t1")?.steps).toEqual([]);
+ });
+});
+
+describe("selectors — derived metrics", () => {
+ function populatedState() {
+ let s = initialState();
+ s = foldMetricEvent(s, { type: "turn-start", conversationId: "c1", turnId: "t1" });
+ s = foldMetricEvent(
+ s,
+ stepComplete("t1", "s0", { ttftMs: 300, decodeMs: 700, genTotalMs: 1000 }),
+ );
+ s = foldMetricEvent(s, usage("t1", "s0", { inputTokens: 500, outputTokens: 100 }));
+ s = foldMetricEvent(
+ s,
+ stepComplete("t1", "s1", { ttftMs: 200, decodeMs: 500, genTotalMs: 700 }),
+ );
+ s = foldMetricEvent(s, usage("t1", "s1", { inputTokens: 600, outputTokens: 80 }));
+ s = foldMetricEvent(s, {
+ type: "done",
+ conversationId: "c1",
+ turnId: "t1",
+ reason: "complete",
+ durationMs: 3500,
+ usage: { inputTokens: 1100, outputTokens: 180 },
+ });
+ return s;
+ }
+
+ it("stepTps = outputTokens / (decodeMs / 1000)", () => {
+ const s = populatedState();
+ const step = stepMetrics(s, "t1", 0)!;
+ expect(stepTps(step)).toBeCloseTo(100 / 0.7, 2);
+ });
+
+ it("turnTtft returns first step's ttftMs", () => {
+ expect(turnTtft(populatedState(), "t1")).toBe(300);
+ });
+
+ it("totalDecodeMs sums all steps' decodeMs", () => {
+ expect(totalDecodeMs(populatedState(), "t1")).toBe(1200);
+ });
+
+ it("turnTps = outputTokens / (totalDecodeMs / 1000)", () => {
+ const s = populatedState();
+ expect(turnTps(s, "t1")).toBeCloseTo(180 / 1.2, 2);
+ });
+
+ it("totalOutputTokens prefers done.usage over step sum", () => {
+ const s = populatedState();
+ expect(totalOutputTokens(s, "t1")).toBe(180); // from done.usage
+ });
+
+ it("totalInputTokens prefers done.usage over step sum", () => {
+ const s = populatedState();
+ expect(totalInputTokens(s, "t1")).toBe(1100);
+ });
+
+ it("stepToolDuration returns sum only when > 0", () => {
+ const withTools = foldMetricEvent(
+ foldMetricEvent(initialState(), { type: "turn-start", conversationId: "c1", turnId: "t1" }),
+ {
+ type: "tool-result",
+ conversationId: "c1",
+ turnId: "t1",
+ stepId: sid("s0"),
+ toolCallId: "tc1",
+ toolName: "bash",
+ content: "",
+ isError: false,
+ durationMs: 50,
+ },
+ );
+ const step = stepMetrics(withTools, "t1", 0)!;
+ expect(stepToolDuration(step)).toBe(50);
+ expect(stepToolDuration({ stepId: sid("s0") })).toBeUndefined();
+ });
+
+ it("returns undefined for absent fields gracefully", () => {
+ const s = initialState();
+ expect(turnMetrics(s, "missing")).toBeUndefined();
+ expect(turnTtft(s, "missing")).toBeUndefined();
+ expect(turnTps(s, "missing")).toBeUndefined();
+ });
+});
diff --git a/src/core/telemetry/reducer.ts b/src/core/telemetry/reducer.ts
new file mode 100644
index 0000000..4083231
--- /dev/null
+++ b/src/core/telemetry/reducer.ts
@@ -0,0 +1,122 @@
+import type { AgentEvent, StepId, Usage } from "@dispatch/wire";
+import type { StepMetrics, TelemetryState, TurnMetrics } from "./types";
+
+/** The initial empty telemetry state. */
+export function initialState(): TelemetryState {
+ return { turns: new Map() };
+}
+
+function mergeStep(existing: StepMetrics, patch: StepMetrics): StepMetrics {
+ const merged: StepMetrics = { ...existing };
+ if (patch.ttftMs !== undefined) (merged as { ttftMs?: number }).ttftMs = patch.ttftMs;
+ if (patch.decodeMs !== undefined) (merged as { decodeMs?: number }).decodeMs = patch.decodeMs;
+ if (patch.genTotalMs !== undefined)
+ (merged as { genTotalMs?: number }).genTotalMs = patch.genTotalMs;
+ if (patch.usage !== undefined) {
+ (merged as { usage?: Usage }).usage = { ...existing.usage, ...patch.usage };
+ }
+ if (patch.toolDurationMs !== undefined) {
+ (merged as { toolDurationMs?: number }).toolDurationMs =
+ (existing.toolDurationMs ?? 0) + patch.toolDurationMs;
+ }
+ return merged;
+}
+
+function upsertStep(
+ steps: readonly StepMetrics[],
+ stepId: StepId,
+ patch: StepMetrics,
+): readonly StepMetrics[] {
+ const idx = steps.findIndex((s) => s.stepId === stepId);
+ if (idx === -1) {
+ return [...steps, patch];
+ }
+ return [...steps.slice(0, idx), mergeStep(steps[idx]!, patch), ...steps.slice(idx + 1)];
+}
+
+function setTurn(
+ turns: ReadonlyMap<string, TurnMetrics>,
+ turnId: string,
+ turn: TurnMetrics,
+): ReadonlyMap<string, TurnMetrics> {
+ const next = new Map(turns);
+ next.set(turnId, turn);
+ return next;
+}
+
+/**
+ * Fold one live AgentEvent into the telemetry state.
+ *
+ * - `turn-start` creates (or resets) the turn's entry with an empty step list.
+ * - `step-complete` creates/updates the step's timing metrics.
+ * - `usage` merges token counts into the step (joined by `stepId`).
+ * - `tool-result` accumulates `durationMs` into the step; dropped when `durationMs` is absent or the turn is unknown.
+ * - `done` records turn-level wall-clock + token totals.
+ * - All other event types are no-ops (content events belong to the transcript).
+ *
+ * Pure: input → output, no DOM, no side effects.
+ */
+export function foldMetricEvent(state: TelemetryState, event: AgentEvent): TelemetryState {
+ switch (event.type) {
+ case "turn-start": {
+ return {
+ ...state,
+ turns: setTurn(state.turns, event.turnId, { steps: [] }),
+ };
+ }
+
+ case "step-complete": {
+ const turnId = event.turnId;
+ const existing = state.turns.get(turnId);
+ const patch: StepMetrics = { stepId: event.stepId };
+ if (event.ttftMs !== undefined) (patch as { ttftMs?: number }).ttftMs = event.ttftMs;
+ if (event.decodeMs !== undefined) (patch as { decodeMs?: number }).decodeMs = event.decodeMs;
+ if (event.genTotalMs !== undefined)
+ (patch as { genTotalMs?: number }).genTotalMs = event.genTotalMs;
+ const steps =
+ existing !== undefined ? upsertStep(existing.steps, event.stepId, patch) : [patch];
+ return {
+ ...state,
+ turns: setTurn(state.turns, turnId, { ...existing, steps } as TurnMetrics),
+ };
+ }
+
+ case "usage": {
+ if (event.stepId === undefined) return state;
+ const turnId = event.turnId;
+ const existing = state.turns.get(turnId);
+ const patch: StepMetrics = { stepId: event.stepId, usage: event.usage };
+ const steps =
+ existing !== undefined ? upsertStep(existing.steps, event.stepId, patch) : [patch];
+ return {
+ ...state,
+ turns: setTurn(state.turns, turnId, { ...existing, steps } as TurnMetrics),
+ };
+ }
+
+ case "tool-result": {
+ if (event.durationMs === undefined) return state;
+ const turnId = event.turnId;
+ const existing = state.turns.get(turnId);
+ if (existing === undefined) return state;
+ const patch: StepMetrics = { stepId: event.stepId, toolDurationMs: event.durationMs };
+ const steps = upsertStep(existing.steps, event.stepId, patch);
+ return { ...state, turns: setTurn(state.turns, turnId, { ...existing, steps }) };
+ }
+
+ case "done": {
+ const turnId = event.turnId;
+ const existing = state.turns.get(turnId);
+ const updated: TurnMetrics = {
+ ...(existing ?? { steps: [] }),
+ };
+ if (event.durationMs !== undefined)
+ (updated as { wallMs?: number }).wallMs = event.durationMs;
+ if (event.usage !== undefined) (updated as { doneUsage?: Usage }).doneUsage = event.usage;
+ return { ...state, turns: setTurn(state.turns, turnId, updated) };
+ }
+
+ default:
+ return state;
+ }
+}
diff --git a/src/core/telemetry/selectors.ts b/src/core/telemetry/selectors.ts
new file mode 100644
index 0000000..ecf1794
--- /dev/null
+++ b/src/core/telemetry/selectors.ts
@@ -0,0 +1,95 @@
+import type { Usage } from "@dispatch/wire";
+import type { StepMetrics, TelemetryState, TurnMetrics } from "./types";
+
+/** Get the metrics for a specific step within a turn. */
+export function stepMetrics(
+ state: TelemetryState,
+ turnId: string,
+ stepIndex: number,
+): StepMetrics | undefined {
+ return state.turns.get(turnId)?.steps[stepIndex];
+}
+
+/** Get the metrics for a turn. */
+export function turnMetrics(state: TelemetryState, turnId: string): TurnMetrics | undefined {
+ return state.turns.get(turnId);
+}
+
+/** The number of steps in a turn. */
+export function stepCount(state: TelemetryState, turnId: string): number {
+ return state.turns.get(turnId)?.steps.length ?? 0;
+}
+
+/** TTFT of the first step in a turn (the turn-visible first-token latency). */
+export function turnTtft(state: TelemetryState, turnId: string): number | undefined {
+ return state.turns.get(turnId)?.steps[0]?.ttftMs;
+}
+
+/** Sum of all steps' decode times in a turn; `undefined` when no step reports `decodeMs`. */
+export function totalDecodeMs(state: TelemetryState, turnId: string): number | undefined {
+ const steps = state.turns.get(turnId)?.steps;
+ if (steps === undefined || steps.length === 0) return undefined;
+ let total = 0;
+ let found = false;
+ for (const s of steps) {
+ if (s.decodeMs !== undefined) {
+ total += s.decodeMs;
+ found = true;
+ }
+ }
+ return found ? total : undefined;
+}
+
+/** Output tokens for a turn: `doneUsage.outputTokens` when `done` usage was recorded, otherwise the sum over steps. */
+export function totalOutputTokens(state: TelemetryState, turnId: string): number | undefined {
+ const turn = state.turns.get(turnId);
+ if (turn === undefined) return undefined;
+ if (turn.doneUsage !== undefined) return turn.doneUsage.outputTokens;
+ let total = 0;
+ let found = false;
+ for (const s of turn.steps) {
+ if (s.usage?.outputTokens !== undefined) {
+ total += s.usage.outputTokens;
+ found = true;
+ }
+ }
+ return found ? total : undefined;
+}
+
+/** Input tokens for a turn: `doneUsage.inputTokens` when `done` usage was recorded, otherwise the sum over steps. */
+export function totalInputTokens(state: TelemetryState, turnId: string): number | undefined {
+ const turn = state.turns.get(turnId);
+ if (turn === undefined) return undefined;
+ if (turn.doneUsage !== undefined) return turn.doneUsage.inputTokens;
+ let total = 0;
+ let found = false;
+ for (const s of turn.steps) {
+ if (s.usage?.inputTokens !== undefined) {
+ total += s.usage.inputTokens;
+ found = true;
+ }
+ }
+ return found ? total : undefined;
+}
+
+/** Derived TPS for a step: outputTokens / (decodeMs / 1000). */
+export function stepTps(step: StepMetrics): number | undefined {
+ if (step.usage?.outputTokens === undefined || step.decodeMs === undefined) return undefined;
+ if (step.decodeMs === 0) return undefined;
+ return step.usage.outputTokens / (step.decodeMs / 1000);
+}
+
+/** Derived aggregate TPS for a turn: total output tokens / (total decode ms / 1000); `undefined` if either is missing or decode is 0. */
+export function turnTps(state: TelemetryState, turnId: string): number | undefined {
+ const outTokens = totalOutputTokens(state, turnId);
+ const decode = totalDecodeMs(state, turnId);
+ if (outTokens === undefined || decode === undefined || decode === 0) return undefined;
+ return outTokens / (decode / 1000);
+}
+
+/** Accumulated tool execution time for a step; `undefined` when absent or not greater than 0. */
+export function stepToolDuration(step: StepMetrics): number | undefined {
+ return step.toolDurationMs !== undefined && step.toolDurationMs > 0
+ ? step.toolDurationMs
+ : undefined;
+}
diff --git a/src/core/telemetry/types.ts b/src/core/telemetry/types.ts
new file mode 100644
index 0000000..395ec93
--- /dev/null
+++ b/src/core/telemetry/types.ts
@@ -0,0 +1,35 @@
+import type { StepId, Usage } from "@dispatch/wire";
+
+/**
+ * Per-step metrics, accumulated from `step-complete`, `usage`, and `tool-result` events.
+ * All fields optional — absent when the backend had no clock or the step
+ * produced no text/reasoning token.
+ */
+export interface StepMetrics {
+ readonly stepId: StepId;
+ readonly ttftMs?: number;
+ readonly decodeMs?: number;
+ readonly genTotalMs?: number;
+ readonly usage?: Usage;
+ readonly toolDurationMs?: number; // sum of tool-result.durationMs in this step
+}
+
+/**
+ * Per-turn metrics, accumulated from `done` events + per-step aggregation.
+ */
+export interface TurnMetrics {
+ readonly wallMs?: number;
+ readonly doneUsage?: Usage;
+ readonly steps: readonly StepMetrics[];
+}
+
+/**
+ * Pure telemetry state — lives alongside but separate from TranscriptState.
+ * Accumulates live-only metric events; never persisted (history has no metrics).
+ * No "active turn" tracking — the consumer (store) passes the relevant turnId
+ * to the selectors. Pure: events flow in, derived values flow out.
+ */
+export interface TelemetryState {
+ /** turnId → TurnMetrics. Multiple turns accumulate (tab switching). */
+ readonly turns: ReadonlyMap<string, TurnMetrics>;
+}