summaryrefslogtreecommitdiffhomepage
path: root/src/core/metrics/reducer.ts
diff options
context:
space:
mode:
authorAdam Malczewski <[email protected]>2026-06-10 10:06:27 +0900
committerAdam Malczewski <[email protected]>2026-06-10 10:06:27 +0900
commitf8bf715abc8a89ec0c6370b40403c509b1ce2870 (patch)
tree915600a766e042a8491ac57423542cde1dda1eb6 /src/core/metrics/reducer.ts
parentccfd2f4157c1cbbb3d8aeceee94d9e963a82ab03 (diff)
downloaddispatch-web-f8bf715abc8a89ec0c6370b40403c509b1ce2870.tar.gz
dispatch-web-f8bf715abc8a89ec0c6370b40403c509b1ce2870.zip
feat(metrics): per-turn + per-step token/timing metrics bubbles
Consume [email protected] / [email protected] metrics: usage.stepId, step-complete (ttft/decode/genTotal), done.durationMs/usage, and the durable GET /conversations/:id/metrics endpoint. - core/metrics: pure live-fold + durable-merge reducer; decode-rate TPS; head-aligned, stable placement; progressive per-step rows (each shown as its step ends) with the turn-total row gated on the done event. - features/chat: store folds metric events + hydrates durable TurnMetrics; ChatView renders inline step bubbles + a turn-total bubble. - app: MetricsSync HTTP effect (tolerates 404) injected into chat stores. - scripts/live-probe: drives the metrics path; live-verified 17/17 vs bin/up. - docs: regenerate .dispatch wire/transport mirrors to 0.4.0; glossary terms (turn/step metrics, TTFT, decode time, TPS, metrics bubble); trim handoff.
Diffstat (limited to 'src/core/metrics/reducer.ts')
-rw-r--r--src/core/metrics/reducer.ts239
1 files changed, 239 insertions, 0 deletions
diff --git a/src/core/metrics/reducer.ts b/src/core/metrics/reducer.ts
new file mode 100644
index 0000000..d36dba1
--- /dev/null
+++ b/src/core/metrics/reducer.ts
@@ -0,0 +1,239 @@
+import type { AgentEvent, StepId, StepMetrics, TurnMetrics, Usage } from "@dispatch/wire";
+import type { BuildingStep, LiveTurn, MetricsState, TurnMetricsEntry } from "./types";
+
+function sumStepUsages(steps: readonly BuildingStep[]): Usage {
+ let inputTokens = 0;
+ let outputTokens = 0;
+ let hasCacheRead = false;
+ let hasCacheWrite = false;
+ let cacheReadTokens = 0;
+ let cacheWriteTokens = 0;
+
+ for (const step of steps) {
+ if (step.usage === undefined) continue;
+ inputTokens += step.usage.inputTokens;
+ outputTokens += step.usage.outputTokens;
+ if (step.usage.cacheReadTokens !== undefined && step.usage.cacheReadTokens > 0) {
+ hasCacheRead = true;
+ cacheReadTokens += step.usage.cacheReadTokens;
+ }
+ if (step.usage.cacheWriteTokens !== undefined && step.usage.cacheWriteTokens > 0) {
+ hasCacheWrite = true;
+ cacheWriteTokens += step.usage.cacheWriteTokens;
+ }
+ }
+
+ const base: Usage = { inputTokens, outputTokens };
+ if (hasCacheRead) {
+ (base as { cacheReadTokens?: number }).cacheReadTokens = cacheReadTokens;
+ }
+ if (hasCacheWrite) {
+ (base as { cacheWriteTokens?: number }).cacheWriteTokens = cacheWriteTokens;
+ }
+ return base;
+}
+
+function buildingStepToMetrics(bs: BuildingStep): StepMetrics {
+ const usage: Usage = bs.usage ?? { inputTokens: 0, outputTokens: 0 };
+ const base: StepMetrics = { stepId: bs.stepId as StepId, usage };
+ if (bs.ttftMs !== undefined) {
+ (base as { ttftMs?: number }).ttftMs = bs.ttftMs;
+ }
+ if (bs.decodeMs !== undefined) {
+ (base as { decodeMs?: number }).decodeMs = bs.decodeMs;
+ }
+ if (bs.genTotalMs !== undefined) {
+ (base as { genTotalMs?: number }).genTotalMs = bs.genTotalMs;
+ }
+ return base;
+}
+
+function getStep(lt: LiveTurn, id: string): BuildingStep {
+ const step = lt.stepMap.get(id);
+ if (step === undefined) throw new Error(`Missing step ${id} in live turn`);
+ return step;
+}
+
+function liveTurnToMetrics(lt: LiveTurn): TurnMetrics {
+ const buildingSteps = lt.stepOrder.map((id) => getStep(lt, id));
+ const steps = buildingSteps.map((bs) => buildingStepToMetrics(bs));
+ const usage = lt.doneUsage ?? sumStepUsages(buildingSteps);
+ const base: TurnMetrics = { turnId: lt.turnId, usage, steps };
+ if (lt.durationMs !== undefined) {
+ (base as { durationMs?: number }).durationMs = lt.durationMs;
+ }
+ return base;
+}
+
+function ensureLiveTurn(state: MetricsState, turnId: string): [MetricsState, LiveTurn] {
+ const existing = state.live.get(turnId);
+ if (existing !== undefined) return [state, existing];
+
+ const newTurn: LiveTurn = {
+ turnId,
+ done: false,
+ durationMs: undefined,
+ doneUsage: undefined,
+ stepMap: new Map(),
+ stepOrder: [],
+ };
+ const newLive = new Map(state.live);
+ newLive.set(turnId, newTurn);
+ return [{ ...state, live: newLive, liveOrder: [...state.liveOrder, turnId] }, newTurn];
+}
+
+function upsertStep(lt: LiveTurn, stepId: string, update: Partial<BuildingStep>): LiveTurn {
+ const existing = lt.stepMap.get(stepId);
+ if (existing !== undefined) {
+ const merged: BuildingStep = {
+ stepId,
+ usage: update.usage ?? existing.usage,
+ ttftMs: update.ttftMs ?? existing.ttftMs,
+ decodeMs: update.decodeMs ?? existing.decodeMs,
+ genTotalMs: update.genTotalMs ?? existing.genTotalMs,
+ complete: update.complete ?? existing.complete,
+ };
+ const newMap = new Map(lt.stepMap);
+ newMap.set(stepId, merged);
+ return { ...lt, stepMap: newMap };
+ }
+
+ const fresh: BuildingStep = {
+ stepId,
+ usage: update.usage,
+ ttftMs: update.ttftMs,
+ decodeMs: update.decodeMs,
+ genTotalMs: update.genTotalMs,
+ complete: update.complete ?? false,
+ };
+ const newMap = new Map(lt.stepMap);
+ newMap.set(stepId, fresh);
+ return { ...lt, stepMap: newMap, stepOrder: [...lt.stepOrder, stepId] };
+}
+
+/** The initial empty metrics state. */
+export function initialMetricsState(): MetricsState {
+ return {
+ live: new Map(),
+ liveOrder: [],
+ durable: new Map(),
+ durableOrder: [],
+ };
+}
+
+/**
+ * Fold one live AgentEvent into the metrics state.
+ *
+ * - `usage` with `stepId`: upsert that step's usage.
+ * - `usage` without `stepId`: ignored.
+ * - `step-complete`: upsert that step's timing; default usage to zeros if absent.
+ * - `done`: set turn's `durationMs` and optional aggregate `usage`.
+ * - All other event types: return state unchanged.
+ */
+export function foldMetricsEvent(state: MetricsState, event: AgentEvent): MetricsState {
+ switch (event.type) {
+ case "usage": {
+ if (event.stepId === undefined) return state;
+ const [s1, lt] = ensureLiveTurn(state, event.turnId);
+ const updated = upsertStep(lt, event.stepId, { usage: event.usage });
+ const newLive = new Map(s1.live);
+ newLive.set(event.turnId, updated);
+ return { ...s1, live: newLive };
+ }
+
+ case "step-complete": {
+ const [s1, lt] = ensureLiveTurn(state, event.turnId);
+ const updated = upsertStep(lt, event.stepId, {
+ ttftMs: event.ttftMs,
+ decodeMs: event.decodeMs,
+ genTotalMs: event.genTotalMs,
+ complete: true,
+ });
+ const newLive = new Map(s1.live);
+ newLive.set(event.turnId, updated);
+ return { ...s1, live: newLive };
+ }
+
+ case "done": {
+ const [s1, lt] = ensureLiveTurn(state, event.turnId);
+ const updated: LiveTurn = {
+ ...lt,
+ done: true,
+ durationMs: event.durationMs ?? lt.durationMs,
+ doneUsage: event.usage ?? lt.doneUsage,
+ };
+ const newLive = new Map(s1.live);
+ newLive.set(event.turnId, updated);
+ return { ...s1, live: newLive };
+ }
+
+ default:
+ return state;
+ }
+}
+
+/**
+ * Store durable (sealed) metrics from the backend. These win over live data
+ * for any shared `turnId`.
+ */
+export function applyDurableMetrics(
+ state: MetricsState,
+ turns: readonly TurnMetrics[],
+): MetricsState {
+ const newDurable = new Map(state.durable);
+ const newDurableOrder = [...state.durableOrder];
+ for (const turn of turns) {
+ if (!newDurable.has(turn.turnId)) {
+ newDurableOrder.push(turn.turnId);
+ }
+ newDurable.set(turn.turnId, turn);
+ }
+ return {
+ ...state,
+ durable: newDurable,
+ durableOrder: newDurableOrder,
+ };
+}
+
+/**
+ * Select the merged ordered list of turn metrics entries.
+ * Durable turns come first (in their order), then any live turns whose
+ * `turnId` is not in durable (in live first-seen order).
+ *
+ * Each entry contains the completed steps so far and an optional total
+ * (null until the turn is finalized via `done` or durable data).
+ * Live turns with no completed steps and not done are omitted.
+ */
+export function selectOrderedTurnMetrics(state: MetricsState): readonly TurnMetricsEntry[] {
+ const result: TurnMetricsEntry[] = [];
+ const seen = new Set<string>();
+
+ for (const turnId of state.durableOrder) {
+ const tm = state.durable.get(turnId);
+ if (tm !== undefined) {
+ result.push({ turnId, steps: tm.steps, total: tm });
+ seen.add(turnId);
+ }
+ }
+
+ for (const turnId of state.liveOrder) {
+ if (seen.has(turnId)) continue;
+ const lt = state.live.get(turnId);
+ if (lt === undefined) continue;
+
+ const completeSteps = lt.stepOrder
+ .map((id) => lt.stepMap.get(id))
+ .filter((s): s is BuildingStep => s?.complete === true)
+ .map((s) => buildingStepToMetrics(s));
+
+ if (completeSteps.length === 0 && !lt.done) continue;
+
+ result.push({
+ turnId,
+ steps: completeSteps,
+ total: lt.done ? liveTurnToMetrics(lt) : null,
+ });
+ }
+
+ return result;
+}