| author | Adam Malczewski <[email protected]> | 2026-06-10 08:29:59 +0900 |
|---|---|---|
| committer | Adam Malczewski <[email protected]> | 2026-06-10 08:29:59 +0900 |
| commit | 6db12ff70acb3333d05a5020ab66da4172a5225a (patch) | |
| tree | de5cc6314a3a6dd966d7c4fdb9b20adb04ae8307 | |
| parent | 4248cd1d546a4c1fb4e68940c11b5e309c2c2736 (diff) | |
feat(metrics): durable per-turn/step token+timing metrics (observability spans + persisted replay)
Two-part token-data improvement:
#2 Observability spans (kernel run-turn): turn & step span-close now stamp
ALL four Usage fields — added usage.cacheReadTokens/cacheWriteTokens (were
silently dropped) and normalized usage_* -> usage.* to match the
provider.request span (consistent D9 GROUP BY). No contract change.
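The value of the rename is easiest to see from the consumer side. A minimal sketch, assuming a simplified span-close record shape (`SpanCloseRecord`, `USAGE_KEYS`, and `sumUsage` are hypothetical names for illustration, not part of the kernel API): once every span uses the dotted `usage.*` keys, a single aggregation covers `turn`, `step`, and `provider.request` spans alike, and any leftover `usage_*` attribute is simply ignored.

```typescript
// Hypothetical, simplified shape of a span-close record. The real logger
// record type in the kernel may carry more fields.
type AttrValue = string | number | boolean | null;

interface SpanCloseRecord {
  readonly name: string;
  readonly attributes: Readonly<Record<string, AttrValue>>;
}

// The four dotted attribute names described in the commit message.
const USAGE_KEYS = [
  "usage.inputTokens",
  "usage.outputTokens",
  "usage.cacheReadTokens",
  "usage.cacheWriteTokens",
] as const;

type UsageKey = (typeof USAGE_KEYS)[number];
type UsageTotals = Record<UsageKey, number>;

// Sum the dotted usage attributes per span name. Spans that omit the
// cache attributes contribute nothing to those totals.
function sumUsage(records: readonly SpanCloseRecord[]): Map<string, UsageTotals> {
  const totals = new Map<string, UsageTotals>();
  for (const record of records) {
    const entry: UsageTotals = totals.get(record.name) ?? {
      "usage.inputTokens": 0,
      "usage.outputTokens": 0,
      "usage.cacheReadTokens": 0,
      "usage.cacheWriteTokens": 0,
    };
    for (const key of USAGE_KEYS) {
      const value = record.attributes[key];
      if (typeof value === "number") entry[key] += value;
    }
    totals.set(record.name, entry);
  }
  return totals;
}
```

Grouping by span name here stands in for whatever grouping the trace store actually performs; the point is only that one key vocabulary serves every span kind.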
#3 Persisted replay metrics (conversation-store + read endpoint): new
StepMetrics/TurnMetrics wire types; conversation-store persists per-turn
metrics in a separate key space (appendMetrics/loadMetrics, turn-append
order); session-orchestrator accumulates per-step+turn metrics from the
event stream (pure metrics.ts) and persists after seal; transport-http
serves GET /conversations/:id/metrics -> ConversationMetricsResponse.
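Turn-append order falls out of the key layout: each record is written under a zero-padded ordinal, so a plain lexicographic sort of the keys matches numeric order. A minimal sketch of that property, assuming a pad width of 10 (`PAD` is a stand-in; the store's actual `SEQ_PAD` value is not shown in this commit) and hypothetical helpers `exampleMetricsKey`, `unpaddedKey`, and `ordinalOf`:

```typescript
// Assumed pad width for illustration only; the real SEQ_PAD may differ.
const PAD = 10;

// Mirrors the padded key layout used for metrics records.
function exampleMetricsKey(conversationId: string, ordinal: number): string {
  return `conv:${conversationId}:metrics:${String(ordinal).padStart(PAD, "0")}`;
}

// Counter-example: the same layout without padding.
function unpaddedKey(conversationId: string, ordinal: number): string {
  return `conv:${conversationId}:metrics:${ordinal}`;
}

// Recover the ordinal from the last ":"-separated segment of a key.
function ordinalOf(key: string): number {
  const last = key.split(":").pop() ?? "";
  return Number.parseInt(last, 10);
}

const ordinals = [1, 2, 10, 11, 100];

// Default Array.prototype.sort compares strings by UTF-16 code units.
const paddedOrder = ordinals
  .map((n) => exampleMetricsKey("c1", n))
  .sort()
  .map(ordinalOf);

const unpaddedOrder = ordinals
  .map((n) => unpaddedKey("c1", n))
  .sort()
  .map(ordinalOf);

console.log(paddedOrder.join(","));
console.log(unpaddedOrder.join(","));
```

With padding the sorted keys come back as 1, 2, 10, 11, 100; without it, "10" and "100" sort ahead of "2", which is why a string sort alone would scramble the turns.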
Contracts: @dispatch/wire + @dispatch/transport-contract bumped 0.3.0->0.4.0
(additive). GLOSSARY: turn metrics / step metrics.
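Because the timing and cache-token fields are optional, a consumer of the persisted records has to treat them as possibly absent. A sketch of a client-side rollup over a list of turn metrics, using local type definitions inferred from the tests in this commit (the authoritative definitions live in `@dispatch/wire` and may differ); `ConversationSummary` and `summarizeConversation` are hypothetical names:

```typescript
// Local stand-ins inferred from the test fixtures in this commit.
// Assumption: the real wire types use a branded StepId and may be readonly.
interface Usage {
  inputTokens: number;
  outputTokens: number;
  cacheReadTokens?: number;
  cacheWriteTokens?: number;
}

interface StepMetrics {
  stepId: string;
  usage: Usage;
  ttftMs?: number;
  decodeMs?: number;
  genTotalMs?: number;
}

interface TurnMetrics {
  turnId: string;
  usage: Usage;
  durationMs?: number;
  steps: readonly StepMetrics[];
}

interface ConversationSummary {
  turns: number;
  inputTokens: number;
  outputTokens: number;
  cacheReadTokens: number;
  cacheWriteTokens: number;
  totalTtftMs: number;
}

// Roll up per-turn usage and per-step TTFT, counting absent fields as 0.
function summarizeConversation(turns: readonly TurnMetrics[]): ConversationSummary {
  const summary: ConversationSummary = {
    turns: 0,
    inputTokens: 0,
    outputTokens: 0,
    cacheReadTokens: 0,
    cacheWriteTokens: 0,
    totalTtftMs: 0,
  };
  for (const turn of turns) {
    summary.turns += 1;
    summary.inputTokens += turn.usage.inputTokens;
    summary.outputTokens += turn.usage.outputTokens;
    summary.cacheReadTokens += turn.usage.cacheReadTokens ?? 0;
    summary.cacheWriteTokens += turn.usage.cacheWriteTokens ?? 0;
    for (const step of turn.steps) {
      summary.totalTtftMs += step.ttftMs ?? 0;
    }
  }
  return summary;
}
```

Summing `ttftMs` across steps follows the GLOSSARY's reading of TTFT, where the sum across a turn's steps is the total prefill overhead.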
typecheck EXIT 0, biome clean, 546 vitest + 89 bun = 635 tests.
| -rw-r--r-- | GLOSSARY.md | 2 | ||||
| -rw-r--r-- | packages/conversation-store/src/keys.ts | 20 | ||||
| -rw-r--r-- | packages/conversation-store/src/store.test.ts | 140 | ||||
| -rw-r--r-- | packages/conversation-store/src/store.ts | 43 | ||||
| -rw-r--r-- | packages/kernel/src/contracts/conversation.ts | 2 | ||||
| -rw-r--r-- | packages/kernel/src/contracts/index.ts | 2 | ||||
| -rw-r--r-- | packages/kernel/src/runtime/run-turn.test.ts | 151 | ||||
| -rw-r--r-- | packages/kernel/src/runtime/run-turn.ts | 20 | ||||
| -rw-r--r-- | packages/session-orchestrator/src/metrics.test.ts | 264 | ||||
| -rw-r--r-- | packages/session-orchestrator/src/metrics.ts | 124 | ||||
| -rw-r--r-- | packages/session-orchestrator/src/orchestrator.test.ts | 319 | ||||
| -rw-r--r-- | packages/session-orchestrator/src/orchestrator.ts | 12 | ||||
| -rw-r--r-- | packages/transport-contract/package.json | 2 | ||||
| -rw-r--r-- | packages/transport-contract/src/index.ts | 23 | ||||
| -rw-r--r-- | packages/transport-http/src/app.test.ts | 128 | ||||
| -rw-r--r-- | packages/transport-http/src/app.ts | 23 | ||||
| -rw-r--r-- | packages/transport-http/src/server.bun.test.ts | 4 | ||||
| -rw-r--r-- | packages/wire/package.json | 2 | ||||
| -rw-r--r-- | packages/wire/src/index.ts | 41 |
19 files changed, 1307 insertions, 15 deletions
diff --git a/GLOSSARY.md b/GLOSSARY.md
index 65aa6f2..845b45f 100644
--- a/GLOSSARY.md
+++ b/GLOSSARY.md
@@ -20,6 +20,8 @@
 | **stepId** | The identifier of a step, stamped on each `tool-call`/`tool-result` event and tool chunk it produces, so a client groups a parallel/batched tool-call set by equality. Branded `StepId`; the runtime derives it deterministically as `<turnId>#<stepIndex>` (0-based). Generation provenance carried ON the tool chunk (unlike `seq`, which is a store-assigned sync cursor on the `StoredChunk` envelope). Treat as opaque. | batchId, step index (as the wire key) |
 | **TTFT** (time to first token) | Per-step latency from the generation stream starting to the first content token (text **or** reasoning) arriving. Inherently per LLM round-trip — each step re-prefills, so a turn has one TTFT per step; step 0's is the turn's user-visible first-token latency, and the sum across steps is the total prefill overhead. Captured as observability span timing only (not on the wire). | time-to-first-byte (when meaning tokens) |
 | **decode time** | The generation time of a step *after* the first token — first token → stream end, i.e. the step's generation total minus its TTFT. The model's token-production time with first-token latency removed. Observability span timing. | — |
+| **turn metrics** | The durable, replayable per-turn metrics record persisted for a sealed turn: aggregate `Usage` (tokens) + turn `durationMs` + its per-step `StepMetrics`. The persisted counterpart of the live `done` event's metrics; persisted per turn by `conversation-store` (returned in turn-append order), served by `GET /conversations/:id/metrics`. Distinct from observability span timing (trace-store) and from transient live wire events. | usage record, turn stats |
+| **step metrics** | The durable per-step metrics within a `TurnMetrics`: the step's `Usage` (tokens) + `ttftMs`/`decodeMs`/`genTotalMs` timing. The persisted counterpart of the live `usage` + `step-complete` events, keyed by `stepId`. | step stats |
 | **tool call** | A model's request to run a tool within a step. | function call (when meaning a tool call) |
 | **chunk** | One ordered piece of a message (text, thinking, tool-call/result, etc.), append-only in the log. | block, segment |
 | **seq** | The monotonic, gap-free, per-conversation sequence number stamped on each chunk as it is appended to the log. The sync cursor: a client requests `?sinceSeq=N` to fetch only newer chunks. Storage/sync metadata, never message content. | cursor (when meaning the number), offset, index |
diff --git a/packages/conversation-store/src/keys.ts b/packages/conversation-store/src/keys.ts
index 8646a40..0ba25d8 100644
--- a/packages/conversation-store/src/keys.ts
+++ b/packages/conversation-store/src/keys.ts
@@ -25,3 +25,23 @@ export function parseChunkSeq(key: string): number {
   const n = Number.parseInt(last, 10);
   return Number.isNaN(n) ? -1 : n;
 }
+
+export function metricsSeqKey(conversationId: string): string {
+  return `conv:${conversationId}:metrics-seq`;
+}
+
+export function metricsKey(conversationId: string, ordinal: number): string {
+  return `conv:${conversationId}:metrics:${String(ordinal).padStart(SEQ_PAD, "0")}`;
+}
+
+export function metricsPrefix(conversationId: string): string {
+  return `conv:${conversationId}:metrics:`;
+}
+
+export function parseMetricsOrdinal(key: string): number {
+  const parts = key.split(":");
+  const last = parts[parts.length - 1];
+  if (last === undefined) return -1;
+  const n = Number.parseInt(last, 10);
+  return Number.isNaN(n) ? -1 : n;
+}
diff --git a/packages/conversation-store/src/store.test.ts b/packages/conversation-store/src/store.test.ts
index e00fdc2..c7083e8 100644
--- a/packages/conversation-store/src/store.test.ts
+++ b/packages/conversation-store/src/store.test.ts
@@ -1,4 +1,4 @@
-import type { ChatMessage, StepId, StorageNamespace } from "@dispatch/kernel";
+import type { ChatMessage, StepId, StorageNamespace, TurnMetrics } from "@dispatch/kernel";
 import { beforeEach, describe, expect, it } from "vitest";
 import { createConversationStore } from "./store.js";

@@ -425,3 +425,141 @@ describe("ConversationStore", () => {
     }
   });
 });
+
+describe("ConversationStore metrics", () => {
+  let storage: StorageNamespace;
+
+  beforeEach(() => {
+    storage = createMemoryStorage();
+  });
+
+  it("appendMetrics → loadMetrics round-trips a TurnMetrics (usage + durationMs + steps)", async () => {
+    const store = createConversationStore(storage);
+    const stepId = "step_1" as StepId;
+    const metrics: TurnMetrics = {
+      turnId: "turn_abc",
+      usage: { inputTokens: 100, outputTokens: 50 },
+      durationMs: 1234,
+      steps: [
+        {
+          stepId,
+          usage: { inputTokens: 100, outputTokens: 50 },
+          ttftMs: 200,
+          decodeMs: 800,
+          genTotalMs: 1000,
+        },
+      ],
+    };
+    await store.appendMetrics("conv1", metrics);
+    const result = await store.loadMetrics("conv1");
+    expect(result).toHaveLength(1);
+    expect(result[0]).toEqual(metrics);
+  });
+
+  it("loadMetrics returns turns in append order", async () => {
+    const store = createConversationStore(storage);
+    const metrics1: TurnMetrics = {
+      turnId: "turn_first",
+      usage: { inputTokens: 10, outputTokens: 5 },
+      steps: [],
+    };
+    const metrics2: TurnMetrics = {
+      turnId: "turn_second",
+      usage: { inputTokens: 20, outputTokens: 10 },
+      steps: [],
+    };
+    const metrics3: TurnMetrics = {
+      turnId: "turn_third",
+      usage: { inputTokens: 30, outputTokens: 15 },
+      steps: [],
+    };
+    await store.appendMetrics("conv1", metrics1);
+    await store.appendMetrics("conv1", metrics2);
+    await
store.appendMetrics("conv1", metrics3); + const result = await store.loadMetrics("conv1"); + expect(result).toHaveLength(3); + expect(result[0]?.turnId).toBe("turn_first"); + expect(result[1]?.turnId).toBe("turn_second"); + expect(result[2]?.turnId).toBe("turn_third"); + }); + + it("loadMetrics returns [] for a conversation with no persisted metrics", async () => { + const store = createConversationStore(storage); + const result = await store.loadMetrics("nonexistent"); + expect(result).toEqual([]); + }); + + it("appendMetrics does not affect chunk load / loadSince", async () => { + const store = createConversationStore(storage); + const msg: ChatMessage = { role: "user", chunks: [{ type: "text", text: "hello" }] }; + await store.append("conv1", [msg]); + + const metrics: TurnMetrics = { + turnId: "turn_iso", + usage: { inputTokens: 100, outputTokens: 50 }, + steps: [], + }; + await store.appendMetrics("conv1", metrics); + + const messages = await store.load("conv1"); + expect(messages).toEqual([msg]); + + const chunks = await store.loadSince("conv1"); + expect(chunks).toHaveLength(1); + expect(chunks[0]?.chunk).toEqual({ type: "text", text: "hello" }); + }); + + it("TurnMetrics with cache tokens + per-step ttft/decode/genTotal round-trips losslessly", async () => { + const store = createConversationStore(storage); + const stepId1 = "step_a" as StepId; + const stepId2 = "step_b" as StepId; + const metrics: TurnMetrics = { + turnId: "turn_cache", + usage: { + inputTokens: 500, + outputTokens: 200, + cacheReadTokens: 300, + cacheWriteTokens: 100, + }, + durationMs: 5000, + steps: [ + { + stepId: stepId1, + usage: { + inputTokens: 300, + outputTokens: 100, + cacheReadTokens: 200, + cacheWriteTokens: 50, + }, + ttftMs: 150, + decodeMs: 600, + genTotalMs: 750, + }, + { + stepId: stepId2, + usage: { + inputTokens: 200, + outputTokens: 100, + cacheReadTokens: 100, + cacheWriteTokens: 50, + }, + ttftMs: 100, + decodeMs: 400, + genTotalMs: 500, + }, + ], + }; + await 
store.appendMetrics("conv1", metrics); + const result = await store.loadMetrics("conv1"); + expect(result).toHaveLength(1); + expect(result[0]).toEqual(metrics); + expect(result[0]?.usage.cacheReadTokens).toBe(300); + expect(result[0]?.usage.cacheWriteTokens).toBe(100); + expect(result[0]?.steps[0]?.ttftMs).toBe(150); + expect(result[0]?.steps[0]?.decodeMs).toBe(600); + expect(result[0]?.steps[0]?.genTotalMs).toBe(750); + expect(result[0]?.steps[1]?.ttftMs).toBe(100); + expect(result[0]?.steps[1]?.decodeMs).toBe(400); + expect(result[0]?.steps[1]?.genTotalMs).toBe(500); + }); +}); diff --git a/packages/conversation-store/src/store.ts b/packages/conversation-store/src/store.ts index df8bd4e..080cb79 100644 --- a/packages/conversation-store/src/store.ts +++ b/packages/conversation-store/src/store.ts @@ -1,6 +1,21 @@ -import type { ChatMessage, Chunk, Role, StorageNamespace, StoredChunk } from "@dispatch/kernel"; +import type { + ChatMessage, + Chunk, + Role, + StorageNamespace, + StoredChunk, + TurnMetrics, +} from "@dispatch/kernel"; import { defineService } from "@dispatch/kernel"; -import { chunkKey, chunkPrefix, parseSeq, seqKey } from "./keys.js"; +import { + chunkKey, + chunkPrefix, + metricsKey, + metricsPrefix, + metricsSeqKey, + parseSeq, + seqKey, +} from "./keys.js"; import { reconcile } from "./reconcile.js"; export interface ConversationStore { @@ -10,6 +25,8 @@ export interface ConversationStore { conversationId: string, sinceSeq?: number, ) => Promise<readonly StoredChunk[]>; + readonly appendMetrics: (conversationId: string, metrics: TurnMetrics) => Promise<void>; + readonly loadMetrics: (conversationId: string) => Promise<readonly TurnMetrics[]>; } export const conversationStoreHandle = defineService<ConversationStore>("conversation-store/store"); @@ -100,5 +117,27 @@ export function createConversationStore(storage: StorageNamespace): Conversation return result; }, + + async appendMetrics(conversationId, metrics) { + const raw = await 
storage.get(metricsSeqKey(conversationId)); + const ordinal = parseSeq(raw) + 1; + await storage.set(metricsKey(conversationId, ordinal), JSON.stringify(metrics)); + await storage.set(metricsSeqKey(conversationId), String(ordinal)); + }, + + async loadMetrics(conversationId) { + const prefix = metricsPrefix(conversationId); + const keys = await storage.keys(prefix); + const sorted = [...keys].sort(); + + const result: TurnMetrics[] = []; + for (const key of sorted) { + const value = await storage.get(key); + if (value === null) continue; + result.push(JSON.parse(value) as TurnMetrics); + } + + return result; + }, }; } diff --git a/packages/kernel/src/contracts/conversation.ts b/packages/kernel/src/contracts/conversation.ts index 0080964..f4a342d 100644 --- a/packages/kernel/src/contracts/conversation.ts +++ b/packages/kernel/src/contracts/conversation.ts @@ -11,6 +11,7 @@ export type { ErrorChunk, Role, StepId, + StepMetrics, StoredChunk, SystemChunk, TextChunk, @@ -18,4 +19,5 @@ export type { ToolCallChunk, ToolResultChunk, TurnId, + TurnMetrics, } from "@dispatch/wire"; diff --git a/packages/kernel/src/contracts/index.ts b/packages/kernel/src/contracts/index.ts index 38f1442..b5802f3 100644 --- a/packages/kernel/src/contracts/index.ts +++ b/packages/kernel/src/contracts/index.ts @@ -18,6 +18,7 @@ export type { ErrorChunk, Role, StepId, + StepMetrics, StoredChunk, SystemChunk, TextChunk, @@ -25,6 +26,7 @@ export type { ToolCallChunk, ToolResultChunk, TurnId, + TurnMetrics, } from "./conversation.js"; export type { ToolDispatchPolicy } from "./dispatch.js"; export type { diff --git a/packages/kernel/src/runtime/run-turn.test.ts b/packages/kernel/src/runtime/run-turn.test.ts index ce654d5..6db6645 100644 --- a/packages/kernel/src/runtime/run-turn.test.ts +++ b/packages/kernel/src/runtime/run-turn.test.ts @@ -1307,6 +1307,157 @@ describe("runTurn", () => { // Only one decode span (for the second step) expect(decodeOpens).toHaveLength(1); }); + + it("turn span close 
stamps usage.inputTokens / usage.outputTokens (dotted)", async () => { + const provider = createFakeProvider([ + [ + { type: "text-delta", delta: "hi" }, + { type: "usage", usage: { inputTokens: 10, outputTokens: 5 } }, + { type: "finish", reason: "stop" }, + ], + ]); + + const { logger, sink } = createTestLogger(); + + await runTurn({ + provider, + messages: [userMessage], + tools: [], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit: () => {}, + logger, + }); + + const turnClose = sink.records.find((r) => r.kind === "span-close" && r.name === "turn"); + expect(turnClose).toBeDefined(); + if (turnClose?.kind === "span-close") { + expect(turnClose.attributes?.["usage.inputTokens"]).toBe(10); + expect(turnClose.attributes?.["usage.outputTokens"]).toBe(5); + expect(turnClose.attributes?.usage_inputTokens).toBeUndefined(); + expect(turnClose.attributes?.usage_outputTokens).toBeUndefined(); + } + }); + + it("step span close stamps usage.inputTokens / usage.outputTokens (dotted)", async () => { + const provider = createFakeProvider([ + [ + { type: "text-delta", delta: "hi" }, + { type: "usage", usage: { inputTokens: 7, outputTokens: 3 } }, + { type: "finish", reason: "stop" }, + ], + ]); + + const { logger, sink } = createTestLogger(); + + await runTurn({ + provider, + messages: [userMessage], + tools: [], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit: () => {}, + logger, + }); + + const stepClose = sink.records.find((r) => r.kind === "span-close" && r.name === "step"); + expect(stepClose).toBeDefined(); + if (stepClose?.kind === "span-close") { + expect(stepClose.attributes?.["usage.inputTokens"]).toBe(7); + expect(stepClose.attributes?.["usage.outputTokens"]).toBe(3); + expect(stepClose.attributes?.usage_inputTokens).toBeUndefined(); + expect(stepClose.attributes?.usage_outputTokens).toBeUndefined(); + } + }); + + it("turn + step spans stamp 
usage.cacheReadTokens / usage.cacheWriteTokens when the provider Usage carries them", async () => { + const provider = createFakeProvider([ + [ + { type: "text-delta", delta: "hi" }, + { + type: "usage", + usage: { inputTokens: 10, outputTokens: 5, cacheReadTokens: 3, cacheWriteTokens: 2 }, + }, + { type: "finish", reason: "stop" }, + ], + ]); + + const { logger, sink } = createTestLogger(); + + await runTurn({ + provider, + messages: [userMessage], + tools: [], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", + emit: () => {}, + logger, + }); + + const turnClose = sink.records.find((r) => r.kind === "span-close" && r.name === "turn"); + const stepClose = sink.records.find((r) => r.kind === "span-close" && r.name === "step"); + + expect(turnClose).toBeDefined(); + if (turnClose?.kind === "span-close") { + expect(turnClose.attributes?.["usage.inputTokens"]).toBe(10); + expect(turnClose.attributes?.["usage.outputTokens"]).toBe(5); + expect(turnClose.attributes?.["usage.cacheReadTokens"]).toBe(3); + expect(turnClose.attributes?.["usage.cacheWriteTokens"]).toBe(2); + } + + expect(stepClose).toBeDefined(); + if (stepClose?.kind === "span-close") { + expect(stepClose.attributes?.["usage.inputTokens"]).toBe(10); + expect(stepClose.attributes?.["usage.outputTokens"]).toBe(5); + expect(stepClose.attributes?.["usage.cacheReadTokens"]).toBe(3); + expect(stepClose.attributes?.["usage.cacheWriteTokens"]).toBe(2); + } + }); + + it("turn + step spans OMIT the cache-token attrs when the provider Usage lacks them", async () => { + const provider = createFakeProvider([ + [ + { type: "text-delta", delta: "hi" }, + { type: "usage", usage: { inputTokens: 10, outputTokens: 5 } }, + { type: "finish", reason: "stop" }, + ], + ]); + + const { logger, sink } = createTestLogger(); + + await runTurn({ + provider, + messages: [userMessage], + tools: [], + dispatch: { maxConcurrent: 1, eager: false }, + conversationId: "conv-1", + turnId: "turn-1", 
+ emit: () => {}, + logger, + }); + + const turnClose = sink.records.find((r) => r.kind === "span-close" && r.name === "turn"); + const stepClose = sink.records.find((r) => r.kind === "span-close" && r.name === "step"); + + expect(turnClose).toBeDefined(); + if (turnClose?.kind === "span-close") { + expect(turnClose.attributes?.["usage.inputTokens"]).toBe(10); + expect(turnClose.attributes?.["usage.outputTokens"]).toBe(5); + expect(turnClose.attributes?.["usage.cacheReadTokens"]).toBeUndefined(); + expect(turnClose.attributes?.["usage.cacheWriteTokens"]).toBeUndefined(); + } + + expect(stepClose).toBeDefined(); + if (stepClose?.kind === "span-close") { + expect(stepClose.attributes?.["usage.inputTokens"]).toBe(10); + expect(stepClose.attributes?.["usage.outputTokens"]).toBe(5); + expect(stepClose.attributes?.["usage.cacheReadTokens"]).toBeUndefined(); + expect(stepClose.attributes?.["usage.cacheWriteTokens"]).toBeUndefined(); + } + }); }); describe("provider logger threading", () => { diff --git a/packages/kernel/src/runtime/run-turn.ts b/packages/kernel/src/runtime/run-turn.ts index 06069a2..d5db1bf 100644 --- a/packages/kernel/src/runtime/run-turn.ts +++ b/packages/kernel/src/runtime/run-turn.ts @@ -50,6 +50,20 @@ function addUsage(a: Usage, b: Usage): Usage { return { inputTokens, outputTokens }; } +function usageAttrs(usage: Usage): Record<string, string | number | boolean | null> { + const attrs: Record<string, string | number | boolean | null> = { + "usage.inputTokens": usage.inputTokens, + "usage.outputTokens": usage.outputTokens, + }; + if (usage.cacheReadTokens !== undefined) { + attrs["usage.cacheReadTokens"] = usage.cacheReadTokens; + } + if (usage.cacheWriteTokens !== undefined) { + attrs["usage.cacheWriteTokens"] = usage.cacheWriteTokens; + } + return attrs; +} + function appendTextDelta(chunks: Chunk[], delta: string): void { const lastIdx = chunks.length - 1; const last = chunks[lastIdx]; @@ -409,8 +423,7 @@ async function executeStep(ctx: 
StepContext): Promise<StepResult> { stepSpan.end({ attrs: { finishReason, - usage_inputTokens: stepUsage.inputTokens, - usage_outputTokens: stepUsage.outputTokens, + ...usageAttrs(stepUsage), }, }); } catch { @@ -533,8 +546,7 @@ export async function runTurn(input: RunTurnInput): Promise<RunTurnResult> { turnSpan.end({ attrs: { finishReason, - usage_inputTokens: totalUsage.inputTokens, - usage_outputTokens: totalUsage.outputTokens, + ...usageAttrs(totalUsage), }, }); } catch { diff --git a/packages/session-orchestrator/src/metrics.test.ts b/packages/session-orchestrator/src/metrics.test.ts new file mode 100644 index 0000000..c123dba --- /dev/null +++ b/packages/session-orchestrator/src/metrics.test.ts @@ -0,0 +1,264 @@ +import type { AgentEvent, StepId } from "@dispatch/kernel"; +import { describe, expect, it } from "vitest"; +import { createMetricsAccumulator } from "./metrics.js"; + +function stepId(id: string): StepId { + return id as StepId; +} + +describe("createMetricsAccumulator", () => { + it("builds a TurnMetrics from a single-step turn", () => { + const acc = createMetricsAccumulator(); + + const usageEvent: AgentEvent = { + type: "usage", + conversationId: "c1", + turnId: "t1", + stepId: stepId("t1#0"), + usage: { inputTokens: 10, outputTokens: 5 }, + }; + const stepCompleteEvent: AgentEvent = { + type: "step-complete", + conversationId: "c1", + turnId: "t1", + stepId: stepId("t1#0"), + ttftMs: 50, + decodeMs: 150, + genTotalMs: 200, + }; + const doneEvent: AgentEvent = { + type: "done", + conversationId: "c1", + turnId: "t1", + reason: "stop", + durationMs: 500, + usage: { inputTokens: 10, outputTokens: 5 }, + }; + + acc.ingest(usageEvent); + acc.ingest(stepCompleteEvent); + acc.ingest(doneEvent); + + const tm = acc.build("t1"); + expect(tm.turnId).toBe("t1"); + expect(tm.usage.inputTokens).toBe(10); + expect(tm.usage.outputTokens).toBe(5); + expect(tm.durationMs).toBe(500); + expect(tm.steps).toHaveLength(1); + 
expect(tm.steps[0]?.stepId).toBe(stepId("t1#0")); + expect(tm.steps[0]?.usage.inputTokens).toBe(10); + expect(tm.steps[0]?.usage.outputTokens).toBe(5); + expect(tm.steps[0]?.ttftMs).toBe(50); + expect(tm.steps[0]?.decodeMs).toBe(150); + expect(tm.steps[0]?.genTotalMs).toBe(200); + }); + + it("aggregates multi-step usage and preserves step order", () => { + const acc = createMetricsAccumulator(); + + acc.ingest({ + type: "usage", + conversationId: "c1", + turnId: "t1", + stepId: stepId("t1#0"), + usage: { inputTokens: 10, outputTokens: 5 }, + }); + acc.ingest({ + type: "step-complete", + conversationId: "c1", + turnId: "t1", + stepId: stepId("t1#0"), + genTotalMs: 100, + }); + acc.ingest({ + type: "usage", + conversationId: "c1", + turnId: "t1", + stepId: stepId("t1#1"), + usage: { inputTokens: 20, outputTokens: 10 }, + }); + acc.ingest({ + type: "step-complete", + conversationId: "c1", + turnId: "t1", + stepId: stepId("t1#1"), + genTotalMs: 150, + }); + acc.ingest({ + type: "done", + conversationId: "c1", + turnId: "t1", + reason: "stop", + usage: { inputTokens: 30, outputTokens: 15 }, + }); + + const tm = acc.build("t1"); + expect(tm.steps).toHaveLength(2); + expect(tm.steps[0]?.stepId).toBe(stepId("t1#0")); + expect(tm.steps[0]?.usage.inputTokens).toBe(10); + expect(tm.steps[1]?.stepId).toBe(stepId("t1#1")); + expect(tm.steps[1]?.usage.inputTokens).toBe(20); + expect(tm.usage.inputTokens).toBe(30); + expect(tm.usage.outputTokens).toBe(15); + }); + + it("joins per-step timing and usage by stepId", () => { + const acc = createMetricsAccumulator(); + + acc.ingest({ + type: "step-complete", + conversationId: "c1", + turnId: "t1", + stepId: stepId("t1#0"), + ttftMs: 50, + decodeMs: 100, + genTotalMs: 150, + }); + acc.ingest({ + type: "usage", + conversationId: "c1", + turnId: "t1", + stepId: stepId("t1#0"), + usage: { inputTokens: 10, outputTokens: 5 }, + }); + + const tm = acc.build("t1"); + expect(tm.steps).toHaveLength(1); + 
expect(tm.steps[0]?.usage.inputTokens).toBe(10); + expect(tm.steps[0]?.ttftMs).toBe(50); + expect(tm.steps[0]?.decodeMs).toBe(100); + expect(tm.steps[0]?.genTotalMs).toBe(150); + }); + + it("turn-level usage comes from done event, not step sum", () => { + const acc = createMetricsAccumulator(); + + acc.ingest({ + type: "usage", + conversationId: "c1", + turnId: "t1", + stepId: stepId("t1#0"), + usage: { inputTokens: 10, outputTokens: 5 }, + }); + acc.ingest({ + type: "step-complete", + conversationId: "c1", + turnId: "t1", + stepId: stepId("t1#0"), + }); + acc.ingest({ + type: "done", + conversationId: "c1", + turnId: "t1", + reason: "stop", + usage: { inputTokens: 100, outputTokens: 50 }, + }); + + const tm = acc.build("t1"); + expect(tm.usage.inputTokens).toBe(100); + expect(tm.usage.outputTokens).toBe(50); + }); + + it("falls back to summing step usage when done.usage is absent", () => { + const acc = createMetricsAccumulator(); + + acc.ingest({ + type: "usage", + conversationId: "c1", + turnId: "t1", + stepId: stepId("t1#0"), + usage: { inputTokens: 10, outputTokens: 5 }, + }); + acc.ingest({ + type: "step-complete", + conversationId: "c1", + turnId: "t1", + stepId: stepId("t1#0"), + }); + acc.ingest({ + type: "done", + conversationId: "c1", + turnId: "t1", + reason: "stop", + }); + + const tm = acc.build("t1"); + expect(tm.usage.inputTokens).toBe(10); + expect(tm.usage.outputTokens).toBe(5); + }); + + it("tolerates missing usage on a step (timing only)", () => { + const acc = createMetricsAccumulator(); + + acc.ingest({ + type: "step-complete", + conversationId: "c1", + turnId: "t1", + stepId: stepId("t1#0"), + genTotalMs: 200, + }); + acc.ingest({ + type: "done", + conversationId: "c1", + turnId: "t1", + reason: "stop", + }); + + const tm = acc.build("t1"); + expect(tm.steps).toHaveLength(1); + expect(tm.steps[0]?.usage.inputTokens).toBe(0); + expect(tm.steps[0]?.usage.outputTokens).toBe(0); + expect(tm.steps[0]?.genTotalMs).toBe(200); + }); + + it("tolerates 
missing timing on a step (usage only)", () => { + const acc = createMetricsAccumulator(); + + acc.ingest({ + type: "usage", + conversationId: "c1", + turnId: "t1", + stepId: stepId("t1#0"), + usage: { inputTokens: 10, outputTokens: 5 }, + }); + acc.ingest({ + type: "done", + conversationId: "c1", + turnId: "t1", + reason: "stop", + }); + + const tm = acc.build("t1"); + expect(tm.steps).toHaveLength(1); + expect(tm.steps[0]?.usage.inputTokens).toBe(10); + expect(tm.steps[0]?.ttftMs).toBeUndefined(); + expect(tm.steps[0]?.decodeMs).toBeUndefined(); + expect(tm.steps[0]?.genTotalMs).toBeUndefined(); + }); + + it("reset clears all accumulated state", () => { + const acc = createMetricsAccumulator(); + + acc.ingest({ + type: "usage", + conversationId: "c1", + turnId: "t1", + stepId: stepId("t1#0"), + usage: { inputTokens: 10, outputTokens: 5 }, + }); + acc.ingest({ + type: "done", + conversationId: "c1", + turnId: "t1", + reason: "stop", + usage: { inputTokens: 10, outputTokens: 5 }, + }); + + acc.reset(); + + const tm = acc.build("t2"); + expect(tm.steps).toHaveLength(0); + expect(tm.usage.inputTokens).toBe(0); + expect(tm.usage.outputTokens).toBe(0); + }); +}); diff --git a/packages/session-orchestrator/src/metrics.ts b/packages/session-orchestrator/src/metrics.ts new file mode 100644 index 0000000..e953bd9 --- /dev/null +++ b/packages/session-orchestrator/src/metrics.ts @@ -0,0 +1,124 @@ +import type { + AgentEvent, + StepId, + StepMetrics, + TurnDoneEvent, + TurnMetrics, + TurnStepCompleteEvent, + TurnUsageEvent, + Usage, +} from "@dispatch/kernel"; + +const zeroUsage: Usage = { inputTokens: 0, outputTokens: 0 }; + +interface StepAccumulator { + readonly stepId: StepId; + usage: Usage | undefined; + ttftMs: number | undefined; + decodeMs: number | undefined; + genTotalMs: number | undefined; +} + +export interface MetricsAccumulator { + readonly ingest: (event: AgentEvent) => void; + readonly build: (turnId: string) => TurnMetrics; + readonly reset: () => void; +} + 
+export function createMetricsAccumulator(): MetricsAccumulator {
+  const steps = new Map<StepId, StepAccumulator>();
+  const stepOrder: StepId[] = [];
+  let doneUsage: Usage | undefined;
+  let doneDurationMs: number | undefined;
+
+  function getOrCreateStep(stepId: StepId): StepAccumulator {
+    let acc = steps.get(stepId);
+    if (acc === undefined) {
+      acc = {
+        stepId,
+        usage: undefined,
+        ttftMs: undefined,
+        decodeMs: undefined,
+        genTotalMs: undefined,
+      };
+      steps.set(stepId, acc);
+      stepOrder.push(stepId);
+    }
+    return acc;
+  }
+
+  function ingest(event: AgentEvent): void {
+    switch (event.type) {
+      case "usage": {
+        const e = event as TurnUsageEvent;
+        if (e.stepId !== undefined) {
+          const acc = getOrCreateStep(e.stepId);
+          acc.usage = e.usage;
+        }
+        break;
+      }
+      case "step-complete": {
+        const e = event as TurnStepCompleteEvent;
+        const acc = getOrCreateStep(e.stepId);
+        acc.ttftMs = e.ttftMs;
+        acc.decodeMs = e.decodeMs;
+        acc.genTotalMs = e.genTotalMs;
+        break;
+      }
+      case "done": {
+        const e = event as TurnDoneEvent;
+        doneUsage = e.usage;
+        doneDurationMs = e.durationMs;
+        break;
+      }
+    }
+  }
+
+  function build(turnId: string): TurnMetrics {
+    const stepMetrics: StepMetrics[] = stepOrder.map((stepId) => {
+      const acc = steps.get(stepId);
+      if (acc === undefined) {
+        return { stepId, usage: zeroUsage };
+      }
+      const usage = acc.usage ?? zeroUsage;
+      const sm: StepMetrics = { stepId, usage };
+      if (acc.ttftMs !== undefined) {
+        (sm as { ttftMs?: number }).ttftMs = acc.ttftMs;
+      }
+      if (acc.decodeMs !== undefined) {
+        (sm as { decodeMs?: number }).decodeMs = acc.decodeMs;
+      }
+      if (acc.genTotalMs !== undefined) {
+        (sm as { genTotalMs?: number }).genTotalMs = acc.genTotalMs;
+      }
+      return sm;
+    });
+
+    const aggregateUsage = doneUsage ??
sumStepUsage(stepMetrics); + + const tm: TurnMetrics = { turnId, usage: aggregateUsage, steps: stepMetrics }; + if (doneDurationMs !== undefined) { + (tm as { durationMs?: number }).durationMs = doneDurationMs; + } + return tm; + } + + function reset(): void { + steps.clear(); + stepOrder.length = 0; + doneUsage = undefined; + doneDurationMs = undefined; + } + + return { ingest, build, reset }; +} + +function sumStepUsage(steps: readonly StepMetrics[]): Usage { + let inputTokens = 0; + let outputTokens = 0; + for (const s of steps) { + inputTokens += s.usage.inputTokens; + outputTokens += s.usage.outputTokens; + } + return { inputTokens, outputTokens }; +} diff --git a/packages/session-orchestrator/src/orchestrator.test.ts b/packages/session-orchestrator/src/orchestrator.test.ts index 3954ffe..ea564c5 100644 --- a/packages/session-orchestrator/src/orchestrator.test.ts +++ b/packages/session-orchestrator/src/orchestrator.test.ts @@ -7,6 +7,8 @@ import type { RunTurnInput, RunTurnResult, StoredChunk, + ToolContract, + TurnMetrics, } from "@dispatch/kernel"; import { runTurn } from "@dispatch/kernel"; import { describe, expect, it } from "vitest"; @@ -14,10 +16,13 @@ import { createSessionOrchestrator } from "./orchestrator.js"; function createInMemoryStore(): ConversationStore & { readonly data: Map<string, ChatMessage[]>; + readonly metricsData: Map<string, TurnMetrics[]>; } { const data = new Map<string, ChatMessage[]>(); + const metricsData = new Map<string, TurnMetrics[]>(); return { data, + metricsData, async append(conversationId, messages) { const existing = data.get(conversationId) ?? []; data.set(conversationId, [...existing, ...messages]); @@ -39,6 +44,13 @@ function createInMemoryStore(): ConversationStore & { } return result; }, + async appendMetrics(conversationId, metrics) { + const existing = metricsData.get(conversationId) ?? 
[]; + metricsData.set(conversationId, [...existing, metrics]); + }, + async loadMetrics(conversationId) { + return [...(metricsData.get(conversationId) ?? [])]; + }, }; } @@ -63,6 +75,18 @@ function collectEvents(): { events: AgentEvent[]; onEvent: (event: AgentEvent) = return { events, onEvent: (event) => events.push(event) }; } +function createFakeTool( + name: string, + handler: (input: unknown) => Promise<{ content: string }>, +): ToolContract { + return { + name, + description: `Fake tool: ${name}`, + parameters: { type: "object" }, + execute: async (input) => handler(input), + }; +} + describe("handleMessage integration", () => { it("loads history, runs turn, emits events, and persists result", async () => { const store = createInMemoryStore(); @@ -465,6 +489,13 @@ describe("turn-sealed event", () => { async loadSince(conversationId, sinceSeq) { return store.loadSince(conversationId, sinceSeq); }, + async appendMetrics(conversationId, metrics) { + await store.appendMetrics(conversationId, metrics); + ordering.push("appendMetrics"); + }, + async loadMetrics(conversationId) { + return store.loadMetrics(conversationId); + }, }; const orchestrator = createSessionOrchestrator({ @@ -484,7 +515,7 @@ describe("turn-sealed event", () => { }, }); - expect(ordering).toEqual(["append", "turn-sealed"]); + expect(ordering).toEqual(["append", "appendMetrics", "turn-sealed"]); }); it("does not emit turn-sealed when append throws", async () => { @@ -505,6 +536,12 @@ describe("turn-sealed event", () => { async loadSince() { return []; }, + async appendMetrics() { + return undefined; + }, + async loadMetrics() { + return []; + }, }; const orchestrator = createSessionOrchestrator({ @@ -528,3 +565,283 @@ describe("turn-sealed event", () => { expect(sealedEvents).toHaveLength(0); }); }); + +describe("turn metrics persistence", () => { + it("persists a TurnMetrics after a single-step turn seals", async () => { + const store = createInMemoryStore(); + const provider = 
createFakeProvider([ + [ + { type: "text-delta", delta: "Hello" }, + { type: "usage", usage: { inputTokens: 10, outputTokens: 5 } }, + { type: "finish", reason: "stop" }, + ], + ]); + + const orchestrator = createSessionOrchestrator({ + conversationStore: store, + resolveProvider: () => provider, + resolveTools: () => [], + runTurn, + now: () => 1000, + }); + + await orchestrator.handleMessage({ + conversationId: "conv-metrics-1", + text: "test", + onEvent: () => {}, + }); + + const metrics = store.metricsData.get("conv-metrics-1"); + expect(metrics).toBeDefined(); + expect(metrics).toHaveLength(1); + expect(metrics?.[0]?.turnId).toMatch(/^turn-/); + expect(metrics?.[0]?.usage.inputTokens).toBe(10); + expect(metrics?.[0]?.usage.outputTokens).toBe(5); + expect(metrics?.[0]?.steps).toHaveLength(1); + expect(metrics?.[0]?.steps[0]?.usage.inputTokens).toBe(10); + expect(metrics?.[0]?.steps[0]?.usage.outputTokens).toBe(5); + }); + + it("TurnMetrics aggregates multi-step usage and carries each step's StepMetrics in order", async () => { + const store = createInMemoryStore(); + const tool = createFakeTool("echo", async () => ({ content: "echoed" })); + + let callIndex = 0; + const provider: ProviderContract = { + id: "fake", + stream() { + const idx = callIndex++; + return (async function* () { + if (idx === 0) { + yield { + type: "tool-call", + toolCallId: "tc1", + toolName: "echo", + input: {}, + } as ProviderEvent; + yield { + type: "usage", + usage: { inputTokens: 10, outputTokens: 5 }, + } as ProviderEvent; + yield { type: "finish", reason: "tool-calls" } as ProviderEvent; + } else { + yield { type: "text-delta", delta: "Step2" } as ProviderEvent; + yield { + type: "usage", + usage: { inputTokens: 20, outputTokens: 10 }, + } as ProviderEvent; + yield { type: "finish", reason: "stop" } as ProviderEvent; + } + })(); + }, + }; + + const orchestrator = createSessionOrchestrator({ + conversationStore: store, + resolveProvider: () => provider, + resolveTools: () => [tool], 
+ runTurn, + now: () => 1000, + }); + + await orchestrator.handleMessage({ + conversationId: "conv-metrics-multi", + text: "test", + onEvent: () => {}, + }); + + const metrics = store.metricsData.get("conv-metrics-multi"); + expect(metrics).toBeDefined(); + expect(metrics).toHaveLength(1); + + const tm = metrics?.[0]; + if (tm === undefined) throw new Error("expected metrics"); + + expect(tm.steps.length).toBeGreaterThanOrEqual(2); + + expect(tm.steps[0]?.usage.inputTokens).toBe(10); + expect(tm.steps[0]?.usage.outputTokens).toBe(5); + expect(tm.steps[1]?.usage.inputTokens).toBe(20); + expect(tm.steps[1]?.usage.outputTokens).toBe(10); + + expect(tm.usage.inputTokens).toBe(30); + expect(tm.usage.outputTokens).toBe(15); + }); + + it("per-step timing and usage are joined by stepId into one StepMetrics", async () => { + const store = createInMemoryStore(); + const clock = createCounterNow(); + clock.tick(100); + + let callIndex = 0; + const provider: ProviderContract = { + id: "fake", + stream() { + const idx = callIndex++; + return (async function* () { + if (idx === 0) { + clock.tick(50); + yield { type: "text-delta", delta: "Hello" } as ProviderEvent; + clock.tick(100); + yield { + type: "usage", + usage: { inputTokens: 10, outputTokens: 5 }, + } as ProviderEvent; + clock.tick(50); + yield { type: "finish", reason: "stop" } as ProviderEvent; + } + })(); + }, + }; + + const orchestrator = createSessionOrchestrator({ + conversationStore: store, + resolveProvider: () => provider, + resolveTools: () => [], + runTurn, + now: clock.now, + }); + + await orchestrator.handleMessage({ + conversationId: "conv-metrics-join", + text: "test", + onEvent: () => {}, + }); + + const metrics = store.metricsData.get("conv-metrics-join"); + expect(metrics).toBeDefined(); + expect(metrics).toHaveLength(1); + + const tm = metrics?.[0]; + if (tm === undefined) throw new Error("expected metrics"); + + expect(tm.steps).toHaveLength(1); + const step = tm.steps[0]; + if (step === undefined) 
throw new Error("expected step"); + + expect(step.usage.inputTokens).toBe(10); + expect(step.usage.outputTokens).toBe(5); + expect(step.genTotalMs).toBe(200); + expect(step.ttftMs).toBe(50); + expect(step.decodeMs).toBe(150); + }); + + it("turn-level usage comes from the done event aggregate", async () => { + const store = createInMemoryStore(); + const tool = createFakeTool("echo", async () => ({ content: "echoed" })); + + let callIndex = 0; + const provider: ProviderContract = { + id: "fake", + stream() { + const idx = callIndex++; + return (async function* () { + if (idx === 0) { + yield { + type: "tool-call", + toolCallId: "tc1", + toolName: "echo", + input: {}, + } as ProviderEvent; + yield { + type: "usage", + usage: { inputTokens: 10, outputTokens: 5 }, + } as ProviderEvent; + yield { type: "finish", reason: "tool-calls" } as ProviderEvent; + } else { + yield { type: "text-delta", delta: "Step2" } as ProviderEvent; + yield { + type: "usage", + usage: { inputTokens: 20, outputTokens: 10 }, + } as ProviderEvent; + yield { type: "finish", reason: "stop" } as ProviderEvent; + } + })(); + }, + }; + + const orchestrator = createSessionOrchestrator({ + conversationStore: store, + resolveProvider: () => provider, + resolveTools: () => [tool], + runTurn, + now: () => 1000, + }); + + await orchestrator.handleMessage({ + conversationId: "conv-metrics-done", + text: "test", + onEvent: () => {}, + }); + + const metrics = store.metricsData.get("conv-metrics-done"); + expect(metrics).toBeDefined(); + expect(metrics).toHaveLength(1); + + const tm = metrics?.[0]; + if (tm === undefined) throw new Error("expected metrics"); + + expect(tm.usage.inputTokens).toBe(30); + expect(tm.usage.outputTokens).toBe(15); + }); + + it("does not persist metrics nor emit turn-sealed when chunk append fails", async () => { + const provider = createFakeProvider([ + [ + { type: "text-delta", delta: "ok" }, + { type: "usage", usage: { inputTokens: 5, outputTokens: 3 } }, + { type: "finish", 
reason: "stop" }, + ], + ]); + + let metricsAppended = false; + const failingMetricsStore: ConversationStore = { + async append() { + throw new Error("storage failure"); + }, + async load() { + return []; + }, + async loadSince() { + return []; + }, + async appendMetrics() { + metricsAppended = true; + }, + async loadMetrics() { + return []; + }, + }; + + const orchestrator = createSessionOrchestrator({ + conversationStore: failingMetricsStore, + resolveProvider: () => provider, + resolveTools: () => [], + runTurn, + }); + + const { events, onEvent } = collectEvents(); + + await expect( + orchestrator.handleMessage({ + conversationId: "conv-fail-metrics", + text: "test", + onEvent, + }), + ).rejects.toThrow("storage failure"); + + const sealedEvents = events.filter((e) => e.type === "turn-sealed"); + expect(sealedEvents).toHaveLength(0); + expect(metricsAppended).toBe(false); + }); +}); + +function createCounterNow(): { now: () => number; tick: (ms: number) => void } { + let t = 0; + return { + now: () => t, + tick(ms: number) { + t += ms; + }, + }; +} diff --git a/packages/session-orchestrator/src/orchestrator.ts b/packages/session-orchestrator/src/orchestrator.ts index 04f6ad2..d84b805 100644 --- a/packages/session-orchestrator/src/orchestrator.ts +++ b/packages/session-orchestrator/src/orchestrator.ts @@ -11,6 +11,7 @@ import type { ToolDispatchPolicy, } from "@dispatch/kernel"; import { defineService } from "@dispatch/kernel"; +import { createMetricsAccumulator } from "./metrics.js"; import { buildUserMessage, defaultDispatchPolicy, generateTurnId } from "./pure.js"; export interface SessionOrchestrator { @@ -73,13 +74,19 @@ export function createSessionOrchestrator(deps: SessionOrchestratorDeps): Sessio const tools = deps.resolveTools(); const dispatch = deps.resolveDispatch?.() ?? 
defaultDispatchPolicy(); const turnLogger = deps.logger?.child({ conversationId, turnId }); + const metrics = createMetricsAccumulator(); + + const emitAndAccumulate = (event: AgentEvent): void => { + metrics.ingest(event); + onEvent(event); + }; const opts: RunTurnInput = { provider, messages: [...history, userMsg], tools, dispatch, - emit: onEvent, + emit: emitAndAccumulate, conversationId, turnId, ...(modelOverride !== undefined @@ -96,6 +103,9 @@ export function createSessionOrchestrator(deps: SessionOrchestratorDeps): Sessio const toPersist: ChatMessage[] = [userMsg, ...result.messages]; await deps.conversationStore.append(conversationId, toPersist); + const turnMetrics = metrics.build(turnId); + await deps.conversationStore.appendMetrics(conversationId, turnMetrics); + onEvent({ type: "turn-sealed", conversationId, turnId }); }, }; diff --git a/packages/transport-contract/package.json b/packages/transport-contract/package.json index a2711af..9b6e9b3 100644 --- a/packages/transport-contract/package.json +++ b/packages/transport-contract/package.json @@ -1,6 +1,6 @@ { "name": "@dispatch/transport-contract", - "version": "0.3.0", + "version": "0.4.0", "type": "module", "private": true, "main": "dist/index.js", diff --git a/packages/transport-contract/src/index.ts b/packages/transport-contract/src/index.ts index cdddad6..bc01ca6 100644 --- a/packages/transport-contract/src/index.ts +++ b/packages/transport-contract/src/index.ts @@ -20,9 +20,9 @@ */ import type { SurfaceClientMessage, SurfaceServerMessage } from "@dispatch/ui-contract"; -import type { AgentEvent, StoredChunk } from "@dispatch/wire"; +import type { AgentEvent, StoredChunk, TurnMetrics } from "@dispatch/wire"; -export type { AgentEvent, StoredChunk } from "@dispatch/wire"; +export type { AgentEvent, StepMetrics, StoredChunk, TurnMetrics } from "@dispatch/wire"; /** * Request body for `POST /chat` (sent as JSON). 
@@ -92,6 +92,25 @@ export interface ConversationHistoryResponse { readonly latestSeq: number; } +/** + * Response body for `GET /conversations/:id/metrics` — the persisted per-turn + * (and per-step) token + timing metrics for a conversation, for a client + * reopening a past conversation to render historical usage/latency. + * + * This is a SEPARATE axis from the two other read concerns and is deliberately + * its own endpoint: the live `usage`/`step-complete`/`done` events are transient + * (not persisted), and `ConversationHistoryResponse` carries seq-cursor chunk + * CONTENT. Metrics are keyed per TURN (not per chunk) and so are not seq-filtered + * — hence a sibling route rather than a field on the history response. + * + * `turns` is every SEALED turn's `TurnMetrics` in turn order. A turn appears only + * after its metrics were persisted (post-seal); an in-flight or unsealed turn is + * absent until then. + */ +export interface ConversationMetricsResponse { + readonly turns: readonly TurnMetrics[]; +} + // ─── WebSocket chat ops ─────────────────────────────────────────────────────── // The persistent WS connection multiplexes chat ops (below) with surface ops // (`@dispatch/ui-contract`). The unified unions at the bottom compose both. 
Chat diff --git a/packages/transport-http/src/app.test.ts b/packages/transport-http/src/app.test.ts index aa47dce..0a6c5b0 100644 --- a/packages/transport-http/src/app.test.ts +++ b/packages/transport-http/src/app.test.ts @@ -1,4 +1,4 @@ -import type { AgentEvent, Logger, StoredChunk } from "@dispatch/kernel"; +import type { AgentEvent, Logger, StepId, StoredChunk, TurnMetrics } from "@dispatch/kernel"; import { describe, expect, it } from "vitest"; import { createApp } from "./app.js"; import type { ConversationStore, CredentialStore, SessionOrchestrator } from "./seam.js"; @@ -47,6 +47,7 @@ function createFakeLogger(): Logger & { readonly records: readonly CapturedLog[] function createFakeConversationStore( store: Map<string, StoredChunk[]> = new Map(), + metricsStore: Map<string, TurnMetrics[]> = new Map(), ): ConversationStore { return { async append() {}, @@ -58,6 +59,10 @@ function createFakeConversationStore( const minSeq = sinceSeq ?? 0; return chunks.filter((c) => c.seq > minSeq); }, + async appendMetrics() {}, + async loadMetrics(conversationId) { + return metricsStore.get(conversationId) ?? 
[]; + }, }; } @@ -465,6 +470,127 @@ describe("GET /conversations/:id", () => { }); }); +describe("GET /conversations/:id/metrics", () => { + const sampleMetrics: TurnMetrics[] = [ + { + turnId: "turn1", + usage: { inputTokens: 100, outputTokens: 50, cacheReadTokens: 0, cacheWriteTokens: 0 }, + durationMs: 1000, + steps: [ + { + stepId: "step1" as StepId, + usage: { inputTokens: 100, outputTokens: 50, cacheReadTokens: 0, cacheWriteTokens: 0 }, + ttftMs: 200, + decodeMs: 300, + genTotalMs: 500, + }, + ], + }, + { + turnId: "turn2", + usage: { inputTokens: 200, outputTokens: 80, cacheReadTokens: 10, cacheWriteTokens: 5 }, + durationMs: 1500, + steps: [ + { + stepId: "step2" as StepId, + usage: { inputTokens: 200, outputTokens: 80, cacheReadTokens: 10, cacheWriteTokens: 5 }, + ttftMs: 300, + decodeMs: 500, + genTotalMs: 800, + }, + ], + }, + ]; + + it("returns persisted turn metrics as { turns }", async () => { + const metricsStore = new Map<string, TurnMetrics[]>([["conv1", sampleMetrics]]); + const app = createApp({ + conversationStore: createFakeConversationStore(new Map(), metricsStore), + orchestrator: createFakeOrchestrator([]), + credentialStore: createFakeCredentialStore([]), + }); + + const res = await app.request("/conversations/conv1/metrics"); + expect(res.status).toBe(200); + const body = (await res.json()) as { turns: readonly TurnMetrics[] }; + expect(body.turns).toHaveLength(2); + expect(body.turns[0]?.turnId).toBe("turn1"); + expect(body.turns[1]?.turnId).toBe("turn2"); + }); + + it("returns { turns: [] } for an unknown conversation", async () => { + const app = createApp({ + conversationStore: createFakeConversationStore(), + orchestrator: createFakeOrchestrator([]), + credentialStore: createFakeCredentialStore([]), + }); + + const res = await app.request("/conversations/unknown/metrics"); + expect(res.status).toBe(200); + const body = (await res.json()) as { turns: readonly TurnMetrics[] }; + expect(body.turns).toHaveLength(0); + }); + + it("the 
metrics route does not collide with GET /conversations/:id history route", async () => { + const sampleChunks: StoredChunk[] = [ + { seq: 1, role: "user", chunk: { type: "text", text: "hello" } }, + ]; + const store = new Map<string, StoredChunk[]>([["conv1", sampleChunks]]); + const metricsStore = new Map<string, TurnMetrics[]>([["conv1", sampleMetrics]]); + const app = createApp({ + conversationStore: createFakeConversationStore(store, metricsStore), + orchestrator: createFakeOrchestrator([]), + credentialStore: createFakeCredentialStore([]), + }); + + const metricsRes = await app.request("/conversations/conv1/metrics"); + expect(metricsRes.status).toBe(200); + const metricsBody = (await metricsRes.json()) as { turns: readonly TurnMetrics[] }; + expect(metricsBody.turns).toHaveLength(2); + + const historyRes = await app.request("/conversations/conv1"); + expect(historyRes.status).toBe(200); + const historyBody = (await historyRes.json()) as { + chunks: readonly StoredChunk[]; + latestSeq: number; + }; + expect(historyBody.chunks).toHaveLength(1); + }); + + it("a store failure on the metrics read returns an error status + logs an error", async () => { + const logger = createFakeLogger(); + const brokenStore: ConversationStore = { + async append() {}, + async load() { + return []; + }, + async loadSince() { + return []; + }, + async appendMetrics() {}, + async loadMetrics() { + throw new Error("storage exploded"); + }, + }; + const app = createApp({ + conversationStore: brokenStore, + orchestrator: createFakeOrchestrator([]), + credentialStore: createFakeCredentialStore([]), + logger, + }); + + const res = await app.request("/conversations/conv1/metrics"); + expect(res.status).toBe(500); + const body = (await res.json()) as { error: string }; + expect(body.error).toContain("Failed to load conversation metrics"); + + const errorLogs = logger.records.filter((r) => r.level === "error"); + expect(errorLogs).toHaveLength(1); + 
expect(errorLogs[0]?.msg).toBe("conversations: metrics store failure"); + expect(errorLogs[0]?.attrs?.err).toBeInstanceOf(Error); + }); +}); + describe("POST /chat logging", () => { it("POST /chat logs an info line when a request is accepted", async () => { const logger = createFakeLogger(); diff --git a/packages/transport-http/src/app.ts b/packages/transport-http/src/app.ts index a8edc71..4002e23 100644 --- a/packages/transport-http/src/app.ts +++ b/packages/transport-http/src/app.ts @@ -1,5 +1,9 @@ import type { AgentEvent, Logger } from "@dispatch/kernel"; -import type { ConversationHistoryResponse, ModelsResponse } from "@dispatch/transport-contract"; +import type { + ConversationHistoryResponse, + ConversationMetricsResponse, + ModelsResponse, +} from "@dispatch/transport-contract"; import { Hono } from "hono"; import { cors } from "hono/cors"; import { @@ -57,6 +61,23 @@ export function createApp(opts: CreateServerOptions): Hono { app.get("/health", (c) => c.json({ ok: true })); + app.get("/conversations/:id/metrics", async (c) => { + const conversationId = c.req.param("id"); + + try { + const turns = await opts.conversationStore.loadMetrics(conversationId); + log.info("conversations: metrics read", { + conversationId, + count: turns.length, + }); + const body: ConversationMetricsResponse = { turns }; + return c.json(body, 200); + } catch (err) { + log.error("conversations: metrics store failure", { err }); + return c.json({ error: "Failed to load conversation metrics" }, 500); + } + }); + app.get("/conversations/:id", async (c) => { const conversationId = c.req.param("id"); const sinceSeqResult = parseSinceSeq(c.req.query("sinceSeq")); diff --git a/packages/transport-http/src/server.bun.test.ts b/packages/transport-http/src/server.bun.test.ts index e824d18..b2a978a 100644 --- a/packages/transport-http/src/server.bun.test.ts +++ b/packages/transport-http/src/server.bun.test.ts @@ -37,6 +37,10 @@ function fakeConversationStore(): ConversationStore { async 
loadSince() { return []; }, + async appendMetrics() {}, + async loadMetrics() { + return []; + }, }; } diff --git a/packages/wire/package.json b/packages/wire/package.json index 762c06e..790c7e1 100644 --- a/packages/wire/package.json +++ b/packages/wire/package.json @@ -1,6 +1,6 @@ { "name": "@dispatch/wire", - "version": "0.3.0", + "version": "0.4.0", "type": "module", "private": true, "main": "dist/index.js", diff --git a/packages/wire/src/index.ts b/packages/wire/src/index.ts index a4790de..aa6f9d0 100644 --- a/packages/wire/src/index.ts +++ b/packages/wire/src/index.ts @@ -153,6 +153,47 @@ export interface Usage { readonly cacheWriteTokens?: number; } +// ─── Persisted metrics ─────────────────────────────────────────────────────── + +/** + * Durable per-step metrics for a completed step — the persisted, replayable + * counterpart of the live `usage` + `step-complete` events. Combines the step's + * token usage with its generation timing so a client reopening a past + * conversation renders the same per-step token/latency breakdown it would have + * seen live. Built from the turn's events, stored by `conversation-store`, and + * served by `GET /conversations/:id/metrics`. + */ +export interface StepMetrics { + readonly stepId: StepId; + /** The step's token usage (all four counters; cache fields optional per `Usage`). */ + readonly usage: Usage; + /** Time to first token (stream start → first text/reasoning delta). Optional — see `TurnStepCompleteEvent.ttftMs`. */ + readonly ttftMs?: number; + /** Decode time (first token → stream end). Optional — see `TurnStepCompleteEvent.decodeMs`. */ + readonly decodeMs?: number; + /** Total generation time for the step (stream start → stream end). Optional: present only when a clock was available. 
*/ + readonly genTotalMs?: number; +} + +/** + * Durable per-turn metrics for a completed (sealed) turn — the persisted, + * replayable counterpart of the live `done` event's aggregate `usage` + + * `durationMs`, plus the per-step breakdown. `usage` is the aggregate across all + * steps; `steps` carries each step's `StepMetrics` in step order. Persisted per + * turn by `conversation-store` (returned in turn-append order) and served by + * `GET /conversations/:id/metrics`. (`turnId` is the plain wire string carried + * on every `AgentEvent`, the join key to the live stream.) + */ +export interface TurnMetrics { + readonly turnId: string; + /** Aggregate token usage across all steps in the turn. */ + readonly usage: Usage; + /** Total wall-clock duration of the turn (turn start → turn end). Optional: present only when a clock was available. */ + readonly durationMs?: number; + /** Per-step metrics in step order. */ + readonly steps: readonly StepMetrics[]; +} + // ─── Outward events ───────────────────────────────────────────────────────── /** |
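Note on the `sumStepUsage` helper in `metrics.ts` above: as shown in the diff, it adds up only `inputTokens` and `outputTokens`, so any `cacheReadTokens`/`cacheWriteTokens` carried on a step's `Usage` would not reach an aggregate computed through it. The sketch below shows one possible way to sum all four counters while keeping the cache fields optional. It is an illustration under assumptions, not part of this commit: `sumStepUsageAllFields` and `StepUsageCarrier` are hypothetical names, and the `Usage` interface is a local mirror of the shape visible in `packages/wire/src/index.ts` (required input/output counters, optional cache counters).

```typescript
// Local mirror of the `Usage` shape visible in the diff (assumption: the two
// base counters are required numbers, the two cache counters are optional).
interface Usage {
  readonly inputTokens: number;
  readonly outputTokens: number;
  readonly cacheReadTokens?: number;
  readonly cacheWriteTokens?: number;
}

// Hypothetical minimal stand-in for `StepMetrics`: only the field summed here.
interface StepUsageCarrier {
  readonly usage: Usage;
}

// Hypothetical variant of `sumStepUsage` that also accumulates cache counters.
// A cache field appears in the result only if at least one step reported it,
// so "not reported" stays distinguishable from "reported as zero".
function sumStepUsageAllFields(steps: readonly StepUsageCarrier[]): Usage {
  let inputTokens = 0;
  let outputTokens = 0;
  let cacheReadTokens: number | undefined;
  let cacheWriteTokens: number | undefined;

  for (const s of steps) {
    inputTokens += s.usage.inputTokens;
    outputTokens += s.usage.outputTokens;
    if (s.usage.cacheReadTokens !== undefined) {
      cacheReadTokens = (cacheReadTokens ?? 0) + s.usage.cacheReadTokens;
    }
    if (s.usage.cacheWriteTokens !== undefined) {
      cacheWriteTokens = (cacheWriteTokens ?? 0) + s.usage.cacheWriteTokens;
    }
  }

  // Conditional spreads avoid emitting `cacheReadTokens: undefined` keys.
  return {
    inputTokens,
    outputTokens,
    ...(cacheReadTokens !== undefined ? { cacheReadTokens } : {}),
    ...(cacheWriteTokens !== undefined ? { cacheWriteTokens } : {}),
  };
}
```

Omitting absent cache fields, rather than defaulting them to zero, follows the optional typing of `Usage`; whether the accumulator should behave this way is a design decision for the author.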
