summaryrefslogtreecommitdiffhomepage
path: root/src/core
diff options
context:
space:
mode:
Diffstat (limited to 'src/core')
-rw-r--r--src/core/chunks/reducer.ts4
-rw-r--r--src/core/telemetry/index.ts14
-rw-r--r--src/core/telemetry/reducer.test.ts252
-rw-r--r--src/core/telemetry/reducer.ts122
-rw-r--r--src/core/telemetry/selectors.ts95
-rw-r--r--src/core/telemetry/types.ts35
-rw-r--r--src/core/wire/conformance.test.ts14
-rw-r--r--src/core/wire/conformance.ts2
8 files changed, 2 insertions, 536 deletions
diff --git a/src/core/chunks/reducer.ts b/src/core/chunks/reducer.ts
index 54b1922..1dcfa39 100644
--- a/src/core/chunks/reducer.ts
+++ b/src/core/chunks/reducer.ts
@@ -148,10 +148,6 @@ export function foldEvent(state: TranscriptState, event: AgentEvent): Transcript
case "usage":
return { ...state, latestUsage: event.usage };
- case "step-complete":
- // Timing metadata — no content chunk; handled by the telemetry reducer.
- return state;
-
case "done": {
const provisional = flushAccumulating(state.provisional, state.accumulating);
return {
diff --git a/src/core/telemetry/index.ts b/src/core/telemetry/index.ts
deleted file mode 100644
index a528b0d..0000000
--- a/src/core/telemetry/index.ts
+++ /dev/null
@@ -1,14 +0,0 @@
-export { foldMetricEvent, initialState } from "./reducer";
-export {
- stepCount,
- stepMetrics,
- stepToolDuration,
- stepTps,
- totalDecodeMs,
- totalInputTokens,
- totalOutputTokens,
- turnMetrics,
- turnTps,
- turnTtft,
-} from "./selectors";
-export type { StepMetrics, TelemetryState, TurnMetrics } from "./types";
diff --git a/src/core/telemetry/reducer.test.ts b/src/core/telemetry/reducer.test.ts
deleted file mode 100644
index 119bf96..0000000
--- a/src/core/telemetry/reducer.test.ts
+++ /dev/null
@@ -1,252 +0,0 @@
-import type { StepId, Usage } from "@dispatch/wire";
-import { describe, expect, it } from "vitest";
-import { foldMetricEvent, initialState } from "./reducer";
-import {
- stepCount,
- stepMetrics,
- stepToolDuration,
- stepTps,
- totalDecodeMs,
- totalInputTokens,
- totalOutputTokens,
- turnMetrics,
- turnTps,
- turnTtft,
-} from "./selectors";
-
-const sid = (s: string) => s as StepId;
-
-const usage = (turnId: string, stepId: string, u: Usage) => ({
- type: "usage" as const,
- conversationId: "c1",
- turnId,
- stepId: sid(stepId),
- usage: u,
-});
-
-const stepComplete = (
- turnId: string,
- stepId: string,
- timing: { ttftMs?: number; decodeMs?: number; genTotalMs?: number },
-) => ({
- type: "step-complete" as const,
- conversationId: "c1",
- turnId,
- stepId: sid(stepId),
- ...timing,
-});
-
-describe("foldMetricEvent", () => {
- it("turn-start initializes an empty turn", () => {
- const s = foldMetricEvent(initialState(), {
- type: "turn-start",
- conversationId: "c1",
- turnId: "t1",
- });
- expect(s.turns.get("t1")?.steps).toEqual([]);
- });
-
- it("step-complete populates timing on a new step", () => {
- let s = initialState();
- s = foldMetricEvent(s, { type: "turn-start", conversationId: "c1", turnId: "t1" });
- s = foldMetricEvent(
- s,
- stepComplete("t1", "s0", { ttftMs: 300, decodeMs: 800, genTotalMs: 1100 }),
- );
-
- const step = stepMetrics(s, "t1", 0);
- expect(step?.ttftMs).toBe(300);
- expect(step?.decodeMs).toBe(800);
- expect(step?.genTotalMs).toBe(1100);
- });
-
- it("usage merges tokens into a step (joined by stepId)", () => {
- let s = initialState();
- s = foldMetricEvent(s, { type: "turn-start", conversationId: "c1", turnId: "t1" });
- s = foldMetricEvent(s, stepComplete("t1", "s0", { genTotalMs: 500 }));
- s = foldMetricEvent(s, usage("t1", "s0", { inputTokens: 100, outputTokens: 50 }));
-
- const step = stepMetrics(s, "t1", 0);
- expect(step?.usage?.inputTokens).toBe(100);
- expect(step?.usage?.outputTokens).toBe(50);
- expect(step?.genTotalMs).toBe(500); // timing preserved
- });
-
- it("usage without stepId is ignored", () => {
- let s = initialState();
- s = foldMetricEvent(s, { type: "turn-start", conversationId: "c1", turnId: "t1" });
- s = foldMetricEvent(s, {
- type: "usage",
- conversationId: "c1",
- turnId: "t1",
- usage: { inputTokens: 100, outputTokens: 50 },
- // no stepId
- });
- expect(s.turns.get("t1")?.steps).toEqual([]);
- });
-
- it("tool-result accumulates durationMs into its step", () => {
- let s = initialState();
- s = foldMetricEvent(s, { type: "turn-start", conversationId: "c1", turnId: "t1" });
- s = foldMetricEvent(s, stepComplete("t1", "s0", {}));
- s = foldMetricEvent(s, {
- type: "tool-result",
- conversationId: "c1",
- turnId: "t1",
- stepId: sid("s0"),
- toolCallId: "tc1",
- toolName: "bash",
- content: "",
- isError: false,
- durationMs: 120,
- });
- s = foldMetricEvent(s, {
- type: "tool-result",
- conversationId: "c1",
- turnId: "t1",
- stepId: sid("s0"),
- toolCallId: "tc2",
- toolName: "bash",
- content: "",
- isError: false,
- durationMs: 80,
- });
-
- const step = stepMetrics(s, "t1", 0);
- expect(step?.toolDurationMs).toBe(200);
- });
-
- it("done records turn wall-clock and aggregate usage", () => {
- let s = initialState();
- s = foldMetricEvent(s, { type: "turn-start", conversationId: "c1", turnId: "t1" });
- s = foldMetricEvent(s, {
- type: "done",
- conversationId: "c1",
- turnId: "t1",
- reason: "complete",
- durationMs: 4200,
- usage: { inputTokens: 800, outputTokens: 200 },
- });
-
- const turn = turnMetrics(s, "t1");
- expect(turn?.wallMs).toBe(4200);
- expect(turn?.doneUsage?.outputTokens).toBe(200);
- });
-
- it("events for an unknown turn are handled gracefully (step-complete, usage)", () => {
- const s = initialState();
- // step-complete for a turn we haven't started — creates the turn.
- const s2 = foldMetricEvent(s, stepComplete("t1", "s0", { ttftMs: 100 }));
- expect(s2.turns.get("t1")?.steps[0]?.ttftMs).toBe(100);
- });
-
- it("multiple steps accumulate in order", () => {
- let s = initialState();
- s = foldMetricEvent(s, { type: "turn-start", conversationId: "c1", turnId: "t1" });
- s = foldMetricEvent(s, stepComplete("t1", "s0", { genTotalMs: 100 }));
- s = foldMetricEvent(s, stepComplete("t1", "s1", { genTotalMs: 200 }));
-
- expect(stepCount(s, "t1")).toBe(2);
- expect(stepMetrics(s, "t1", 0)?.genTotalMs).toBe(100);
- expect(stepMetrics(s, "t1", 1)?.genTotalMs).toBe(200);
- });
-
- it("non-metric events are no-ops", () => {
- let s = initialState();
- s = foldMetricEvent(s, { type: "turn-start", conversationId: "c1", turnId: "t1" });
- s = foldMetricEvent(s, {
- type: "text-delta",
- conversationId: "c1",
- turnId: "t1",
- delta: "hi",
- });
- s = foldMetricEvent(s, {
- type: "turn-sealed",
- conversationId: "c1",
- turnId: "t1",
- });
- expect(s.turns.get("t1")?.steps).toEqual([]);
- });
-});
-
-describe("selectors — derived metrics", () => {
- function populatedState() {
- let s = initialState();
- s = foldMetricEvent(s, { type: "turn-start", conversationId: "c1", turnId: "t1" });
- s = foldMetricEvent(
- s,
- stepComplete("t1", "s0", { ttftMs: 300, decodeMs: 700, genTotalMs: 1000 }),
- );
- s = foldMetricEvent(s, usage("t1", "s0", { inputTokens: 500, outputTokens: 100 }));
- s = foldMetricEvent(
- s,
- stepComplete("t1", "s1", { ttftMs: 200, decodeMs: 500, genTotalMs: 700 }),
- );
- s = foldMetricEvent(s, usage("t1", "s1", { inputTokens: 600, outputTokens: 80 }));
- s = foldMetricEvent(s, {
- type: "done",
- conversationId: "c1",
- turnId: "t1",
- reason: "complete",
- durationMs: 3500,
- usage: { inputTokens: 1100, outputTokens: 180 },
- });
- return s;
- }
-
- it("stepTps = outputTokens / (decodeMs / 1000)", () => {
- const s = populatedState();
- const step = stepMetrics(s, "t1", 0)!;
- expect(stepTps(step)).toBeCloseTo(100 / 0.7, 2);
- });
-
- it("turnTtft returns first step's ttftMs", () => {
- expect(turnTtft(populatedState(), "t1")).toBe(300);
- });
-
- it("totalDecodeMs sums all steps' decodeMs", () => {
- expect(totalDecodeMs(populatedState(), "t1")).toBe(1200);
- });
-
- it("turnTps = outputTokens / (totalDecodeMs / 1000)", () => {
- const s = populatedState();
- expect(turnTps(s, "t1")).toBeCloseTo(180 / 1.2, 2);
- });
-
- it("totalOutputTokens prefers done.usage over step sum", () => {
- const s = populatedState();
- expect(totalOutputTokens(s, "t1")).toBe(180); // from done.usage
- });
-
- it("totalInputTokens prefers done.usage over step sum", () => {
- const s = populatedState();
- expect(totalInputTokens(s, "t1")).toBe(1100);
- });
-
- it("stepToolDuration returns sum only when > 0", () => {
- const withTools = foldMetricEvent(
- foldMetricEvent(initialState(), { type: "turn-start", conversationId: "c1", turnId: "t1" }),
- {
- type: "tool-result",
- conversationId: "c1",
- turnId: "t1",
- stepId: sid("s0"),
- toolCallId: "tc1",
- toolName: "bash",
- content: "",
- isError: false,
- durationMs: 50,
- },
- );
- const step = stepMetrics(withTools, "t1", 0)!;
- expect(stepToolDuration(step)).toBe(50);
- expect(stepToolDuration({ stepId: sid("s0") })).toBeUndefined();
- });
-
- it("returns undefined for absent fields gracefully", () => {
- const s = initialState();
- expect(turnMetrics(s, "missing")).toBeUndefined();
- expect(turnTtft(s, "missing")).toBeUndefined();
- expect(turnTps(s, "missing")).toBeUndefined();
- });
-});
diff --git a/src/core/telemetry/reducer.ts b/src/core/telemetry/reducer.ts
deleted file mode 100644
index 4083231..0000000
--- a/src/core/telemetry/reducer.ts
+++ /dev/null
@@ -1,122 +0,0 @@
-import type { AgentEvent, StepId, Usage } from "@dispatch/wire";
-import type { StepMetrics, TelemetryState, TurnMetrics } from "./types";
-
-/** The initial empty telemetry state. */
-export function initialState(): TelemetryState {
- return { turns: new Map() };
-}
-
-function mergeStep(existing: StepMetrics, patch: StepMetrics): StepMetrics {
- const merged: StepMetrics = { ...existing };
- if (patch.ttftMs !== undefined) (merged as { ttftMs?: number }).ttftMs = patch.ttftMs;
- if (patch.decodeMs !== undefined) (merged as { decodeMs?: number }).decodeMs = patch.decodeMs;
- if (patch.genTotalMs !== undefined)
- (merged as { genTotalMs?: number }).genTotalMs = patch.genTotalMs;
- if (patch.usage !== undefined) {
- (merged as { usage?: Usage }).usage = { ...existing.usage, ...patch.usage };
- }
- if (patch.toolDurationMs !== undefined) {
- (merged as { toolDurationMs?: number }).toolDurationMs =
- (existing.toolDurationMs ?? 0) + patch.toolDurationMs;
- }
- return merged;
-}
-
-function upsertStep(
- steps: readonly StepMetrics[],
- stepId: StepId,
- patch: StepMetrics,
-): readonly StepMetrics[] {
- const idx = steps.findIndex((s) => s.stepId === stepId);
- if (idx === -1) {
- return [...steps, patch];
- }
- return [...steps.slice(0, idx), mergeStep(steps[idx]!, patch), ...steps.slice(idx + 1)];
-}
-
-function setTurn(
- turns: ReadonlyMap<string, TurnMetrics>,
- turnId: string,
- turn: TurnMetrics,
-): ReadonlyMap<string, TurnMetrics> {
- const next = new Map(turns);
- next.set(turnId, turn);
- return next;
-}
-
-/**
- * Fold one live AgentEvent into the telemetry state.
- *
- * - `turn-start` records the active turnId.
- * - `step-complete` creates/updates the step's timing metrics.
- * - `usage` merges token counts into the step (joined by `stepId`).
- * - `tool-result` accumulates `durationMs` into the step.
- * - `done` records turn-level wall-clock + token totals.
- * - All other event types are no-ops (content events belong to the transcript).
- *
- * Pure: input → output, no DOM, no side effects.
- */
-export function foldMetricEvent(state: TelemetryState, event: AgentEvent): TelemetryState {
- switch (event.type) {
- case "turn-start": {
- return {
- ...state,
- turns: setTurn(state.turns, event.turnId, { steps: [] }),
- };
- }
-
- case "step-complete": {
- const turnId = event.turnId;
- const existing = state.turns.get(turnId);
- const patch: StepMetrics = { stepId: event.stepId };
- if (event.ttftMs !== undefined) (patch as { ttftMs?: number }).ttftMs = event.ttftMs;
- if (event.decodeMs !== undefined) (patch as { decodeMs?: number }).decodeMs = event.decodeMs;
- if (event.genTotalMs !== undefined)
- (patch as { genTotalMs?: number }).genTotalMs = event.genTotalMs;
- const steps =
- existing !== undefined ? upsertStep(existing.steps, event.stepId, patch) : [patch];
- return {
- ...state,
- turns: setTurn(state.turns, turnId, { ...existing, steps } as TurnMetrics),
- };
- }
-
- case "usage": {
- if (event.stepId === undefined) return state;
- const turnId = event.turnId;
- const existing = state.turns.get(turnId);
- const patch: StepMetrics = { stepId: event.stepId, usage: event.usage };
- const steps =
- existing !== undefined ? upsertStep(existing.steps, event.stepId, patch) : [patch];
- return {
- ...state,
- turns: setTurn(state.turns, turnId, { ...existing, steps } as TurnMetrics),
- };
- }
-
- case "tool-result": {
- if (event.durationMs === undefined) return state;
- const turnId = event.turnId;
- const existing = state.turns.get(turnId);
- if (existing === undefined) return state;
- const patch: StepMetrics = { stepId: event.stepId, toolDurationMs: event.durationMs };
- const steps = upsertStep(existing.steps, event.stepId, patch);
- return { ...state, turns: setTurn(state.turns, turnId, { ...existing, steps }) };
- }
-
- case "done": {
- const turnId = event.turnId;
- const existing = state.turns.get(turnId);
- const updated: TurnMetrics = {
- ...(existing ?? { steps: [] }),
- };
- if (event.durationMs !== undefined)
- (updated as { wallMs?: number }).wallMs = event.durationMs;
- if (event.usage !== undefined) (updated as { doneUsage?: Usage }).doneUsage = event.usage;
- return { ...state, turns: setTurn(state.turns, turnId, updated) };
- }
-
- default:
- return state;
- }
-}
diff --git a/src/core/telemetry/selectors.ts b/src/core/telemetry/selectors.ts
deleted file mode 100644
index ecf1794..0000000
--- a/src/core/telemetry/selectors.ts
+++ /dev/null
@@ -1,95 +0,0 @@
-import type { Usage } from "@dispatch/wire";
-import type { StepMetrics, TelemetryState, TurnMetrics } from "./types";
-
-/** Get the metrics for a specific step within a turn. */
-export function stepMetrics(
- state: TelemetryState,
- turnId: string,
- stepIndex: number,
-): StepMetrics | undefined {
- return state.turns.get(turnId)?.steps[stepIndex];
-}
-
-/** Get the metrics for a turn. */
-export function turnMetrics(state: TelemetryState, turnId: string): TurnMetrics | undefined {
- return state.turns.get(turnId);
-}
-
-/** The number of steps in a turn. */
-export function stepCount(state: TelemetryState, turnId: string): number {
- return state.turns.get(turnId)?.steps.length ?? 0;
-}
-
-/** TTFT of the first step in a turn (the turn-visible first-token latency). */
-export function turnTtft(state: TelemetryState, turnId: string): number | undefined {
- return state.turns.get(turnId)?.steps[0]?.ttftMs;
-}
-
-/** Sum of all steps' decode times in a turn. */
-export function totalDecodeMs(state: TelemetryState, turnId: string): number | undefined {
- const steps = state.turns.get(turnId)?.steps;
- if (steps === undefined || steps.length === 0) return undefined;
- let total = 0;
- let found = false;
- for (const s of steps) {
- if (s.decodeMs !== undefined) {
- total += s.decodeMs;
- found = true;
- }
- }
- return found ? total : undefined;
-}
-
-/** Aggregate output tokens across all steps in a turn. */
-export function totalOutputTokens(state: TelemetryState, turnId: string): number | undefined {
- const turn = state.turns.get(turnId);
- if (turn === undefined) return undefined;
- if (turn.doneUsage !== undefined) return turn.doneUsage.outputTokens;
- let total = 0;
- let found = false;
- for (const s of turn.steps) {
- if (s.usage?.outputTokens !== undefined) {
- total += s.usage.outputTokens;
- found = true;
- }
- }
- return found ? total : undefined;
-}
-
-/** Aggregate input tokens across all steps in a turn. */
-export function totalInputTokens(state: TelemetryState, turnId: string): number | undefined {
- const turn = state.turns.get(turnId);
- if (turn === undefined) return undefined;
- if (turn.doneUsage !== undefined) return turn.doneUsage.inputTokens;
- let total = 0;
- let found = false;
- for (const s of turn.steps) {
- if (s.usage?.inputTokens !== undefined) {
- total += s.usage.inputTokens;
- found = true;
- }
- }
- return found ? total : undefined;
-}
-
-/** Derived TPS for a step: outputTokens / (decodeMs / 1000). */
-export function stepTps(step: StepMetrics): number | undefined {
- if (step.usage?.outputTokens === undefined || step.decodeMs === undefined) return undefined;
- if (step.decodeMs === 0) return undefined;
- return step.usage.outputTokens / (step.decodeMs / 1000);
-}
-
-/** Derived aggregate TPS for a turn. */
-export function turnTps(state: TelemetryState, turnId: string): number | undefined {
- const outTokens = totalOutputTokens(state, turnId);
- const decode = totalDecodeMs(state, turnId);
- if (outTokens === undefined || decode === undefined || decode === 0) return undefined;
- return outTokens / (decode / 1000);
-}
-
-/** Sum of tool execution durations within a step. */
-export function stepToolDuration(step: StepMetrics): number | undefined {
- return step.toolDurationMs !== undefined && step.toolDurationMs > 0
- ? step.toolDurationMs
- : undefined;
-}
diff --git a/src/core/telemetry/types.ts b/src/core/telemetry/types.ts
deleted file mode 100644
index 395ec93..0000000
--- a/src/core/telemetry/types.ts
+++ /dev/null
@@ -1,35 +0,0 @@
-import type { StepId, Usage } from "@dispatch/wire";
-
-/**
- * Per-step metrics, accumulated from `step-complete` + `usage` events.
- * All fields optional — absent when the backend had no clock or the step
- * produced no text/reasoning token.
- */
-export interface StepMetrics {
- readonly stepId: StepId;
- readonly ttftMs?: number;
- readonly decodeMs?: number;
- readonly genTotalMs?: number;
- readonly usage?: Usage;
- readonly toolDurationMs?: number; // sum of tool-result.durationMs in this step
-}
-
-/**
- * Per-turn metrics, accumulated from `done` events + per-step aggregation.
- */
-export interface TurnMetrics {
- readonly wallMs?: number;
- readonly doneUsage?: Usage;
- readonly steps: readonly StepMetrics[];
-}
-
-/**
- * Pure telemetry state — lives alongside but separate from TranscriptState.
- * Accumulates live-only metric events; never persisted (history has no metrics).
- * No "active turn" tracking — the consumer (store) passes the relevant turnId
- * to the selectors. Pure: events flow in, derived values flow out.
- */
-export interface TelemetryState {
- /** turnId → TurnMetrics. Multiple turns accumulate (tab switching). */
- readonly turns: ReadonlyMap<string, TurnMetrics>;
-}
diff --git a/src/core/wire/conformance.test.ts b/src/core/wire/conformance.test.ts
index 690ba4e..50b7f35 100644
--- a/src/core/wire/conformance.test.ts
+++ b/src/core/wire/conformance.test.ts
@@ -62,15 +62,6 @@ describe("classifies every AgentEvent type", () => {
turnId: "t1",
usage: { inputTokens: 10, outputTokens: 20 },
},
- {
- type: "step-complete",
- conversationId: "c1",
- turnId: "t1",
- stepId: "t1#0" as StepId,
- ttftMs: 300,
- decodeMs: 700,
- genTotalMs: 1000,
- },
{ type: "error", conversationId: "c1", turnId: "t1", message: "oops" },
{ type: "done", conversationId: "c1", turnId: "t1", reason: "complete" },
{ type: "turn-sealed", conversationId: "c1", turnId: "t1" },
@@ -87,15 +78,14 @@ describe("classifies every AgentEvent type", () => {
"tool-result",
"tool-output",
"usage",
- "step-complete",
"error",
"done",
"turn-sealed",
]);
});
- it("covers all 12 AgentEvent variants", () => {
- expect(samples).toHaveLength(12);
+ it("covers all 11 AgentEvent variants", () => {
+ expect(samples).toHaveLength(11);
});
});
diff --git a/src/core/wire/conformance.ts b/src/core/wire/conformance.ts
index d89772e..5d75a60 100644
--- a/src/core/wire/conformance.ts
+++ b/src/core/wire/conformance.ts
@@ -30,8 +30,6 @@ export function assertAgentEventExhaustive(event: AgentEvent): string {
return "done";
case "turn-sealed":
return "turn-sealed";
- case "step-complete":
- return "step-complete";
default:
return event satisfies never;
}