summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorAdam Malczewski <[email protected]>2026-06-10 08:29:59 +0900
committerAdam Malczewski <[email protected]>2026-06-10 08:29:59 +0900
commit6db12ff70acb3333d05a5020ab66da4172a5225a (patch)
treede5cc6314a3a6dd966d7c4fdb9b20adb04ae8307
parent4248cd1d546a4c1fb4e68940c11b5e309c2c2736 (diff)
downloaddispatch-6db12ff70acb3333d05a5020ab66da4172a5225a.tar.gz
dispatch-6db12ff70acb3333d05a5020ab66da4172a5225a.zip
feat(metrics): durable per-turn/step token+timing metrics (observability spans + persisted replay)
Two-part token-data improvement: #2 Observability spans (kernel run-turn): turn & step span-close now stamp ALL four Usage fields — added usage.cacheReadTokens/cacheWriteTokens (were silently dropped) and normalized usage_* -> usage.* to match the provider.request span (consistent D9 GROUP BY). No contract change. #3 Persisted replay metrics (conversation-store + read endpoint): new StepMetrics/TurnMetrics wire types; conversation-store persists per-turn metrics in a separate key space (appendMetrics/loadMetrics, turn-append order); session-orchestrator accumulates per-step+turn metrics from the event stream (pure metrics.ts) and persists after seal; transport-http serves GET /conversations/:id/metrics -> ConversationMetricsResponse. Contracts: @dispatch/wire + @dispatch/transport-contract bumped 0.3.0->0.4.0 (additive). GLOSSARY: turn metrics / step metrics. typecheck EXIT 0, biome clean, 546 vitest + 89 bun = 635 tests.
-rw-r--r--GLOSSARY.md2
-rw-r--r--packages/conversation-store/src/keys.ts20
-rw-r--r--packages/conversation-store/src/store.test.ts140
-rw-r--r--packages/conversation-store/src/store.ts43
-rw-r--r--packages/kernel/src/contracts/conversation.ts2
-rw-r--r--packages/kernel/src/contracts/index.ts2
-rw-r--r--packages/kernel/src/runtime/run-turn.test.ts151
-rw-r--r--packages/kernel/src/runtime/run-turn.ts20
-rw-r--r--packages/session-orchestrator/src/metrics.test.ts264
-rw-r--r--packages/session-orchestrator/src/metrics.ts124
-rw-r--r--packages/session-orchestrator/src/orchestrator.test.ts319
-rw-r--r--packages/session-orchestrator/src/orchestrator.ts12
-rw-r--r--packages/transport-contract/package.json2
-rw-r--r--packages/transport-contract/src/index.ts23
-rw-r--r--packages/transport-http/src/app.test.ts128
-rw-r--r--packages/transport-http/src/app.ts23
-rw-r--r--packages/transport-http/src/server.bun.test.ts4
-rw-r--r--packages/wire/package.json2
-rw-r--r--packages/wire/src/index.ts41
19 files changed, 1307 insertions, 15 deletions
diff --git a/GLOSSARY.md b/GLOSSARY.md
index 65aa6f2..845b45f 100644
--- a/GLOSSARY.md
+++ b/GLOSSARY.md
@@ -20,6 +20,8 @@
| **stepId** | The identifier of a step, stamped on each `tool-call`/`tool-result` event and tool chunk it produces, so a client groups a parallel/batched tool-call set by equality. Branded `StepId`; the runtime derives it deterministically as `<turnId>#<stepIndex>` (0-based). Generation provenance carried ON the tool chunk (unlike `seq`, which is a store-assigned sync cursor on the `StoredChunk` envelope). Treat as opaque. | batchId, step index (as the wire key) |
| **TTFT** (time to first token) | Per-step latency from the generation stream starting to the first content token (text **or** reasoning) arriving. Inherently per LLM round-trip — each step re-prefills, so a turn has one TTFT per step; step 0's is the turn's user-visible first-token latency, and the sum across steps is the total prefill overhead. Captured as observability span timing only (not on the wire). | time-to-first-byte (when meaning tokens) |
| **decode time** | The generation time of a step *after* the first token — first token → stream end, i.e. the step's generation total minus its TTFT. The model's token-production time with first-token latency removed. Observability span timing. | — |
+| **turn metrics** | The durable, replayable per-turn metrics record persisted for a sealed turn: aggregate `Usage` (tokens) + turn `durationMs` + its per-step `StepMetrics`. The persisted counterpart of the live `done` event's metrics; persisted per turn by `conversation-store` (returned in turn-append order), served by `GET /conversations/:id/metrics`. Distinct from observability span timing (trace-store) and from transient live wire events. | usage record, turn stats |
+| **step metrics** | The durable per-step metrics within a `TurnMetrics`: the step's `Usage` (tokens) + `ttftMs`/`decodeMs`/`genTotalMs` timing. The persisted counterpart of the live `usage` + `step-complete` events, keyed by `stepId`. | step stats |
| **tool call** | A model's request to run a tool within a step. | function call (when meaning a tool call) |
| **chunk** | One ordered piece of a message (text, thinking, tool-call/result, etc.), append-only in the log. | block, segment |
| **seq** | The monotonic, gap-free, per-conversation sequence number stamped on each chunk as it is appended to the log. The sync cursor: a client requests `?sinceSeq=N` to fetch only newer chunks. Storage/sync metadata, never message content. | cursor (when meaning the number), offset, index |
diff --git a/packages/conversation-store/src/keys.ts b/packages/conversation-store/src/keys.ts
index 8646a40..0ba25d8 100644
--- a/packages/conversation-store/src/keys.ts
+++ b/packages/conversation-store/src/keys.ts
@@ -25,3 +25,23 @@ export function parseChunkSeq(key: string): number {
const n = Number.parseInt(last, 10);
return Number.isNaN(n) ? -1 : n;
}
+
+export function metricsSeqKey(conversationId: string): string {
+ return `conv:${conversationId}:metrics-seq`;
+}
+
+export function metricsKey(conversationId: string, ordinal: number): string {
+ return `conv:${conversationId}:metrics:${String(ordinal).padStart(SEQ_PAD, "0")}`;
+}
+
+export function metricsPrefix(conversationId: string): string {
+ return `conv:${conversationId}:metrics:`;
+}
+
+export function parseMetricsOrdinal(key: string): number {
+ const parts = key.split(":");
+ const last = parts[parts.length - 1];
+ if (last === undefined) return -1;
+ const n = Number.parseInt(last, 10);
+ return Number.isNaN(n) ? -1 : n;
+}
diff --git a/packages/conversation-store/src/store.test.ts b/packages/conversation-store/src/store.test.ts
index e00fdc2..c7083e8 100644
--- a/packages/conversation-store/src/store.test.ts
+++ b/packages/conversation-store/src/store.test.ts
@@ -1,4 +1,4 @@
-import type { ChatMessage, StepId, StorageNamespace } from "@dispatch/kernel";
+import type { ChatMessage, StepId, StorageNamespace, TurnMetrics } from "@dispatch/kernel";
import { beforeEach, describe, expect, it } from "vitest";
import { createConversationStore } from "./store.js";
@@ -425,3 +425,141 @@ describe("ConversationStore", () => {
}
});
});
+
+describe("ConversationStore metrics", () => {
+ let storage: StorageNamespace;
+
+ beforeEach(() => {
+ storage = createMemoryStorage();
+ });
+
+ it("appendMetrics → loadMetrics round-trips a TurnMetrics (usage + durationMs + steps)", async () => {
+ const store = createConversationStore(storage);
+ const stepId = "step_1" as StepId;
+ const metrics: TurnMetrics = {
+ turnId: "turn_abc",
+ usage: { inputTokens: 100, outputTokens: 50 },
+ durationMs: 1234,
+ steps: [
+ {
+ stepId,
+ usage: { inputTokens: 100, outputTokens: 50 },
+ ttftMs: 200,
+ decodeMs: 800,
+ genTotalMs: 1000,
+ },
+ ],
+ };
+ await store.appendMetrics("conv1", metrics);
+ const result = await store.loadMetrics("conv1");
+ expect(result).toHaveLength(1);
+ expect(result[0]).toEqual(metrics);
+ });
+
+ it("loadMetrics returns turns in append order", async () => {
+ const store = createConversationStore(storage);
+ const metrics1: TurnMetrics = {
+ turnId: "turn_first",
+ usage: { inputTokens: 10, outputTokens: 5 },
+ steps: [],
+ };
+ const metrics2: TurnMetrics = {
+ turnId: "turn_second",
+ usage: { inputTokens: 20, outputTokens: 10 },
+ steps: [],
+ };
+ const metrics3: TurnMetrics = {
+ turnId: "turn_third",
+ usage: { inputTokens: 30, outputTokens: 15 },
+ steps: [],
+ };
+ await store.appendMetrics("conv1", metrics1);
+ await store.appendMetrics("conv1", metrics2);
+ await store.appendMetrics("conv1", metrics3);
+ const result = await store.loadMetrics("conv1");
+ expect(result).toHaveLength(3);
+ expect(result[0]?.turnId).toBe("turn_first");
+ expect(result[1]?.turnId).toBe("turn_second");
+ expect(result[2]?.turnId).toBe("turn_third");
+ });
+
+ it("loadMetrics returns [] for a conversation with no persisted metrics", async () => {
+ const store = createConversationStore(storage);
+ const result = await store.loadMetrics("nonexistent");
+ expect(result).toEqual([]);
+ });
+
+ it("appendMetrics does not affect chunk load / loadSince", async () => {
+ const store = createConversationStore(storage);
+ const msg: ChatMessage = { role: "user", chunks: [{ type: "text", text: "hello" }] };
+ await store.append("conv1", [msg]);
+
+ const metrics: TurnMetrics = {
+ turnId: "turn_iso",
+ usage: { inputTokens: 100, outputTokens: 50 },
+ steps: [],
+ };
+ await store.appendMetrics("conv1", metrics);
+
+ const messages = await store.load("conv1");
+ expect(messages).toEqual([msg]);
+
+ const chunks = await store.loadSince("conv1");
+ expect(chunks).toHaveLength(1);
+ expect(chunks[0]?.chunk).toEqual({ type: "text", text: "hello" });
+ });
+
+ it("TurnMetrics with cache tokens + per-step ttft/decode/genTotal round-trips losslessly", async () => {
+ const store = createConversationStore(storage);
+ const stepId1 = "step_a" as StepId;
+ const stepId2 = "step_b" as StepId;
+ const metrics: TurnMetrics = {
+ turnId: "turn_cache",
+ usage: {
+ inputTokens: 500,
+ outputTokens: 200,
+ cacheReadTokens: 300,
+ cacheWriteTokens: 100,
+ },
+ durationMs: 5000,
+ steps: [
+ {
+ stepId: stepId1,
+ usage: {
+ inputTokens: 300,
+ outputTokens: 100,
+ cacheReadTokens: 200,
+ cacheWriteTokens: 50,
+ },
+ ttftMs: 150,
+ decodeMs: 600,
+ genTotalMs: 750,
+ },
+ {
+ stepId: stepId2,
+ usage: {
+ inputTokens: 200,
+ outputTokens: 100,
+ cacheReadTokens: 100,
+ cacheWriteTokens: 50,
+ },
+ ttftMs: 100,
+ decodeMs: 400,
+ genTotalMs: 500,
+ },
+ ],
+ };
+ await store.appendMetrics("conv1", metrics);
+ const result = await store.loadMetrics("conv1");
+ expect(result).toHaveLength(1);
+ expect(result[0]).toEqual(metrics);
+ expect(result[0]?.usage.cacheReadTokens).toBe(300);
+ expect(result[0]?.usage.cacheWriteTokens).toBe(100);
+ expect(result[0]?.steps[0]?.ttftMs).toBe(150);
+ expect(result[0]?.steps[0]?.decodeMs).toBe(600);
+ expect(result[0]?.steps[0]?.genTotalMs).toBe(750);
+ expect(result[0]?.steps[1]?.ttftMs).toBe(100);
+ expect(result[0]?.steps[1]?.decodeMs).toBe(400);
+ expect(result[0]?.steps[1]?.genTotalMs).toBe(500);
+ });
+});
diff --git a/packages/conversation-store/src/store.ts b/packages/conversation-store/src/store.ts
index df8bd4e..080cb79 100644
--- a/packages/conversation-store/src/store.ts
+++ b/packages/conversation-store/src/store.ts
@@ -1,6 +1,21 @@
-import type { ChatMessage, Chunk, Role, StorageNamespace, StoredChunk } from "@dispatch/kernel";
+import type {
+ ChatMessage,
+ Chunk,
+ Role,
+ StorageNamespace,
+ StoredChunk,
+ TurnMetrics,
+} from "@dispatch/kernel";
import { defineService } from "@dispatch/kernel";
-import { chunkKey, chunkPrefix, parseSeq, seqKey } from "./keys.js";
+import {
+ chunkKey,
+ chunkPrefix,
+ metricsKey,
+ metricsPrefix,
+ metricsSeqKey,
+ parseSeq,
+ seqKey,
+} from "./keys.js";
import { reconcile } from "./reconcile.js";
export interface ConversationStore {
@@ -10,6 +25,8 @@ export interface ConversationStore {
conversationId: string,
sinceSeq?: number,
) => Promise<readonly StoredChunk[]>;
+ readonly appendMetrics: (conversationId: string, metrics: TurnMetrics) => Promise<void>;
+ readonly loadMetrics: (conversationId: string) => Promise<readonly TurnMetrics[]>;
}
export const conversationStoreHandle = defineService<ConversationStore>("conversation-store/store");
@@ -100,5 +117,27 @@ export function createConversationStore(storage: StorageNamespace): Conversation
return result;
},
+
+ async appendMetrics(conversationId, metrics) {
+ const raw = await storage.get(metricsSeqKey(conversationId));
+ const ordinal = parseSeq(raw) + 1;
+ await storage.set(metricsKey(conversationId, ordinal), JSON.stringify(metrics));
+ await storage.set(metricsSeqKey(conversationId), String(ordinal));
+ },
+
+ async loadMetrics(conversationId) {
+ const prefix = metricsPrefix(conversationId);
+ const keys = await storage.keys(prefix);
+ const sorted = [...keys].sort();
+
+ const result: TurnMetrics[] = [];
+ for (const key of sorted) {
+ const value = await storage.get(key);
+ if (value === null) continue;
+ result.push(JSON.parse(value) as TurnMetrics);
+ }
+
+ return result;
+ },
};
}
diff --git a/packages/kernel/src/contracts/conversation.ts b/packages/kernel/src/contracts/conversation.ts
index 0080964..f4a342d 100644
--- a/packages/kernel/src/contracts/conversation.ts
+++ b/packages/kernel/src/contracts/conversation.ts
@@ -11,6 +11,7 @@ export type {
ErrorChunk,
Role,
StepId,
+ StepMetrics,
StoredChunk,
SystemChunk,
TextChunk,
@@ -18,4 +19,5 @@ export type {
ToolCallChunk,
ToolResultChunk,
TurnId,
+ TurnMetrics,
} from "@dispatch/wire";
diff --git a/packages/kernel/src/contracts/index.ts b/packages/kernel/src/contracts/index.ts
index 38f1442..b5802f3 100644
--- a/packages/kernel/src/contracts/index.ts
+++ b/packages/kernel/src/contracts/index.ts
@@ -18,6 +18,7 @@ export type {
ErrorChunk,
Role,
StepId,
+ StepMetrics,
StoredChunk,
SystemChunk,
TextChunk,
@@ -25,6 +26,7 @@ export type {
ToolCallChunk,
ToolResultChunk,
TurnId,
+ TurnMetrics,
} from "./conversation.js";
export type { ToolDispatchPolicy } from "./dispatch.js";
export type {
diff --git a/packages/kernel/src/runtime/run-turn.test.ts b/packages/kernel/src/runtime/run-turn.test.ts
index ce654d5..6db6645 100644
--- a/packages/kernel/src/runtime/run-turn.test.ts
+++ b/packages/kernel/src/runtime/run-turn.test.ts
@@ -1307,6 +1307,157 @@ describe("runTurn", () => {
// Only one decode span (for the second step)
expect(decodeOpens).toHaveLength(1);
});
+
+ it("turn span close stamps usage.inputTokens / usage.outputTokens (dotted)", async () => {
+ const provider = createFakeProvider([
+ [
+ { type: "text-delta", delta: "hi" },
+ { type: "usage", usage: { inputTokens: 10, outputTokens: 5 } },
+ { type: "finish", reason: "stop" },
+ ],
+ ]);
+
+ const { logger, sink } = createTestLogger();
+
+ await runTurn({
+ provider,
+ messages: [userMessage],
+ tools: [],
+ dispatch: { maxConcurrent: 1, eager: false },
+ conversationId: "conv-1",
+ turnId: "turn-1",
+ emit: () => {},
+ logger,
+ });
+
+ const turnClose = sink.records.find((r) => r.kind === "span-close" && r.name === "turn");
+ expect(turnClose).toBeDefined();
+ if (turnClose?.kind === "span-close") {
+ expect(turnClose.attributes?.["usage.inputTokens"]).toBe(10);
+ expect(turnClose.attributes?.["usage.outputTokens"]).toBe(5);
+ expect(turnClose.attributes?.usage_inputTokens).toBeUndefined();
+ expect(turnClose.attributes?.usage_outputTokens).toBeUndefined();
+ }
+ });
+
+ it("step span close stamps usage.inputTokens / usage.outputTokens (dotted)", async () => {
+ const provider = createFakeProvider([
+ [
+ { type: "text-delta", delta: "hi" },
+ { type: "usage", usage: { inputTokens: 7, outputTokens: 3 } },
+ { type: "finish", reason: "stop" },
+ ],
+ ]);
+
+ const { logger, sink } = createTestLogger();
+
+ await runTurn({
+ provider,
+ messages: [userMessage],
+ tools: [],
+ dispatch: { maxConcurrent: 1, eager: false },
+ conversationId: "conv-1",
+ turnId: "turn-1",
+ emit: () => {},
+ logger,
+ });
+
+ const stepClose = sink.records.find((r) => r.kind === "span-close" && r.name === "step");
+ expect(stepClose).toBeDefined();
+ if (stepClose?.kind === "span-close") {
+ expect(stepClose.attributes?.["usage.inputTokens"]).toBe(7);
+ expect(stepClose.attributes?.["usage.outputTokens"]).toBe(3);
+ expect(stepClose.attributes?.usage_inputTokens).toBeUndefined();
+ expect(stepClose.attributes?.usage_outputTokens).toBeUndefined();
+ }
+ });
+
+ it("turn + step spans stamp usage.cacheReadTokens / usage.cacheWriteTokens when the provider Usage carries them", async () => {
+ const provider = createFakeProvider([
+ [
+ { type: "text-delta", delta: "hi" },
+ {
+ type: "usage",
+ usage: { inputTokens: 10, outputTokens: 5, cacheReadTokens: 3, cacheWriteTokens: 2 },
+ },
+ { type: "finish", reason: "stop" },
+ ],
+ ]);
+
+ const { logger, sink } = createTestLogger();
+
+ await runTurn({
+ provider,
+ messages: [userMessage],
+ tools: [],
+ dispatch: { maxConcurrent: 1, eager: false },
+ conversationId: "conv-1",
+ turnId: "turn-1",
+ emit: () => {},
+ logger,
+ });
+
+ const turnClose = sink.records.find((r) => r.kind === "span-close" && r.name === "turn");
+ const stepClose = sink.records.find((r) => r.kind === "span-close" && r.name === "step");
+
+ expect(turnClose).toBeDefined();
+ if (turnClose?.kind === "span-close") {
+ expect(turnClose.attributes?.["usage.inputTokens"]).toBe(10);
+ expect(turnClose.attributes?.["usage.outputTokens"]).toBe(5);
+ expect(turnClose.attributes?.["usage.cacheReadTokens"]).toBe(3);
+ expect(turnClose.attributes?.["usage.cacheWriteTokens"]).toBe(2);
+ }
+
+ expect(stepClose).toBeDefined();
+ if (stepClose?.kind === "span-close") {
+ expect(stepClose.attributes?.["usage.inputTokens"]).toBe(10);
+ expect(stepClose.attributes?.["usage.outputTokens"]).toBe(5);
+ expect(stepClose.attributes?.["usage.cacheReadTokens"]).toBe(3);
+ expect(stepClose.attributes?.["usage.cacheWriteTokens"]).toBe(2);
+ }
+ });
+
+ it("turn + step spans OMIT the cache-token attrs when the provider Usage lacks them", async () => {
+ const provider = createFakeProvider([
+ [
+ { type: "text-delta", delta: "hi" },
+ { type: "usage", usage: { inputTokens: 10, outputTokens: 5 } },
+ { type: "finish", reason: "stop" },
+ ],
+ ]);
+
+ const { logger, sink } = createTestLogger();
+
+ await runTurn({
+ provider,
+ messages: [userMessage],
+ tools: [],
+ dispatch: { maxConcurrent: 1, eager: false },
+ conversationId: "conv-1",
+ turnId: "turn-1",
+ emit: () => {},
+ logger,
+ });
+
+ const turnClose = sink.records.find((r) => r.kind === "span-close" && r.name === "turn");
+ const stepClose = sink.records.find((r) => r.kind === "span-close" && r.name === "step");
+
+ expect(turnClose).toBeDefined();
+ if (turnClose?.kind === "span-close") {
+ expect(turnClose.attributes?.["usage.inputTokens"]).toBe(10);
+ expect(turnClose.attributes?.["usage.outputTokens"]).toBe(5);
+ expect(turnClose.attributes?.["usage.cacheReadTokens"]).toBeUndefined();
+ expect(turnClose.attributes?.["usage.cacheWriteTokens"]).toBeUndefined();
+ }
+
+ expect(stepClose).toBeDefined();
+ if (stepClose?.kind === "span-close") {
+ expect(stepClose.attributes?.["usage.inputTokens"]).toBe(10);
+ expect(stepClose.attributes?.["usage.outputTokens"]).toBe(5);
+ expect(stepClose.attributes?.["usage.cacheReadTokens"]).toBeUndefined();
+ expect(stepClose.attributes?.["usage.cacheWriteTokens"]).toBeUndefined();
+ }
+ });
});
describe("provider logger threading", () => {
diff --git a/packages/kernel/src/runtime/run-turn.ts b/packages/kernel/src/runtime/run-turn.ts
index 06069a2..d5db1bf 100644
--- a/packages/kernel/src/runtime/run-turn.ts
+++ b/packages/kernel/src/runtime/run-turn.ts
@@ -50,6 +50,20 @@ function addUsage(a: Usage, b: Usage): Usage {
return { inputTokens, outputTokens };
}
+function usageAttrs(usage: Usage): Record<string, string | number | boolean | null> {
+ const attrs: Record<string, string | number | boolean | null> = {
+ "usage.inputTokens": usage.inputTokens,
+ "usage.outputTokens": usage.outputTokens,
+ };
+ if (usage.cacheReadTokens !== undefined) {
+ attrs["usage.cacheReadTokens"] = usage.cacheReadTokens;
+ }
+ if (usage.cacheWriteTokens !== undefined) {
+ attrs["usage.cacheWriteTokens"] = usage.cacheWriteTokens;
+ }
+ return attrs;
+}
+
function appendTextDelta(chunks: Chunk[], delta: string): void {
const lastIdx = chunks.length - 1;
const last = chunks[lastIdx];
@@ -409,8 +423,7 @@ async function executeStep(ctx: StepContext): Promise<StepResult> {
stepSpan.end({
attrs: {
finishReason,
- usage_inputTokens: stepUsage.inputTokens,
- usage_outputTokens: stepUsage.outputTokens,
+ ...usageAttrs(stepUsage),
},
});
} catch {
@@ -533,8 +546,7 @@ export async function runTurn(input: RunTurnInput): Promise<RunTurnResult> {
turnSpan.end({
attrs: {
finishReason,
- usage_inputTokens: totalUsage.inputTokens,
- usage_outputTokens: totalUsage.outputTokens,
+ ...usageAttrs(totalUsage),
},
});
} catch {
diff --git a/packages/session-orchestrator/src/metrics.test.ts b/packages/session-orchestrator/src/metrics.test.ts
new file mode 100644
index 0000000..c123dba
--- /dev/null
+++ b/packages/session-orchestrator/src/metrics.test.ts
@@ -0,0 +1,264 @@
+import type { AgentEvent, StepId } from "@dispatch/kernel";
+import { describe, expect, it } from "vitest";
+import { createMetricsAccumulator } from "./metrics.js";
+
+function stepId(id: string): StepId {
+ return id as StepId;
+}
+
+describe("createMetricsAccumulator", () => {
+ it("builds a TurnMetrics from a single-step turn", () => {
+ const acc = createMetricsAccumulator();
+
+ const usageEvent: AgentEvent = {
+ type: "usage",
+ conversationId: "c1",
+ turnId: "t1",
+ stepId: stepId("t1#0"),
+ usage: { inputTokens: 10, outputTokens: 5 },
+ };
+ const stepCompleteEvent: AgentEvent = {
+ type: "step-complete",
+ conversationId: "c1",
+ turnId: "t1",
+ stepId: stepId("t1#0"),
+ ttftMs: 50,
+ decodeMs: 150,
+ genTotalMs: 200,
+ };
+ const doneEvent: AgentEvent = {
+ type: "done",
+ conversationId: "c1",
+ turnId: "t1",
+ reason: "stop",
+ durationMs: 500,
+ usage: { inputTokens: 10, outputTokens: 5 },
+ };
+
+ acc.ingest(usageEvent);
+ acc.ingest(stepCompleteEvent);
+ acc.ingest(doneEvent);
+
+ const tm = acc.build("t1");
+ expect(tm.turnId).toBe("t1");
+ expect(tm.usage.inputTokens).toBe(10);
+ expect(tm.usage.outputTokens).toBe(5);
+ expect(tm.durationMs).toBe(500);
+ expect(tm.steps).toHaveLength(1);
+ expect(tm.steps[0]?.stepId).toBe(stepId("t1#0"));
+ expect(tm.steps[0]?.usage.inputTokens).toBe(10);
+ expect(tm.steps[0]?.usage.outputTokens).toBe(5);
+ expect(tm.steps[0]?.ttftMs).toBe(50);
+ expect(tm.steps[0]?.decodeMs).toBe(150);
+ expect(tm.steps[0]?.genTotalMs).toBe(200);
+ });
+
+ it("aggregates multi-step usage and preserves step order", () => {
+ const acc = createMetricsAccumulator();
+
+ acc.ingest({
+ type: "usage",
+ conversationId: "c1",
+ turnId: "t1",
+ stepId: stepId("t1#0"),
+ usage: { inputTokens: 10, outputTokens: 5 },
+ });
+ acc.ingest({
+ type: "step-complete",
+ conversationId: "c1",
+ turnId: "t1",
+ stepId: stepId("t1#0"),
+ genTotalMs: 100,
+ });
+ acc.ingest({
+ type: "usage",
+ conversationId: "c1",
+ turnId: "t1",
+ stepId: stepId("t1#1"),
+ usage: { inputTokens: 20, outputTokens: 10 },
+ });
+ acc.ingest({
+ type: "step-complete",
+ conversationId: "c1",
+ turnId: "t1",
+ stepId: stepId("t1#1"),
+ genTotalMs: 150,
+ });
+ acc.ingest({
+ type: "done",
+ conversationId: "c1",
+ turnId: "t1",
+ reason: "stop",
+ usage: { inputTokens: 30, outputTokens: 15 },
+ });
+
+ const tm = acc.build("t1");
+ expect(tm.steps).toHaveLength(2);
+ expect(tm.steps[0]?.stepId).toBe(stepId("t1#0"));
+ expect(tm.steps[0]?.usage.inputTokens).toBe(10);
+ expect(tm.steps[1]?.stepId).toBe(stepId("t1#1"));
+ expect(tm.steps[1]?.usage.inputTokens).toBe(20);
+ expect(tm.usage.inputTokens).toBe(30);
+ expect(tm.usage.outputTokens).toBe(15);
+ });
+
+ it("joins per-step timing and usage by stepId", () => {
+ const acc = createMetricsAccumulator();
+
+ acc.ingest({
+ type: "step-complete",
+ conversationId: "c1",
+ turnId: "t1",
+ stepId: stepId("t1#0"),
+ ttftMs: 50,
+ decodeMs: 100,
+ genTotalMs: 150,
+ });
+ acc.ingest({
+ type: "usage",
+ conversationId: "c1",
+ turnId: "t1",
+ stepId: stepId("t1#0"),
+ usage: { inputTokens: 10, outputTokens: 5 },
+ });
+
+ const tm = acc.build("t1");
+ expect(tm.steps).toHaveLength(1);
+ expect(tm.steps[0]?.usage.inputTokens).toBe(10);
+ expect(tm.steps[0]?.ttftMs).toBe(50);
+ expect(tm.steps[0]?.decodeMs).toBe(100);
+ expect(tm.steps[0]?.genTotalMs).toBe(150);
+ });
+
+ it("turn-level usage comes from done event, not step sum", () => {
+ const acc = createMetricsAccumulator();
+
+ acc.ingest({
+ type: "usage",
+ conversationId: "c1",
+ turnId: "t1",
+ stepId: stepId("t1#0"),
+ usage: { inputTokens: 10, outputTokens: 5 },
+ });
+ acc.ingest({
+ type: "step-complete",
+ conversationId: "c1",
+ turnId: "t1",
+ stepId: stepId("t1#0"),
+ });
+ acc.ingest({
+ type: "done",
+ conversationId: "c1",
+ turnId: "t1",
+ reason: "stop",
+ usage: { inputTokens: 100, outputTokens: 50 },
+ });
+
+ const tm = acc.build("t1");
+ expect(tm.usage.inputTokens).toBe(100);
+ expect(tm.usage.outputTokens).toBe(50);
+ });
+
+ it("falls back to summing step usage when done.usage is absent", () => {
+ const acc = createMetricsAccumulator();
+
+ acc.ingest({
+ type: "usage",
+ conversationId: "c1",
+ turnId: "t1",
+ stepId: stepId("t1#0"),
+ usage: { inputTokens: 10, outputTokens: 5 },
+ });
+ acc.ingest({
+ type: "step-complete",
+ conversationId: "c1",
+ turnId: "t1",
+ stepId: stepId("t1#0"),
+ });
+ acc.ingest({
+ type: "done",
+ conversationId: "c1",
+ turnId: "t1",
+ reason: "stop",
+ });
+
+ const tm = acc.build("t1");
+ expect(tm.usage.inputTokens).toBe(10);
+ expect(tm.usage.outputTokens).toBe(5);
+ });
+
+ it("tolerates missing usage on a step (timing only)", () => {
+ const acc = createMetricsAccumulator();
+
+ acc.ingest({
+ type: "step-complete",
+ conversationId: "c1",
+ turnId: "t1",
+ stepId: stepId("t1#0"),
+ genTotalMs: 200,
+ });
+ acc.ingest({
+ type: "done",
+ conversationId: "c1",
+ turnId: "t1",
+ reason: "stop",
+ });
+
+ const tm = acc.build("t1");
+ expect(tm.steps).toHaveLength(1);
+ expect(tm.steps[0]?.usage.inputTokens).toBe(0);
+ expect(tm.steps[0]?.usage.outputTokens).toBe(0);
+ expect(tm.steps[0]?.genTotalMs).toBe(200);
+ });
+
+ it("tolerates missing timing on a step (usage only)", () => {
+ const acc = createMetricsAccumulator();
+
+ acc.ingest({
+ type: "usage",
+ conversationId: "c1",
+ turnId: "t1",
+ stepId: stepId("t1#0"),
+ usage: { inputTokens: 10, outputTokens: 5 },
+ });
+ acc.ingest({
+ type: "done",
+ conversationId: "c1",
+ turnId: "t1",
+ reason: "stop",
+ });
+
+ const tm = acc.build("t1");
+ expect(tm.steps).toHaveLength(1);
+ expect(tm.steps[0]?.usage.inputTokens).toBe(10);
+ expect(tm.steps[0]?.ttftMs).toBeUndefined();
+ expect(tm.steps[0]?.decodeMs).toBeUndefined();
+ expect(tm.steps[0]?.genTotalMs).toBeUndefined();
+ });
+
+ it("reset clears all accumulated state", () => {
+ const acc = createMetricsAccumulator();
+
+ acc.ingest({
+ type: "usage",
+ conversationId: "c1",
+ turnId: "t1",
+ stepId: stepId("t1#0"),
+ usage: { inputTokens: 10, outputTokens: 5 },
+ });
+ acc.ingest({
+ type: "done",
+ conversationId: "c1",
+ turnId: "t1",
+ reason: "stop",
+ usage: { inputTokens: 10, outputTokens: 5 },
+ });
+
+ acc.reset();
+
+ const tm = acc.build("t2");
+ expect(tm.steps).toHaveLength(0);
+ expect(tm.usage.inputTokens).toBe(0);
+ expect(tm.usage.outputTokens).toBe(0);
+ });
+});
diff --git a/packages/session-orchestrator/src/metrics.ts b/packages/session-orchestrator/src/metrics.ts
new file mode 100644
index 0000000..e953bd9
--- /dev/null
+++ b/packages/session-orchestrator/src/metrics.ts
@@ -0,0 +1,124 @@
+import type {
+ AgentEvent,
+ StepId,
+ StepMetrics,
+ TurnDoneEvent,
+ TurnMetrics,
+ TurnStepCompleteEvent,
+ TurnUsageEvent,
+ Usage,
+} from "@dispatch/kernel";
+
+const zeroUsage: Usage = { inputTokens: 0, outputTokens: 0 };
+
+interface StepAccumulator {
+ readonly stepId: StepId;
+ usage: Usage | undefined;
+ ttftMs: number | undefined;
+ decodeMs: number | undefined;
+ genTotalMs: number | undefined;
+}
+
+export interface MetricsAccumulator {
+ readonly ingest: (event: AgentEvent) => void;
+ readonly build: (turnId: string) => TurnMetrics;
+ readonly reset: () => void;
+}
+
+export function createMetricsAccumulator(): MetricsAccumulator {
+ const steps = new Map<StepId, StepAccumulator>();
+ const stepOrder: StepId[] = [];
+ let doneUsage: Usage | undefined;
+ let doneDurationMs: number | undefined;
+
+ function getOrCreateStep(stepId: StepId): StepAccumulator {
+ let acc = steps.get(stepId);
+ if (acc === undefined) {
+ acc = {
+ stepId,
+ usage: undefined,
+ ttftMs: undefined,
+ decodeMs: undefined,
+ genTotalMs: undefined,
+ };
+ steps.set(stepId, acc);
+ stepOrder.push(stepId);
+ }
+ return acc;
+ }
+
+ function ingest(event: AgentEvent): void {
+ switch (event.type) {
+ case "usage": {
+ const e = event as TurnUsageEvent;
+ if (e.stepId !== undefined) {
+ const acc = getOrCreateStep(e.stepId);
+ acc.usage = e.usage;
+ }
+ break;
+ }
+ case "step-complete": {
+ const e = event as TurnStepCompleteEvent;
+ const acc = getOrCreateStep(e.stepId);
+ acc.ttftMs = e.ttftMs;
+ acc.decodeMs = e.decodeMs;
+ acc.genTotalMs = e.genTotalMs;
+ break;
+ }
+ case "done": {
+ const e = event as TurnDoneEvent;
+ doneUsage = e.usage;
+ doneDurationMs = e.durationMs;
+ break;
+ }
+ }
+ }
+
+ function build(turnId: string): TurnMetrics {
+ const stepMetrics: StepMetrics[] = stepOrder.map((stepId) => {
+ const acc = steps.get(stepId);
+ if (acc === undefined) {
+ return { stepId, usage: zeroUsage };
+ }
+ const usage = acc.usage ?? zeroUsage;
+ const sm: StepMetrics = { stepId, usage };
+ if (acc.ttftMs !== undefined) {
+ (sm as { ttftMs?: number }).ttftMs = acc.ttftMs;
+ }
+ if (acc.decodeMs !== undefined) {
+ (sm as { decodeMs?: number }).decodeMs = acc.decodeMs;
+ }
+ if (acc.genTotalMs !== undefined) {
+ (sm as { genTotalMs?: number }).genTotalMs = acc.genTotalMs;
+ }
+ return sm;
+ });
+
+ const aggregateUsage = doneUsage ?? sumStepUsage(stepMetrics);
+
+ const tm: TurnMetrics = { turnId, usage: aggregateUsage, steps: stepMetrics };
+ if (doneDurationMs !== undefined) {
+ (tm as { durationMs?: number }).durationMs = doneDurationMs;
+ }
+ return tm;
+ }
+
+ function reset(): void {
+ steps.clear();
+ stepOrder.length = 0;
+ doneUsage = undefined;
+ doneDurationMs = undefined;
+ }
+
+ return { ingest, build, reset };
+}
+
+function sumStepUsage(steps: readonly StepMetrics[]): Usage {
+ let inputTokens = 0;
+ let outputTokens = 0;
+ for (const s of steps) {
+ inputTokens += s.usage.inputTokens;
+ outputTokens += s.usage.outputTokens;
+ }
+ return { inputTokens, outputTokens };
+}
diff --git a/packages/session-orchestrator/src/orchestrator.test.ts b/packages/session-orchestrator/src/orchestrator.test.ts
index 3954ffe..ea564c5 100644
--- a/packages/session-orchestrator/src/orchestrator.test.ts
+++ b/packages/session-orchestrator/src/orchestrator.test.ts
@@ -7,6 +7,8 @@ import type {
RunTurnInput,
RunTurnResult,
StoredChunk,
+ ToolContract,
+ TurnMetrics,
} from "@dispatch/kernel";
import { runTurn } from "@dispatch/kernel";
import { describe, expect, it } from "vitest";
@@ -14,10 +16,13 @@ import { createSessionOrchestrator } from "./orchestrator.js";
function createInMemoryStore(): ConversationStore & {
readonly data: Map<string, ChatMessage[]>;
+ readonly metricsData: Map<string, TurnMetrics[]>;
} {
const data = new Map<string, ChatMessage[]>();
+ const metricsData = new Map<string, TurnMetrics[]>();
return {
data,
+ metricsData,
async append(conversationId, messages) {
const existing = data.get(conversationId) ?? [];
data.set(conversationId, [...existing, ...messages]);
@@ -39,6 +44,13 @@ function createInMemoryStore(): ConversationStore & {
}
return result;
},
+ async appendMetrics(conversationId, metrics) {
+ const existing = metricsData.get(conversationId) ?? [];
+ metricsData.set(conversationId, [...existing, metrics]);
+ },
+ async loadMetrics(conversationId) {
+ return [...(metricsData.get(conversationId) ?? [])];
+ },
};
}
@@ -63,6 +75,18 @@ function collectEvents(): { events: AgentEvent[]; onEvent: (event: AgentEvent) =
return { events, onEvent: (event) => events.push(event) };
}
+function createFakeTool(
+ name: string,
+ handler: (input: unknown) => Promise<{ content: string }>,
+): ToolContract {
+ return {
+ name,
+ description: `Fake tool: ${name}`,
+ parameters: { type: "object" },
+ execute: async (input) => handler(input),
+ };
+}
+
describe("handleMessage integration", () => {
it("loads history, runs turn, emits events, and persists result", async () => {
const store = createInMemoryStore();
@@ -465,6 +489,13 @@ describe("turn-sealed event", () => {
async loadSince(conversationId, sinceSeq) {
return store.loadSince(conversationId, sinceSeq);
},
+ async appendMetrics(conversationId, metrics) {
+ await store.appendMetrics(conversationId, metrics);
+ ordering.push("appendMetrics");
+ },
+ async loadMetrics(conversationId) {
+ return store.loadMetrics(conversationId);
+ },
};
const orchestrator = createSessionOrchestrator({
@@ -484,7 +515,7 @@ describe("turn-sealed event", () => {
},
});
- expect(ordering).toEqual(["append", "turn-sealed"]);
+ expect(ordering).toEqual(["append", "appendMetrics", "turn-sealed"]);
});
it("does not emit turn-sealed when append throws", async () => {
@@ -505,6 +536,12 @@ describe("turn-sealed event", () => {
async loadSince() {
return [];
},
+ async appendMetrics() {
+ return undefined;
+ },
+ async loadMetrics() {
+ return [];
+ },
};
const orchestrator = createSessionOrchestrator({
@@ -528,3 +565,283 @@ describe("turn-sealed event", () => {
expect(sealedEvents).toHaveLength(0);
});
});
+
+describe("turn metrics persistence", () => {
+ it("persists a TurnMetrics after a single-step turn seals", async () => {
+ const store = createInMemoryStore();
+ const provider = createFakeProvider([
+ [
+ { type: "text-delta", delta: "Hello" },
+ { type: "usage", usage: { inputTokens: 10, outputTokens: 5 } },
+ { type: "finish", reason: "stop" },
+ ],
+ ]);
+
+ const orchestrator = createSessionOrchestrator({
+ conversationStore: store,
+ resolveProvider: () => provider,
+ resolveTools: () => [],
+ runTurn,
+ now: () => 1000,
+ });
+
+ await orchestrator.handleMessage({
+ conversationId: "conv-metrics-1",
+ text: "test",
+ onEvent: () => {},
+ });
+
+ const metrics = store.metricsData.get("conv-metrics-1");
+ expect(metrics).toBeDefined();
+ expect(metrics).toHaveLength(1);
+ expect(metrics?.[0]?.turnId).toMatch(/^turn-/);
+ expect(metrics?.[0]?.usage.inputTokens).toBe(10);
+ expect(metrics?.[0]?.usage.outputTokens).toBe(5);
+ expect(metrics?.[0]?.steps).toHaveLength(1);
+ expect(metrics?.[0]?.steps[0]?.usage.inputTokens).toBe(10);
+ expect(metrics?.[0]?.steps[0]?.usage.outputTokens).toBe(5);
+ });
+
+ it("TurnMetrics aggregates multi-step usage and carries each step's StepMetrics in order", async () => {
+ const store = createInMemoryStore();
+ const tool = createFakeTool("echo", async () => ({ content: "echoed" }));
+
+ let callIndex = 0;
+ const provider: ProviderContract = {
+ id: "fake",
+ stream() {
+ const idx = callIndex++;
+ return (async function* () {
+ if (idx === 0) {
+ yield {
+ type: "tool-call",
+ toolCallId: "tc1",
+ toolName: "echo",
+ input: {},
+ } as ProviderEvent;
+ yield {
+ type: "usage",
+ usage: { inputTokens: 10, outputTokens: 5 },
+ } as ProviderEvent;
+ yield { type: "finish", reason: "tool-calls" } as ProviderEvent;
+ } else {
+ yield { type: "text-delta", delta: "Step2" } as ProviderEvent;
+ yield {
+ type: "usage",
+ usage: { inputTokens: 20, outputTokens: 10 },
+ } as ProviderEvent;
+ yield { type: "finish", reason: "stop" } as ProviderEvent;
+ }
+ })();
+ },
+ };
+
+ const orchestrator = createSessionOrchestrator({
+ conversationStore: store,
+ resolveProvider: () => provider,
+ resolveTools: () => [tool],
+ runTurn,
+ now: () => 1000,
+ });
+
+ await orchestrator.handleMessage({
+ conversationId: "conv-metrics-multi",
+ text: "test",
+ onEvent: () => {},
+ });
+
+ const metrics = store.metricsData.get("conv-metrics-multi");
+ expect(metrics).toBeDefined();
+ expect(metrics).toHaveLength(1);
+
+ const tm = metrics?.[0];
+ if (tm === undefined) throw new Error("expected metrics");
+
+ expect(tm.steps.length).toBeGreaterThanOrEqual(2);
+
+ expect(tm.steps[0]?.usage.inputTokens).toBe(10);
+ expect(tm.steps[0]?.usage.outputTokens).toBe(5);
+ expect(tm.steps[1]?.usage.inputTokens).toBe(20);
+ expect(tm.steps[1]?.usage.outputTokens).toBe(10);
+
+ expect(tm.usage.inputTokens).toBe(30);
+ expect(tm.usage.outputTokens).toBe(15);
+ });
+
+ it("per-step timing and usage are joined by stepId into one StepMetrics", async () => {
+ const store = createInMemoryStore();
+ const clock = createCounterNow();
+ clock.tick(100);
+
+ let callIndex = 0;
+ const provider: ProviderContract = {
+ id: "fake",
+ stream() {
+ const idx = callIndex++;
+ return (async function* () {
+ if (idx === 0) {
+ clock.tick(50);
+ yield { type: "text-delta", delta: "Hello" } as ProviderEvent;
+ clock.tick(100);
+ yield {
+ type: "usage",
+ usage: { inputTokens: 10, outputTokens: 5 },
+ } as ProviderEvent;
+ clock.tick(50);
+ yield { type: "finish", reason: "stop" } as ProviderEvent;
+ }
+ })();
+ },
+ };
+
+ const orchestrator = createSessionOrchestrator({
+ conversationStore: store,
+ resolveProvider: () => provider,
+ resolveTools: () => [],
+ runTurn,
+ now: clock.now,
+ });
+
+ await orchestrator.handleMessage({
+ conversationId: "conv-metrics-join",
+ text: "test",
+ onEvent: () => {},
+ });
+
+ const metrics = store.metricsData.get("conv-metrics-join");
+ expect(metrics).toBeDefined();
+ expect(metrics).toHaveLength(1);
+
+ const tm = metrics?.[0];
+ if (tm === undefined) throw new Error("expected metrics");
+
+ expect(tm.steps).toHaveLength(1);
+ const step = tm.steps[0];
+ if (step === undefined) throw new Error("expected step");
+
+ expect(step.usage.inputTokens).toBe(10);
+ expect(step.usage.outputTokens).toBe(5);
+ expect(step.genTotalMs).toBe(200);
+ expect(step.ttftMs).toBe(50);
+ expect(step.decodeMs).toBe(150);
+ });
+
+ it("turn-level usage comes from the done event aggregate", async () => {
+ const store = createInMemoryStore();
+ const tool = createFakeTool("echo", async () => ({ content: "echoed" }));
+
+ let callIndex = 0;
+ const provider: ProviderContract = {
+ id: "fake",
+ stream() {
+ const idx = callIndex++;
+ return (async function* () {
+ if (idx === 0) {
+ yield {
+ type: "tool-call",
+ toolCallId: "tc1",
+ toolName: "echo",
+ input: {},
+ } as ProviderEvent;
+ yield {
+ type: "usage",
+ usage: { inputTokens: 10, outputTokens: 5 },
+ } as ProviderEvent;
+ yield { type: "finish", reason: "tool-calls" } as ProviderEvent;
+ } else {
+ yield { type: "text-delta", delta: "Step2" } as ProviderEvent;
+ yield {
+ type: "usage",
+ usage: { inputTokens: 20, outputTokens: 10 },
+ } as ProviderEvent;
+ yield { type: "finish", reason: "stop" } as ProviderEvent;
+ }
+ })();
+ },
+ };
+
+ const orchestrator = createSessionOrchestrator({
+ conversationStore: store,
+ resolveProvider: () => provider,
+ resolveTools: () => [tool],
+ runTurn,
+ now: () => 1000,
+ });
+
+ await orchestrator.handleMessage({
+ conversationId: "conv-metrics-done",
+ text: "test",
+ onEvent: () => {},
+ });
+
+ const metrics = store.metricsData.get("conv-metrics-done");
+ expect(metrics).toBeDefined();
+ expect(metrics).toHaveLength(1);
+
+ const tm = metrics?.[0];
+ if (tm === undefined) throw new Error("expected metrics");
+
+ expect(tm.usage.inputTokens).toBe(30);
+ expect(tm.usage.outputTokens).toBe(15);
+ });
+
+ it("does not persist metrics nor emit turn-sealed when chunk append fails", async () => {
+ const provider = createFakeProvider([
+ [
+ { type: "text-delta", delta: "ok" },
+ { type: "usage", usage: { inputTokens: 5, outputTokens: 3 } },
+ { type: "finish", reason: "stop" },
+ ],
+ ]);
+
+ let metricsAppended = false;
+ const failingMetricsStore: ConversationStore = {
+ async append() {
+ throw new Error("storage failure");
+ },
+ async load() {
+ return [];
+ },
+ async loadSince() {
+ return [];
+ },
+ async appendMetrics() {
+ metricsAppended = true;
+ },
+ async loadMetrics() {
+ return [];
+ },
+ };
+
+ const orchestrator = createSessionOrchestrator({
+ conversationStore: failingMetricsStore,
+ resolveProvider: () => provider,
+ resolveTools: () => [],
+ runTurn,
+ });
+
+ const { events, onEvent } = collectEvents();
+
+ await expect(
+ orchestrator.handleMessage({
+ conversationId: "conv-fail-metrics",
+ text: "test",
+ onEvent,
+ }),
+ ).rejects.toThrow("storage failure");
+
+ const sealedEvents = events.filter((e) => e.type === "turn-sealed");
+ expect(sealedEvents).toHaveLength(0);
+ expect(metricsAppended).toBe(false);
+ });
+});
+
+function createCounterNow(): { now: () => number; tick: (ms: number) => void } {
+ let t = 0;
+ return {
+ now: () => t,
+ tick(ms: number) {
+ t += ms;
+ },
+ };
+}
diff --git a/packages/session-orchestrator/src/orchestrator.ts b/packages/session-orchestrator/src/orchestrator.ts
index 04f6ad2..d84b805 100644
--- a/packages/session-orchestrator/src/orchestrator.ts
+++ b/packages/session-orchestrator/src/orchestrator.ts
@@ -11,6 +11,7 @@ import type {
ToolDispatchPolicy,
} from "@dispatch/kernel";
import { defineService } from "@dispatch/kernel";
+import { createMetricsAccumulator } from "./metrics.js";
import { buildUserMessage, defaultDispatchPolicy, generateTurnId } from "./pure.js";
export interface SessionOrchestrator {
@@ -73,13 +74,19 @@ export function createSessionOrchestrator(deps: SessionOrchestratorDeps): Sessio
const tools = deps.resolveTools();
const dispatch = deps.resolveDispatch?.() ?? defaultDispatchPolicy();
const turnLogger = deps.logger?.child({ conversationId, turnId });
+ const metrics = createMetricsAccumulator();
+
+ const emitAndAccumulate = (event: AgentEvent): void => {
+ metrics.ingest(event);
+ onEvent(event);
+ };
const opts: RunTurnInput = {
provider,
messages: [...history, userMsg],
tools,
dispatch,
- emit: onEvent,
+ emit: emitAndAccumulate,
conversationId,
turnId,
...(modelOverride !== undefined
@@ -96,6 +103,9 @@ export function createSessionOrchestrator(deps: SessionOrchestratorDeps): Sessio
const toPersist: ChatMessage[] = [userMsg, ...result.messages];
await deps.conversationStore.append(conversationId, toPersist);
+ const turnMetrics = metrics.build(turnId);
+ await deps.conversationStore.appendMetrics(conversationId, turnMetrics);
+
onEvent({ type: "turn-sealed", conversationId, turnId });
},
};
diff --git a/packages/transport-contract/package.json b/packages/transport-contract/package.json
index a2711af..9b6e9b3 100644
--- a/packages/transport-contract/package.json
+++ b/packages/transport-contract/package.json
@@ -1,6 +1,6 @@
{
"name": "@dispatch/transport-contract",
- "version": "0.3.0",
+ "version": "0.4.0",
"type": "module",
"private": true,
"main": "dist/index.js",
diff --git a/packages/transport-contract/src/index.ts b/packages/transport-contract/src/index.ts
index cdddad6..bc01ca6 100644
--- a/packages/transport-contract/src/index.ts
+++ b/packages/transport-contract/src/index.ts
@@ -20,9 +20,9 @@
*/
import type { SurfaceClientMessage, SurfaceServerMessage } from "@dispatch/ui-contract";
-import type { AgentEvent, StoredChunk } from "@dispatch/wire";
+import type { AgentEvent, StoredChunk, TurnMetrics } from "@dispatch/wire";
-export type { AgentEvent, StoredChunk } from "@dispatch/wire";
+export type { AgentEvent, StepMetrics, StoredChunk, TurnMetrics } from "@dispatch/wire";
/**
* Request body for `POST /chat` (sent as JSON).
@@ -92,6 +92,25 @@ export interface ConversationHistoryResponse {
readonly latestSeq: number;
}
+/**
+ * Response body for `GET /conversations/:id/metrics` — the persisted per-turn
+ * (and per-step) token + timing metrics for a conversation, for a client
+ * reopening a past conversation to render historical usage/latency.
+ *
+ * This is a SEPARATE axis from the two other read concerns and is deliberately
+ * its own endpoint: the live `usage`/`step-complete`/`done` events are transient
+ * (not persisted), and `ConversationHistoryResponse` carries seq-cursor chunk
+ * CONTENT. Metrics are keyed per TURN (not per chunk) and so are not seq-filtered
+ * — hence a sibling route rather than a field on the history response.
+ *
+ * `turns` is every SEALED turn's `TurnMetrics` in turn order. A turn appears only
+ * after its metrics were persisted (post-seal); an in-flight or unsealed turn is
+ * absent until then.
+ */
+export interface ConversationMetricsResponse {
+ readonly turns: readonly TurnMetrics[];
+}
+
// ─── WebSocket chat ops ───────────────────────────────────────────────────────
// The persistent WS connection multiplexes chat ops (below) with surface ops
// (`@dispatch/ui-contract`). The unified unions at the bottom compose both. Chat
diff --git a/packages/transport-http/src/app.test.ts b/packages/transport-http/src/app.test.ts
index aa47dce..0a6c5b0 100644
--- a/packages/transport-http/src/app.test.ts
+++ b/packages/transport-http/src/app.test.ts
@@ -1,4 +1,4 @@
-import type { AgentEvent, Logger, StoredChunk } from "@dispatch/kernel";
+import type { AgentEvent, Logger, StepId, StoredChunk, TurnMetrics } from "@dispatch/kernel";
import { describe, expect, it } from "vitest";
import { createApp } from "./app.js";
import type { ConversationStore, CredentialStore, SessionOrchestrator } from "./seam.js";
@@ -47,6 +47,7 @@ function createFakeLogger(): Logger & { readonly records: readonly CapturedLog[]
function createFakeConversationStore(
store: Map<string, StoredChunk[]> = new Map(),
+ metricsStore: Map<string, TurnMetrics[]> = new Map(),
): ConversationStore {
return {
async append() {},
@@ -58,6 +59,10 @@ function createFakeConversationStore(
const minSeq = sinceSeq ?? 0;
return chunks.filter((c) => c.seq > minSeq);
},
+ async appendMetrics() {},
+ async loadMetrics(conversationId) {
+ return metricsStore.get(conversationId) ?? [];
+ },
};
}
@@ -465,6 +470,127 @@ describe("GET /conversations/:id", () => {
});
});
+describe("GET /conversations/:id/metrics", () => {
+ const sampleMetrics: TurnMetrics[] = [
+ {
+ turnId: "turn1",
+ usage: { inputTokens: 100, outputTokens: 50, cacheReadTokens: 0, cacheWriteTokens: 0 },
+ durationMs: 1000,
+ steps: [
+ {
+ stepId: "step1" as StepId,
+ usage: { inputTokens: 100, outputTokens: 50, cacheReadTokens: 0, cacheWriteTokens: 0 },
+ ttftMs: 200,
+ decodeMs: 300,
+ genTotalMs: 500,
+ },
+ ],
+ },
+ {
+ turnId: "turn2",
+ usage: { inputTokens: 200, outputTokens: 80, cacheReadTokens: 10, cacheWriteTokens: 5 },
+ durationMs: 1500,
+ steps: [
+ {
+ stepId: "step2" as StepId,
+ usage: { inputTokens: 200, outputTokens: 80, cacheReadTokens: 10, cacheWriteTokens: 5 },
+ ttftMs: 300,
+ decodeMs: 500,
+ genTotalMs: 800,
+ },
+ ],
+ },
+ ];
+
+ it("returns persisted turn metrics as { turns }", async () => {
+ const metricsStore = new Map<string, TurnMetrics[]>([["conv1", sampleMetrics]]);
+ const app = createApp({
+ conversationStore: createFakeConversationStore(new Map(), metricsStore),
+ orchestrator: createFakeOrchestrator([]),
+ credentialStore: createFakeCredentialStore([]),
+ });
+
+ const res = await app.request("/conversations/conv1/metrics");
+ expect(res.status).toBe(200);
+ const body = (await res.json()) as { turns: readonly TurnMetrics[] };
+ expect(body.turns).toHaveLength(2);
+ expect(body.turns[0]?.turnId).toBe("turn1");
+ expect(body.turns[1]?.turnId).toBe("turn2");
+ });
+
+ it("returns { turns: [] } for an unknown conversation", async () => {
+ const app = createApp({
+ conversationStore: createFakeConversationStore(),
+ orchestrator: createFakeOrchestrator([]),
+ credentialStore: createFakeCredentialStore([]),
+ });
+
+ const res = await app.request("/conversations/unknown/metrics");
+ expect(res.status).toBe(200);
+ const body = (await res.json()) as { turns: readonly TurnMetrics[] };
+ expect(body.turns).toHaveLength(0);
+ });
+
+ it("the metrics route does not collide with GET /conversations/:id history route", async () => {
+ const sampleChunks: StoredChunk[] = [
+ { seq: 1, role: "user", chunk: { type: "text", text: "hello" } },
+ ];
+ const store = new Map<string, StoredChunk[]>([["conv1", sampleChunks]]);
+ const metricsStore = new Map<string, TurnMetrics[]>([["conv1", sampleMetrics]]);
+ const app = createApp({
+ conversationStore: createFakeConversationStore(store, metricsStore),
+ orchestrator: createFakeOrchestrator([]),
+ credentialStore: createFakeCredentialStore([]),
+ });
+
+ const metricsRes = await app.request("/conversations/conv1/metrics");
+ expect(metricsRes.status).toBe(200);
+ const metricsBody = (await metricsRes.json()) as { turns: readonly TurnMetrics[] };
+ expect(metricsBody.turns).toHaveLength(2);
+
+ const historyRes = await app.request("/conversations/conv1");
+ expect(historyRes.status).toBe(200);
+ const historyBody = (await historyRes.json()) as {
+ chunks: readonly StoredChunk[];
+ latestSeq: number;
+ };
+ expect(historyBody.chunks).toHaveLength(1);
+ });
+
+ it("a store failure on the metrics read returns an error status + logs an error", async () => {
+ const logger = createFakeLogger();
+ const brokenStore: ConversationStore = {
+ async append() {},
+ async load() {
+ return [];
+ },
+ async loadSince() {
+ return [];
+ },
+ async appendMetrics() {},
+ async loadMetrics() {
+ throw new Error("storage exploded");
+ },
+ };
+ const app = createApp({
+ conversationStore: brokenStore,
+ orchestrator: createFakeOrchestrator([]),
+ credentialStore: createFakeCredentialStore([]),
+ logger,
+ });
+
+ const res = await app.request("/conversations/conv1/metrics");
+ expect(res.status).toBe(500);
+ const body = (await res.json()) as { error: string };
+ expect(body.error).toContain("Failed to load conversation metrics");
+
+ const errorLogs = logger.records.filter((r) => r.level === "error");
+ expect(errorLogs).toHaveLength(1);
+ expect(errorLogs[0]?.msg).toBe("conversations: metrics store failure");
+ expect(errorLogs[0]?.attrs?.err).toBeInstanceOf(Error);
+ });
+});
+
describe("POST /chat logging", () => {
it("POST /chat logs an info line when a request is accepted", async () => {
const logger = createFakeLogger();
diff --git a/packages/transport-http/src/app.ts b/packages/transport-http/src/app.ts
index a8edc71..4002e23 100644
--- a/packages/transport-http/src/app.ts
+++ b/packages/transport-http/src/app.ts
@@ -1,5 +1,9 @@
import type { AgentEvent, Logger } from "@dispatch/kernel";
-import type { ConversationHistoryResponse, ModelsResponse } from "@dispatch/transport-contract";
+import type {
+ ConversationHistoryResponse,
+ ConversationMetricsResponse,
+ ModelsResponse,
+} from "@dispatch/transport-contract";
import { Hono } from "hono";
import { cors } from "hono/cors";
import {
@@ -57,6 +61,23 @@ export function createApp(opts: CreateServerOptions): Hono {
app.get("/health", (c) => c.json({ ok: true }));
+ app.get("/conversations/:id/metrics", async (c) => {
+ const conversationId = c.req.param("id");
+
+ try {
+ const turns = await opts.conversationStore.loadMetrics(conversationId);
+ log.info("conversations: metrics read", {
+ conversationId,
+ count: turns.length,
+ });
+ const body: ConversationMetricsResponse = { turns };
+ return c.json(body, 200);
+ } catch (err) {
+ log.error("conversations: metrics store failure", { err });
+ return c.json({ error: "Failed to load conversation metrics" }, 500);
+ }
+ });
+
app.get("/conversations/:id", async (c) => {
const conversationId = c.req.param("id");
const sinceSeqResult = parseSinceSeq(c.req.query("sinceSeq"));
diff --git a/packages/transport-http/src/server.bun.test.ts b/packages/transport-http/src/server.bun.test.ts
index e824d18..b2a978a 100644
--- a/packages/transport-http/src/server.bun.test.ts
+++ b/packages/transport-http/src/server.bun.test.ts
@@ -37,6 +37,10 @@ function fakeConversationStore(): ConversationStore {
async loadSince() {
return [];
},
+ async appendMetrics() {},
+ async loadMetrics() {
+ return [];
+ },
};
}
diff --git a/packages/wire/package.json b/packages/wire/package.json
index 762c06e..790c7e1 100644
--- a/packages/wire/package.json
+++ b/packages/wire/package.json
@@ -1,6 +1,6 @@
{
"name": "@dispatch/wire",
- "version": "0.3.0",
+ "version": "0.4.0",
"type": "module",
"private": true,
"main": "dist/index.js",
diff --git a/packages/wire/src/index.ts b/packages/wire/src/index.ts
index a4790de..aa6f9d0 100644
--- a/packages/wire/src/index.ts
+++ b/packages/wire/src/index.ts
@@ -153,6 +153,47 @@ export interface Usage {
readonly cacheWriteTokens?: number;
}
+// ─── Persisted metrics ───────────────────────────────────────────────────────
+
+/**
+ * Durable per-step metrics for a completed step — the persisted, replayable
+ * counterpart of the live `usage` + `step-complete` events. Combines the step's
+ * token usage with its generation timing so a client reopening a past
+ * conversation renders the same per-step token/latency breakdown it would have
+ * seen live. Built from the turn's events, stored by `conversation-store`, and
+ * served by `GET /conversations/:id/metrics`.
+ */
+export interface StepMetrics {
+ readonly stepId: StepId;
+ /** The step's token usage (all four counters; cache fields optional per `Usage`). */
+ readonly usage: Usage;
+ /** Time to first token (stream start → first text/reasoning delta). Optional — see `TurnStepCompleteEvent.ttftMs`. */
+ readonly ttftMs?: number;
+ /** Decode time (first token → stream end). Optional — see `TurnStepCompleteEvent.decodeMs`. */
+ readonly decodeMs?: number;
+ /** Total generation time for the step (stream start → stream end). Optional: present only when a clock was available. */
+ readonly genTotalMs?: number;
+}
+
+/**
+ * Durable per-turn metrics for a completed (sealed) turn — the persisted,
+ * replayable counterpart of the live `done` event's aggregate `usage` +
+ * `durationMs`, plus the per-step breakdown. `usage` is the aggregate across all
+ * steps; `steps` carries each step's `StepMetrics` in step order. Persisted per
+ * turn by `conversation-store` (returned in turn-append order) and served by
+ * `GET /conversations/:id/metrics`. (`turnId` is the plain wire string carried
+ * on every `AgentEvent`, the join key to the live stream.)
+ */
+export interface TurnMetrics {
+ readonly turnId: string;
+ /** Aggregate token usage across all steps in the turn. */
+ readonly usage: Usage;
+ /** Total wall-clock duration of the turn (turn start → turn end). Optional: present only when a clock was available. */
+ readonly durationMs?: number;
+ /** Per-step metrics in step order. */
+ readonly steps: readonly StepMetrics[];
+}
+
// ─── Outward events ─────────────────────────────────────────────────────────
/**