diff options
Diffstat (limited to 'src/core')
| -rw-r--r-- | src/core/chunks/reducer.ts | 4 | ||||
| -rw-r--r-- | src/core/telemetry/index.ts | 14 | ||||
| -rw-r--r-- | src/core/telemetry/reducer.test.ts | 252 | ||||
| -rw-r--r-- | src/core/telemetry/reducer.ts | 122 | ||||
| -rw-r--r-- | src/core/telemetry/selectors.ts | 95 | ||||
| -rw-r--r-- | src/core/telemetry/types.ts | 35 | ||||
| -rw-r--r-- | src/core/wire/conformance.test.ts | 14 | ||||
| -rw-r--r-- | src/core/wire/conformance.ts | 2 |
8 files changed, 2 insertions, 536 deletions
diff --git a/src/core/chunks/reducer.ts b/src/core/chunks/reducer.ts index 54b1922..1dcfa39 100644 --- a/src/core/chunks/reducer.ts +++ b/src/core/chunks/reducer.ts @@ -148,10 +148,6 @@ export function foldEvent(state: TranscriptState, event: AgentEvent): Transcript case "usage": return { ...state, latestUsage: event.usage }; - case "step-complete": - // Timing metadata — no content chunk; handled by the telemetry reducer. - return state; - case "done": { const provisional = flushAccumulating(state.provisional, state.accumulating); return { diff --git a/src/core/telemetry/index.ts b/src/core/telemetry/index.ts deleted file mode 100644 index a528b0d..0000000 --- a/src/core/telemetry/index.ts +++ /dev/null @@ -1,14 +0,0 @@ -export { foldMetricEvent, initialState } from "./reducer"; -export { - stepCount, - stepMetrics, - stepToolDuration, - stepTps, - totalDecodeMs, - totalInputTokens, - totalOutputTokens, - turnMetrics, - turnTps, - turnTtft, -} from "./selectors"; -export type { StepMetrics, TelemetryState, TurnMetrics } from "./types"; diff --git a/src/core/telemetry/reducer.test.ts b/src/core/telemetry/reducer.test.ts deleted file mode 100644 index 119bf96..0000000 --- a/src/core/telemetry/reducer.test.ts +++ /dev/null @@ -1,252 +0,0 @@ -import type { StepId, Usage } from "@dispatch/wire"; -import { describe, expect, it } from "vitest"; -import { foldMetricEvent, initialState } from "./reducer"; -import { - stepCount, - stepMetrics, - stepToolDuration, - stepTps, - totalDecodeMs, - totalInputTokens, - totalOutputTokens, - turnMetrics, - turnTps, - turnTtft, -} from "./selectors"; - -const sid = (s: string) => s as StepId; - -const usage = (turnId: string, stepId: string, u: Usage) => ({ - type: "usage" as const, - conversationId: "c1", - turnId, - stepId: sid(stepId), - usage: u, -}); - -const stepComplete = ( - turnId: string, - stepId: string, - timing: { ttftMs?: number; decodeMs?: number; genTotalMs?: number }, -) => ({ - type: "step-complete" as const, - conversationId: "c1", - turnId, - stepId: sid(stepId), - ...timing, -}); - -describe("foldMetricEvent", () => { - it("turn-start initializes an empty turn", () => { - const s = foldMetricEvent(initialState(), { - type: "turn-start", - conversationId: "c1", - turnId: "t1", - }); - expect(s.turns.get("t1")?.steps).toEqual([]); - }); - - it("step-complete populates timing on a new step", () => { - let s = initialState(); - s = foldMetricEvent(s, { type: "turn-start", conversationId: "c1", turnId: "t1" }); - s = foldMetricEvent( - s, - stepComplete("t1", "s0", { ttftMs: 300, decodeMs: 800, genTotalMs: 1100 }), - ); - - const step = stepMetrics(s, "t1", 0); - expect(step?.ttftMs).toBe(300); - expect(step?.decodeMs).toBe(800); - expect(step?.genTotalMs).toBe(1100); - }); - - it("usage merges tokens into a step (joined by stepId)", () => { - let s = initialState(); - s = foldMetricEvent(s, { type: "turn-start", conversationId: "c1", turnId: "t1" }); - s = foldMetricEvent(s, stepComplete("t1", "s0", { genTotalMs: 500 })); - s = foldMetricEvent(s, usage("t1", "s0", { inputTokens: 100, outputTokens: 50 })); - - const step = stepMetrics(s, "t1", 0); - expect(step?.usage?.inputTokens).toBe(100); - expect(step?.usage?.outputTokens).toBe(50); - expect(step?.genTotalMs).toBe(500); // timing preserved - }); - - it("usage without stepId is ignored", () => { - let s = initialState(); - s = foldMetricEvent(s, { type: "turn-start", conversationId: "c1", turnId: "t1" }); - s = foldMetricEvent(s, { - type: "usage", - conversationId: "c1", - turnId: "t1", - usage: { inputTokens: 100, outputTokens: 50 }, - // no stepId - }); - expect(s.turns.get("t1")?.steps).toEqual([]); - }); - - it("tool-result accumulates durationMs into its step", () => { - let s = initialState(); - s = foldMetricEvent(s, { type: "turn-start", conversationId: "c1", turnId: "t1" }); - s = foldMetricEvent(s, stepComplete("t1", "s0", {})); - s = foldMetricEvent(s, { - type: "tool-result", - conversationId: "c1", - turnId: "t1", - stepId: sid("s0"), - toolCallId: "tc1", - toolName: "bash", - content: "", - isError: false, - durationMs: 120, - }); - s = foldMetricEvent(s, { - type: "tool-result", - conversationId: "c1", - turnId: "t1", - stepId: sid("s0"), - toolCallId: "tc2", - toolName: "bash", - content: "", - isError: false, - durationMs: 80, - }); - - const step = stepMetrics(s, "t1", 0); - expect(step?.toolDurationMs).toBe(200); - }); - - it("done records turn wall-clock and aggregate usage", () => { - let s = initialState(); - s = foldMetricEvent(s, { type: "turn-start", conversationId: "c1", turnId: "t1" }); - s = foldMetricEvent(s, { - type: "done", - conversationId: "c1", - turnId: "t1", - reason: "complete", - durationMs: 4200, - usage: { inputTokens: 800, outputTokens: 200 }, - }); - - const turn = turnMetrics(s, "t1"); - expect(turn?.wallMs).toBe(4200); - expect(turn?.doneUsage?.outputTokens).toBe(200); - }); - - it("events for an unknown turn are handled gracefully (step-complete, usage)", () => { - const s = initialState(); - // step-complete for a turn we haven't started — creates the turn. - const s2 = foldMetricEvent(s, stepComplete("t1", "s0", { ttftMs: 100 })); - expect(s2.turns.get("t1")?.steps[0]?.ttftMs).toBe(100); - }); - - it("multiple steps accumulate in order", () => { - let s = initialState(); - s = foldMetricEvent(s, { type: "turn-start", conversationId: "c1", turnId: "t1" }); - s = foldMetricEvent(s, stepComplete("t1", "s0", { genTotalMs: 100 })); - s = foldMetricEvent(s, stepComplete("t1", "s1", { genTotalMs: 200 })); - - expect(stepCount(s, "t1")).toBe(2); - expect(stepMetrics(s, "t1", 0)?.genTotalMs).toBe(100); - expect(stepMetrics(s, "t1", 1)?.genTotalMs).toBe(200); - }); - - it("non-metric events are no-ops", () => { - let s = initialState(); - s = foldMetricEvent(s, { type: "turn-start", conversationId: "c1", turnId: "t1" }); - s = foldMetricEvent(s, { - type: "text-delta", - conversationId: "c1", - turnId: "t1", - delta: "hi", - }); - s = foldMetricEvent(s, { - type: "turn-sealed", - conversationId: "c1", - turnId: "t1", - }); - expect(s.turns.get("t1")?.steps).toEqual([]); - }); -}); - -describe("selectors — derived metrics", () => { - function populatedState() { - let s = initialState(); - s = foldMetricEvent(s, { type: "turn-start", conversationId: "c1", turnId: "t1" }); - s = foldMetricEvent( - s, - stepComplete("t1", "s0", { ttftMs: 300, decodeMs: 700, genTotalMs: 1000 }), - ); - s = foldMetricEvent(s, usage("t1", "s0", { inputTokens: 500, outputTokens: 100 })); - s = foldMetricEvent( - s, - stepComplete("t1", "s1", { ttftMs: 200, decodeMs: 500, genTotalMs: 700 }), - ); - s = foldMetricEvent(s, usage("t1", "s1", { inputTokens: 600, outputTokens: 80 })); - s = foldMetricEvent(s, { - type: "done", - conversationId: "c1", - turnId: "t1", - reason: "complete", - durationMs: 3500, - usage: { inputTokens: 1100, outputTokens: 180 }, - }); - return s; - } - - it("stepTps = outputTokens / (decodeMs / 1000)", () => { - const s = populatedState(); - const step = stepMetrics(s, "t1", 0)!; - expect(stepTps(step)).toBeCloseTo(100 / 0.7, 2); - }); - - it("turnTtft returns first step's ttftMs", () => { - expect(turnTtft(populatedState(), "t1")).toBe(300); - }); - - it("totalDecodeMs sums all steps' decodeMs", () => { - expect(totalDecodeMs(populatedState(), "t1")).toBe(1200); - }); - - it("turnTps = outputTokens / (totalDecodeMs / 1000)", () => { - const s = populatedState(); - expect(turnTps(s, "t1")).toBeCloseTo(180 / 1.2, 2); - }); - - it("totalOutputTokens prefers done.usage over step sum", () => { - const s = populatedState(); - expect(totalOutputTokens(s, "t1")).toBe(180); // from done.usage - }); - - it("totalInputTokens prefers done.usage over step sum", () => { - const s = populatedState(); - expect(totalInputTokens(s, "t1")).toBe(1100); - }); - - it("stepToolDuration returns sum only when > 0", () => { - const withTools = foldMetricEvent( - foldMetricEvent(initialState(), { type: "turn-start", conversationId: "c1", turnId: "t1" }), - { - type: "tool-result", - conversationId: "c1", - turnId: "t1", - stepId: sid("s0"), - toolCallId: "tc1", - toolName: "bash", - content: "", - isError: false, - durationMs: 50, - }, - ); - const step = stepMetrics(withTools, "t1", 0)!; - expect(stepToolDuration(step)).toBe(50); - expect(stepToolDuration({ stepId: sid("s0") })).toBeUndefined(); - }); - - it("returns undefined for absent fields gracefully", () => { - const s = initialState(); - expect(turnMetrics(s, "missing")).toBeUndefined(); - expect(turnTtft(s, "missing")).toBeUndefined(); - expect(turnTps(s, "missing")).toBeUndefined(); - }); -}); diff --git a/src/core/telemetry/reducer.ts b/src/core/telemetry/reducer.ts deleted file mode 100644 index 4083231..0000000 --- a/src/core/telemetry/reducer.ts +++ /dev/null @@ -1,122 +0,0 @@ -import type { AgentEvent, StepId, Usage } from "@dispatch/wire"; -import type { StepMetrics, TelemetryState, TurnMetrics } from "./types"; - -/** The initial empty telemetry state. */ -export function initialState(): TelemetryState { - return { turns: new Map() }; -} - -function mergeStep(existing: StepMetrics, patch: StepMetrics): StepMetrics { - const merged: StepMetrics = { ...existing }; - if (patch.ttftMs !== undefined) (merged as { ttftMs?: number }).ttftMs = patch.ttftMs; - if (patch.decodeMs !== undefined) (merged as { decodeMs?: number }).decodeMs = patch.decodeMs; - if (patch.genTotalMs !== undefined) - (merged as { genTotalMs?: number }).genTotalMs = patch.genTotalMs; - if (patch.usage !== undefined) { - (merged as { usage?: Usage }).usage = { ...existing.usage, ...patch.usage }; - } - if (patch.toolDurationMs !== undefined) { - (merged as { toolDurationMs?: number }).toolDurationMs = - (existing.toolDurationMs ?? 0) + patch.toolDurationMs; - } - return merged; -} - -function upsertStep( - steps: readonly StepMetrics[], - stepId: StepId, - patch: StepMetrics, -): readonly StepMetrics[] { - const idx = steps.findIndex((s) => s.stepId === stepId); - if (idx === -1) { - return [...steps, patch]; - } - return [...steps.slice(0, idx), mergeStep(steps[idx]!, patch), ...steps.slice(idx + 1)]; -} - -function setTurn( - turns: ReadonlyMap<string, TurnMetrics>, - turnId: string, - turn: TurnMetrics, -): ReadonlyMap<string, TurnMetrics> { - const next = new Map(turns); - next.set(turnId, turn); - return next; -} - -/** - * Fold one live AgentEvent into the telemetry state. - * - * - `turn-start` records the active turnId. - * - `step-complete` creates/updates the step's timing metrics. - * - `usage` merges token counts into the step (joined by `stepId`). - * - `tool-result` accumulates `durationMs` into the step. - * - `done` records turn-level wall-clock + token totals. - * - All other event types are no-ops (content events belong to the transcript). - * - * Pure: input → output, no DOM, no side effects. - */ -export function foldMetricEvent(state: TelemetryState, event: AgentEvent): TelemetryState { - switch (event.type) { - case "turn-start": { - return { - ...state, - turns: setTurn(state.turns, event.turnId, { steps: [] }), - }; - } - - case "step-complete": { - const turnId = event.turnId; - const existing = state.turns.get(turnId); - const patch: StepMetrics = { stepId: event.stepId }; - if (event.ttftMs !== undefined) (patch as { ttftMs?: number }).ttftMs = event.ttftMs; - if (event.decodeMs !== undefined) (patch as { decodeMs?: number }).decodeMs = event.decodeMs; - if (event.genTotalMs !== undefined) - (patch as { genTotalMs?: number }).genTotalMs = event.genTotalMs; - const steps = - existing !== undefined ? upsertStep(existing.steps, event.stepId, patch) : [patch]; - return { - ...state, - turns: setTurn(state.turns, turnId, { ...existing, steps } as TurnMetrics), - }; - } - - case "usage": { - if (event.stepId === undefined) return state; - const turnId = event.turnId; - const existing = state.turns.get(turnId); - const patch: StepMetrics = { stepId: event.stepId, usage: event.usage }; - const steps = - existing !== undefined ? upsertStep(existing.steps, event.stepId, patch) : [patch]; - return { - ...state, - turns: setTurn(state.turns, turnId, { ...existing, steps } as TurnMetrics), - }; - } - - case "tool-result": { - if (event.durationMs === undefined) return state; - const turnId = event.turnId; - const existing = state.turns.get(turnId); - if (existing === undefined) return state; - const patch: StepMetrics = { stepId: event.stepId, toolDurationMs: event.durationMs }; - const steps = upsertStep(existing.steps, event.stepId, patch); - return { ...state, turns: setTurn(state.turns, turnId, { ...existing, steps }) }; - } - - case "done": { - const turnId = event.turnId; - const existing = state.turns.get(turnId); - const updated: TurnMetrics = { - ...(existing ?? { steps: [] }), - }; - if (event.durationMs !== undefined) - (updated as { wallMs?: number }).wallMs = event.durationMs; - if (event.usage !== undefined) (updated as { doneUsage?: Usage }).doneUsage = event.usage; - return { ...state, turns: setTurn(state.turns, turnId, updated) }; - } - - default: - return state; - } -} diff --git a/src/core/telemetry/selectors.ts b/src/core/telemetry/selectors.ts deleted file mode 100644 index ecf1794..0000000 --- a/src/core/telemetry/selectors.ts +++ /dev/null @@ -1,95 +0,0 @@ -import type { Usage } from "@dispatch/wire"; -import type { StepMetrics, TelemetryState, TurnMetrics } from "./types"; - -/** Get the metrics for a specific step within a turn. */ -export function stepMetrics( - state: TelemetryState, - turnId: string, - stepIndex: number, -): StepMetrics | undefined { - return state.turns.get(turnId)?.steps[stepIndex]; -} - -/** Get the metrics for a turn. */ -export function turnMetrics(state: TelemetryState, turnId: string): TurnMetrics | undefined { - return state.turns.get(turnId); -} - -/** The number of steps in a turn. */ -export function stepCount(state: TelemetryState, turnId: string): number { - return state.turns.get(turnId)?.steps.length ?? 0; -} - -/** TTFT of the first step in a turn (the turn-visible first-token latency). */ -export function turnTtft(state: TelemetryState, turnId: string): number | undefined { - return state.turns.get(turnId)?.steps[0]?.ttftMs; -} - -/** Sum of all steps' decode times in a turn. */ -export function totalDecodeMs(state: TelemetryState, turnId: string): number | undefined { - const steps = state.turns.get(turnId)?.steps; - if (steps === undefined || steps.length === 0) return undefined; - let total = 0; - let found = false; - for (const s of steps) { - if (s.decodeMs !== undefined) { - total += s.decodeMs; - found = true; - } - } - return found ? total : undefined; -} - -/** Aggregate output tokens across all steps in a turn. */ -export function totalOutputTokens(state: TelemetryState, turnId: string): number | undefined { - const turn = state.turns.get(turnId); - if (turn === undefined) return undefined; - if (turn.doneUsage !== undefined) return turn.doneUsage.outputTokens; - let total = 0; - let found = false; - for (const s of turn.steps) { - if (s.usage?.outputTokens !== undefined) { - total += s.usage.outputTokens; - found = true; - } - } - return found ? total : undefined; -} - -/** Aggregate input tokens across all steps in a turn. */ -export function totalInputTokens(state: TelemetryState, turnId: string): number | undefined { - const turn = state.turns.get(turnId); - if (turn === undefined) return undefined; - if (turn.doneUsage !== undefined) return turn.doneUsage.inputTokens; - let total = 0; - let found = false; - for (const s of turn.steps) { - if (s.usage?.inputTokens !== undefined) { - total += s.usage.inputTokens; - found = true; - } - } - return found ? total : undefined; -} - -/** Derived TPS for a step: outputTokens / (decodeMs / 1000). */ -export function stepTps(step: StepMetrics): number | undefined { - if (step.usage?.outputTokens === undefined || step.decodeMs === undefined) return undefined; - if (step.decodeMs === 0) return undefined; - return step.usage.outputTokens / (step.decodeMs / 1000); -} - -/** Derived aggregate TPS for a turn. */ -export function turnTps(state: TelemetryState, turnId: string): number | undefined { - const outTokens = totalOutputTokens(state, turnId); - const decode = totalDecodeMs(state, turnId); - if (outTokens === undefined || decode === undefined || decode === 0) return undefined; - return outTokens / (decode / 1000); -} - -/** Sum of tool execution durations within a step. */ -export function stepToolDuration(step: StepMetrics): number | undefined { - return step.toolDurationMs !== undefined && step.toolDurationMs > 0 - ? step.toolDurationMs - : undefined; -} diff --git a/src/core/telemetry/types.ts b/src/core/telemetry/types.ts deleted file mode 100644 index 395ec93..0000000 --- a/src/core/telemetry/types.ts +++ /dev/null @@ -1,35 +0,0 @@ -import type { StepId, Usage } from "@dispatch/wire"; - -/** - * Per-step metrics, accumulated from `step-complete` + `usage` events. - * All fields optional — absent when the backend had no clock or the step - * produced no text/reasoning token. - */ -export interface StepMetrics { - readonly stepId: StepId; - readonly ttftMs?: number; - readonly decodeMs?: number; - readonly genTotalMs?: number; - readonly usage?: Usage; - readonly toolDurationMs?: number; // sum of tool-result.durationMs in this step -} - -/** - * Per-turn metrics, accumulated from `done` events + per-step aggregation. - */ -export interface TurnMetrics { - readonly wallMs?: number; - readonly doneUsage?: Usage; - readonly steps: readonly StepMetrics[]; -} - -/** - * Pure telemetry state — lives alongside but separate from TranscriptState. - * Accumulates live-only metric events; never persisted (history has no metrics). - * No "active turn" tracking — the consumer (store) passes the relevant turnId - * to the selectors. Pure: events flow in, derived values flow out. - */ -export interface TelemetryState { - /** turnId → TurnMetrics. Multiple turns accumulate (tab switching). */ - readonly turns: ReadonlyMap<string, TurnMetrics>; -} diff --git a/src/core/wire/conformance.test.ts b/src/core/wire/conformance.test.ts index 690ba4e..50b7f35 100644 --- a/src/core/wire/conformance.test.ts +++ b/src/core/wire/conformance.test.ts @@ -62,15 +62,6 @@ describe("classifies every AgentEvent type", () => { turnId: "t1", usage: { inputTokens: 10, outputTokens: 20 }, }, - { - type: "step-complete", - conversationId: "c1", - turnId: "t1", - stepId: "t1#0" as StepId, - ttftMs: 300, - decodeMs: 700, - genTotalMs: 1000, - }, { type: "error", conversationId: "c1", turnId: "t1", message: "oops" }, { type: "done", conversationId: "c1", turnId: "t1", reason: "complete" }, { type: "turn-sealed", conversationId: "c1", turnId: "t1" }, @@ -87,15 +78,14 @@ describe("classifies every AgentEvent type", () => { "tool-result", "tool-output", "usage", - "step-complete", "error", "done", "turn-sealed", ]); }); - it("covers all 12 AgentEvent variants", () => { - expect(samples).toHaveLength(12); + it("covers all 11 AgentEvent variants", () => { + expect(samples).toHaveLength(11); }); }); diff --git a/src/core/wire/conformance.ts b/src/core/wire/conformance.ts index d89772e..5d75a60 100644 --- a/src/core/wire/conformance.ts +++ b/src/core/wire/conformance.ts @@ -30,8 +30,6 @@ export function assertAgentEventExhaustive(event: AgentEvent): string { return "done"; case "turn-sealed": return "turn-sealed"; - case "step-complete": - return "step-complete"; default: return event satisfies never; } |
