summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorAdam Malczewski <[email protected]>2026-06-10 10:16:20 +0900
committerAdam Malczewski <[email protected]>2026-06-10 10:16:20 +0900
commit80e14ab59732aabbf06035d13138500f133e921d (patch)
treebe3c1c118ceeb207d7ff218aa5d7ace06e683710
parent5ff460688519e48fd0bfab893ebaed4258dee789 (diff)
downloaddispatch-80e14ab59732aabbf06035d13138500f133e921d.tar.gz
dispatch-80e14ab59732aabbf06035d13138500f133e921d.zip
feat: per-model throughput (tok/s) tracking + metrics endpoint
New throughput-store extension records one token-weighted sample per turn (model, output tokens, pure generation time = Σ step genTotalMs) into a day-bucketed KV store, and aggregates per-model tok/s = Σtokens / Σgen-seconds over a day/week/month (server-local boundaries; week = ISO Mon–Sun). transport-http records a sample per turn (logged) and serves GET /metrics/throughput?period=day|week|month&date=<...>. The response is typed as transport-contract's ThroughputResponse, so store/wire drift is a compile error. Pure period + aggregate logic fully unit-tested.
-rw-r--r--bun.lock15
-rw-r--r--packages/host-bin/package.json1
-rw-r--r--packages/host-bin/src/main.ts2
-rw-r--r--packages/host-bin/tsconfig.json1
-rw-r--r--packages/throughput-store/package.json11
-rw-r--r--packages/throughput-store/src/aggregate.test.ts50
-rw-r--r--packages/throughput-store/src/aggregate.ts72
-rw-r--r--packages/throughput-store/src/extension.ts24
-rw-r--r--packages/throughput-store/src/index.ts16
-rw-r--r--packages/throughput-store/src/period.test.ts67
-rw-r--r--packages/throughput-store/src/period.ts113
-rw-r--r--packages/throughput-store/src/service.ts5
-rw-r--r--packages/throughput-store/src/store.test.ts72
-rw-r--r--packages/throughput-store/src/store.ts83
-rw-r--r--packages/throughput-store/tsconfig.json6
-rw-r--r--packages/transport-contract/src/index.ts43
-rw-r--r--packages/transport-http/package.json1
-rw-r--r--packages/transport-http/src/app.test.ts138
-rw-r--r--packages/transport-http/src/app.ts75
-rw-r--r--packages/transport-http/src/extension.ts9
-rw-r--r--packages/transport-http/src/seam.ts2
-rw-r--r--packages/transport-http/tsconfig.json1
-rw-r--r--tsconfig.json1
23 files changed, 802 insertions, 6 deletions
diff --git a/bun.lock b/bun.lock
index bc33ba7..5911ffd 100644
--- a/bun.lock
+++ b/bun.lock
@@ -54,6 +54,7 @@
"@dispatch/storage-sqlite": "workspace:*",
"@dispatch/surface-loaded-extensions": "workspace:*",
"@dispatch/surface-registry": "workspace:*",
+ "@dispatch/throughput-store": "workspace:*",
"@dispatch/tool-read-file": "workspace:*",
"@dispatch/transport-http": "workspace:*",
"@dispatch/transport-ws": "workspace:*",
@@ -122,6 +123,13 @@
"@dispatch/ui-contract": "workspace:*",
},
},
+ "packages/throughput-store": {
+ "name": "@dispatch/throughput-store",
+ "version": "0.0.0",
+ "dependencies": {
+ "@dispatch/kernel": "workspace:*",
+ },
+ },
"packages/tool-read-file": {
"name": "@dispatch/tool-read-file",
"version": "0.0.0",
@@ -142,7 +150,7 @@
},
"packages/transport-contract": {
"name": "@dispatch/transport-contract",
- "version": "0.1.0",
+ "version": "0.4.0",
"dependencies": {
"@dispatch/ui-contract": "workspace:*",
"@dispatch/wire": "workspace:*",
@@ -156,6 +164,7 @@
"@dispatch/credential-store": "workspace:*",
"@dispatch/kernel": "workspace:*",
"@dispatch/session-orchestrator": "workspace:*",
+ "@dispatch/throughput-store": "workspace:*",
"@dispatch/transport-contract": "workspace:*",
"hono": "^4.0.0",
},
@@ -177,7 +186,7 @@
},
"packages/wire": {
"name": "@dispatch/wire",
- "version": "0.1.0",
+ "version": "0.4.0",
},
},
"packages": {
@@ -225,6 +234,8 @@
"@dispatch/surface-registry": ["@dispatch/surface-registry@workspace:packages/surface-registry"],
+ "@dispatch/throughput-store": ["@dispatch/throughput-store@workspace:packages/throughput-store"],
+
"@dispatch/tool-read-file": ["@dispatch/tool-read-file@workspace:packages/tool-read-file"],
"@dispatch/trace-replay": ["@dispatch/trace-replay@workspace:packages/trace-replay"],
diff --git a/packages/host-bin/package.json b/packages/host-bin/package.json
index 65e986d..ccaddd2 100644
--- a/packages/host-bin/package.json
+++ b/packages/host-bin/package.json
@@ -11,6 +11,7 @@
"@dispatch/credential-store": "workspace:*",
"@dispatch/provider-openai-compat": "workspace:*",
"@dispatch/session-orchestrator": "workspace:*",
+ "@dispatch/throughput-store": "workspace:*",
"@dispatch/transport-http": "workspace:*",
"@dispatch/tool-read-file": "workspace:*",
"@dispatch/journal-sink": "workspace:*",
diff --git a/packages/host-bin/src/main.ts b/packages/host-bin/src/main.ts
index ec51d11..7f219d1 100644
--- a/packages/host-bin/src/main.ts
+++ b/packages/host-bin/src/main.ts
@@ -23,6 +23,7 @@ import { extension as sessionOrchestratorExt } from "@dispatch/session-orchestra
import { createSqliteStorage, extension as storageSqliteExt } from "@dispatch/storage-sqlite";
import { createLoadedExtensionsExtension } from "@dispatch/surface-loaded-extensions";
import { createSurfaceRegistryExtension } from "@dispatch/surface-registry";
+import { extension as throughputStoreExt } from "@dispatch/throughput-store";
import { extension as toolReadFileExt } from "@dispatch/tool-read-file";
import { createTransportHttpExtension } from "@dispatch/transport-http";
import { createTransportWsExtension } from "@dispatch/transport-ws";
@@ -62,6 +63,7 @@ const CORE_EXTENSIONS: readonly Extension[] = [
authApikeyExt,
providerOpenaiCompatExt,
toolReadFileExt,
+ throughputStoreExt,
sessionOrchestratorExt,
createTransportHttpExtension(),
// Surface extensions — dependency order: surface-registry first, then consumers.
diff --git a/packages/host-bin/tsconfig.json b/packages/host-bin/tsconfig.json
index 3630394..53762c7 100644
--- a/packages/host-bin/tsconfig.json
+++ b/packages/host-bin/tsconfig.json
@@ -8,6 +8,7 @@
{ "path": "../surface-loaded-extensions" },
{ "path": "../surface-registry" },
{ "path": "../tool-read-file" },
+ { "path": "../throughput-store" },
{ "path": "../transport-http" },
{ "path": "../transport-ws" }
]
diff --git a/packages/throughput-store/package.json b/packages/throughput-store/package.json
new file mode 100644
index 0000000..9624c7c
--- /dev/null
+++ b/packages/throughput-store/package.json
@@ -0,0 +1,11 @@
+{
+ "name": "@dispatch/throughput-store",
+ "version": "0.0.0",
+ "type": "module",
+ "private": true,
+ "main": "dist/index.js",
+ "types": "dist/index.d.ts",
+ "dependencies": {
+ "@dispatch/kernel": "workspace:*"
+ }
+}
diff --git a/packages/throughput-store/src/aggregate.test.ts b/packages/throughput-store/src/aggregate.test.ts
new file mode 100644
index 0000000..27b800c
--- /dev/null
+++ b/packages/throughput-store/src/aggregate.test.ts
@@ -0,0 +1,50 @@
+import { describe, expect, it } from "vitest";
+import { aggregateSamples, type ThroughputSample } from "./aggregate.js";
+
+const S = (model: string, ts: number, outputTokens: number, genMs: number): ThroughputSample => ({
+ model,
+ ts,
+ outputTokens,
+ genMs,
+});
+
+describe("aggregateSamples", () => {
+ it("token-weights tok/s (Σtokens / Σgen-seconds), so large turns dominate", () => {
+ const samples = [
+ S("claude/haiku", 100, 1000, 10_000), // 100 tok/s, big turn
+ S("claude/haiku", 200, 10, 1000), // 10 tok/s, small turn
+ ];
+ const [row] = aggregateSamples(samples, 0, 1000);
+ expect(row?.model).toBe("claude/haiku");
+ // 1010 tokens / 11 s = 91.82, NOT the simple mean (55)
+ expect(row?.tokensPerSecond).toBeCloseTo(91.82, 1);
+ expect(row?.totalOutputTokens).toBe(1010);
+ expect(row?.totalGenMs).toBe(11_000);
+ expect(row?.turns).toBe(2);
+ });
+
+ it("excludes samples outside the [start, end) range", () => {
+ const samples = [S("m", 50, 100, 1000), S("m", 500, 100, 1000), S("m", 1500, 999, 1000)];
+ const [row] = aggregateSamples(samples, 100, 1000);
+ expect(row?.turns).toBe(1); // only ts=500 is in [100, 1000)
+ expect(row?.totalOutputTokens).toBe(100);
+ });
+
+ it("groups by model and sorts by tok/s descending", () => {
+ const samples = [
+ S("slow", 10, 50, 5000), // 10 tok/s
+ S("fast", 10, 500, 1000), // 500 tok/s
+ ];
+ const rows = aggregateSamples(samples, 0, 100);
+ expect(rows.map((r) => r.model)).toEqual(["fast", "slow"]);
+ });
+
+ it("reports 0 tok/s when generation time is 0 (avoids divide-by-zero)", () => {
+ const [row] = aggregateSamples([S("m", 10, 100, 0)], 0, 100);
+ expect(row?.tokensPerSecond).toBe(0);
+ });
+
+ it("returns an empty list when no samples match", () => {
+ expect(aggregateSamples([], 0, 100)).toEqual([]);
+ });
+});
diff --git a/packages/throughput-store/src/aggregate.ts b/packages/throughput-store/src/aggregate.ts
new file mode 100644
index 0000000..2437d9f
--- /dev/null
+++ b/packages/throughput-store/src/aggregate.ts
@@ -0,0 +1,72 @@
+/**
+ * Pure throughput aggregation.
+ *
+ * Per model, tokens-per-second is the TOKEN-WEIGHTED average:
+ *
+ * tok/s = Σ(output_tokens) / Σ(generation_seconds)
+ *
+ * i.e. total tokens over total generation time across the period's turns. This
+ * makes a turn that generated more tokens count proportionally more than a small
+ * turn — large turns dominate, exactly as intended. Generation time is the pure
+ * decode time (excludes tool-execution waits).
+ */
+
+export interface ThroughputSample {
+ readonly model: string;
+ /** Epoch-ms the turn completed. */
+ readonly ts: number;
+ /** Output tokens generated in the turn. */
+ readonly outputTokens: number;
+ /** Pure generation time for the turn (ms), summed across its steps. */
+ readonly genMs: number;
+}
+
+export interface ModelThroughput {
+ readonly model: string;
+ /** Token-weighted average tokens/second over the period. */
+ readonly tokensPerSecond: number;
+ readonly totalOutputTokens: number;
+ readonly totalGenMs: number;
+ /** Number of turns that contributed. */
+ readonly turns: number;
+}
+
+/**
+ * Aggregate samples within the half-open range `[start, end)` into per-model
+ * throughput, sorted by tok/s descending (ties broken by model name).
+ *
+ * @param samples - Turn samples to consider; the array is only read.
+ * @param start - Inclusive lower bound compared against each sample's `ts`.
+ * @param end - Exclusive upper bound compared against each sample's `ts`.
+ * @returns A fresh array with one row per model that has at least one
+ * in-range sample (empty when none match). `tokensPerSecond` is rounded to
+ * two decimals and is 0 when the model's summed `genMs` is not positive.
+ */
+export function aggregateSamples(
+ samples: readonly ThroughputSample[],
+ start: number,
+ end: number,
+): ModelThroughput[] {
+ const byModel = new Map<string, { tokens: number; genMs: number; turns: number }>(); // running totals keyed by model name
+
+ for (const s of samples) {
+ if (s.ts < start || s.ts >= end) continue; // outside [start, end)
+ const acc = byModel.get(s.model) ?? { tokens: 0, genMs: 0, turns: 0 }; // first sample for a model starts from zeroed totals
+ acc.tokens += s.outputTokens;
+ acc.genMs += s.genMs;
+ acc.turns += 1;
+ byModel.set(s.model, acc);
+ }
+
+ const result: ModelThroughput[] = [];
+ for (const [model, acc] of byModel) {
+ const tokensPerSecond = acc.genMs > 0 ? round2(acc.tokens / (acc.genMs / 1000)) : 0; // Σtokens / Σseconds; 0 instead of dividing by a non-positive time
+ result.push({
+ model,
+ tokensPerSecond,
+ totalOutputTokens: acc.tokens,
+ totalGenMs: acc.genMs,
+ turns: acc.turns,
+ });
+ }
+
+ result.sort((a, b) => b.tokensPerSecond - a.tokensPerSecond || a.model.localeCompare(b.model)); // fastest first; equal rates fall back to model-name order
+ return result;
+}
+
+function round2(n: number): number {
+ return Math.round(n * 100) / 100; // round to two decimal places using Math.round
+}
diff --git a/packages/throughput-store/src/extension.ts b/packages/throughput-store/src/extension.ts
new file mode 100644
index 0000000..01d1549
--- /dev/null
+++ b/packages/throughput-store/src/extension.ts
@@ -0,0 +1,24 @@
+import type { Extension, HostAPI, Manifest } from "@dispatch/kernel";
+import { throughputStoreHandle } from "./service.js";
+import { createThroughputStore } from "./store.js";
+
+export const manifest: Manifest = {
+ id: "throughput-store",
+ name: "Throughput Store",
+ version: "0.0.0",
+ apiVersion: "^0.1.0",
+ trust: "bundled",
+ capabilities: { db: true },
+ contributes: { services: ["throughput-store/store"] },
+ activation: "eager",
+};
+
+export const extension: Extension = {
+ manifest,
+ activate: (host: HostAPI) => {
+ const storage = host.storage("throughput-store"); // storage namespace requested under the name "throughput-store"
+ const store = createThroughputStore({ storage, logger: host.logger }); // newId is left to its default
+ host.provideService(throughputStoreHandle, store); // publish the store under the typed service handle
+ host.logger.info("throughput-store: registered");
+ },
+};
diff --git a/packages/throughput-store/src/index.ts b/packages/throughput-store/src/index.ts
new file mode 100644
index 0000000..24ebaba
--- /dev/null
+++ b/packages/throughput-store/src/index.ts
@@ -0,0 +1,16 @@
+export {
+ aggregateSamples,
+ type ModelThroughput,
+ type ThroughputSample,
+} from "./aggregate.js";
+export { extension, manifest } from "./extension.js";
+export { dayKeyOf, type Period, resolvePeriod } from "./period.js";
+export { throughputStoreHandle } from "./service.js";
+export {
+ createThroughputStore,
+ type ThroughputQuery,
+ ThroughputQueryError,
+ type ThroughputReport,
+ type ThroughputStore,
+ type ThroughputStoreDeps,
+} from "./store.js";
diff --git a/packages/throughput-store/src/period.test.ts b/packages/throughput-store/src/period.test.ts
new file mode 100644
index 0000000..b39437c
--- /dev/null
+++ b/packages/throughput-store/src/period.test.ts
@@ -0,0 +1,67 @@
+import { describe, expect, it } from "vitest";
+import { dayKeyOf, resolvePeriod } from "./period.js";
+
+describe("dayKeyOf", () => {
+ it("formats a local YYYY-MM-DD key", () => {
+ // Build a local-midnight timestamp so the key is timezone-stable.
+ const ts = new Date(2026, 5, 10).getTime();
+ expect(dayKeyOf(ts)).toBe("2026-06-10");
+ });
+});
+
+describe("resolvePeriod day", () => {
+ it("spans a single local day", () => {
+ const r = resolvePeriod("day", "2026-06-10");
+ expect(r.ok).toBe(true);
+ if (!r.ok) return;
+ expect(r.dayKeys).toEqual(["2026-06-10"]);
+ expect(r.start).toBe(new Date(2026, 5, 10).getTime());
+ expect(r.end).toBe(new Date(2026, 5, 11).getTime());
+ });
+
+ it("rejects malformed / impossible dates", () => {
+ expect(resolvePeriod("day", "2026-13-01").ok).toBe(false);
+ expect(resolvePeriod("day", "2026-02-30").ok).toBe(false);
+ expect(resolvePeriod("day", "nope").ok).toBe(false);
+ expect(resolvePeriod("day", "2026-06").ok).toBe(false);
+ });
+});
+
+describe("resolvePeriod week", () => {
+ it("spans the Monday–Sunday ISO week containing the date", () => {
+ const r = resolvePeriod("week", "2026-06-10");
+ expect(r.ok).toBe(true);
+ if (!r.ok) return;
+ expect(r.dayKeys).toHaveLength(7);
+ // start is a Monday (local)
+ expect(new Date(r.start).getDay()).toBe(1);
+ // the queried date falls within the week
+ expect(r.dayKeys).toContain("2026-06-10");
+ // end is exactly 7 local days after start
+ expect(new Date(r.end).getDay()).toBe(1);
+ });
+});
+
+describe("resolvePeriod month", () => {
+ it("spans a full calendar month", () => {
+ const r = resolvePeriod("month", "2026-06");
+ expect(r.ok).toBe(true);
+ if (!r.ok) return;
+ expect(r.dayKeys).toHaveLength(30); // June has 30 days
+ expect(r.dayKeys[0]).toBe("2026-06-01");
+ expect(r.dayKeys[29]).toBe("2026-06-30");
+ expect(r.start).toBe(new Date(2026, 5, 1).getTime());
+ expect(r.end).toBe(new Date(2026, 6, 1).getTime());
+ });
+
+ it("handles February length", () => {
+ const r = resolvePeriod("month", "2026-02");
+ expect(r.ok).toBe(true);
+ if (!r.ok) return;
+ expect(r.dayKeys).toHaveLength(28);
+ });
+
+ it("rejects a YYYY-MM-DD date for month", () => {
+ expect(resolvePeriod("month", "2026-06-10").ok).toBe(false);
+ });
+});
diff --git a/packages/throughput-store/src/period.ts b/packages/throughput-store/src/period.ts
new file mode 100644
index 0000000..d8225f8
--- /dev/null
+++ b/packages/throughput-store/src/period.ts
@@ -0,0 +1,113 @@
+/**
+ * Pure period math for throughput aggregation.
+ *
+ * All boundaries are computed in the SERVER'S LOCAL timezone (a single-operator
+ * self-hosted assumption):
+ * - day: `YYYY-MM-DD` → [local midnight, next local midnight)
+ * - week: `YYYY-MM-DD` → the ISO week (Mon–Sun) CONTAINING that date
+ * - month: `YYYY-MM` → [first of month, first of next month)
+ *
+ * A resolved period yields the half-open `[start, end)` epoch-ms range plus the
+ * list of local `YYYY-MM-DD` day keys it spans (used to address the per-day
+ * sample buckets in storage).
+ */
+
+export type Period = "day" | "week" | "month";
+
+export interface ResolvedPeriod {
+ readonly ok: true; // discriminant: the error shape carries `ok: false`
+ /** Inclusive start, epoch-ms (local midnight). */
+ readonly start: number;
+ /** Exclusive end, epoch-ms (local midnight). */
+ readonly end: number;
+ /** Local `YYYY-MM-DD` day keys this period spans, in order. */
+ readonly dayKeys: readonly string[];
+ /** The input date string, echoed back exactly as it was passed to `resolvePeriod`. */
+ readonly date: string;
+}
+
+export interface PeriodError {
+ readonly ok: false;
+ readonly error: string;
+}
+
+function pad2(n: number): string {
+ return n < 10 ? `0${n}` : String(n); // prefix "0" for values below 10; larger values pass through unchanged
+}
+
+/**
+ * Local `YYYY-MM-DD` key for an epoch-ms timestamp.
+ *
+ * Built from the `Date` local-time getters, so the key follows the timezone
+ * of the running process. Month and day are zero-padded to two digits; the
+ * year is emitted as returned by `getFullYear()` with no padding.
+ */
+export function dayKeyOf(ts: number): string {
+ const d = new Date(ts);
+ return `${d.getFullYear()}-${pad2(d.getMonth() + 1)}-${pad2(d.getDate())}`; // getMonth() is 0-based, hence + 1
+}
+
+/**
+ * Local `YYYY-MM-DD` key for a local calendar date.
+ *
+ * `monthIndex` is 0-based. The components go through the `Date` constructor
+ * first, so an out-of-range `day` (for example a day-of-month plus an offset)
+ * is normalized into the following month before the key is formatted.
+ */
+function dayKey(year: number, monthIndex: number, day: number): string {
+ const d = new Date(year, monthIndex, day);
+ return `${d.getFullYear()}-${pad2(d.getMonth() + 1)}-${pad2(d.getDate())}`; // read back from the normalized Date, not from the raw arguments
+}
+
+function parseYmd(date: string): { y: number; m: number; d: number } | null {
+ if (!/^\d{4}-\d{2}-\d{2}$/.test(date)) return null; // shape gate: exactly YYYY-MM-DD, digits only
+ const [y, m, d] = date.split("-").map(Number) as [number, number, number]; // the regex above guarantees three numeric parts
+ if (m < 1 || m > 12 || d < 1 || d > 31) return null; // cheap range check before building a Date
+ // Reject impossible dates (e.g. 2026-02-30) by round-tripping through Date.
+ const probe = new Date(y, m - 1, d);
+ if (probe.getFullYear() !== y || probe.getMonth() !== m - 1 || probe.getDate() !== d) { // an overflowing day rolls into the next month; years 0000–0099 are mapped to 1900–1999 by Date, so they are rejected here too
+ return null;
+ }
+ return { y, m, d }; // m is 1-based (calendar month), not a Date month index
+}
+
+function parseYm(date: string): { y: number; m: number } | null {
+ if (!/^\d{4}-\d{2}$/.test(date)) return null; // shape gate: exactly YYYY-MM, digits only
+ const [y, m] = date.split("-").map(Number) as [number, number]; // the regex above guarantees two numeric parts
+ if (m < 1 || m > 12) return null; // month must be 1–12
+ return { y, m }; // NOTE(review): unlike parseYmd there is no Date round-trip, so years 0000–0099 pass; the Date constructor used by resolvePeriod treats them as 1900–1999 — confirm whether that matters
+}
+
+/**
+ * Resolve a `(period, date)` pair into a concrete `[start, end)` range + the day
+ * keys it covers. Returns a `PeriodError` for malformed input.
+ *
+ * @param period - `"day"` and `"week"` expect `date` as `YYYY-MM-DD`; any
+ * other value is handled as a month and expects `YYYY-MM`.
+ * @param date - The date string to resolve; it is returned unchanged in the
+ * `date` field of a successful result.
+ * @returns `{ ok: true, ... }` with local-midnight boundaries and the ordered
+ * day keys, or `{ ok: false, error }` describing the expected format. This
+ * function does not throw for malformed input.
+ */
+export function resolvePeriod(period: Period, date: string): ResolvedPeriod | PeriodError {
+ if (period === "day") {
+ const p = parseYmd(date);
+ if (!p) return { ok: false, error: `invalid day date "${date}" (expected YYYY-MM-DD)` };
+ const start = new Date(p.y, p.m - 1, p.d).getTime(); // local midnight of the requested day
+ const end = new Date(p.y, p.m - 1, p.d + 1).getTime(); // next local midnight; Date normalizes a day past month end
+ return { ok: true, start, end, dayKeys: [dayKey(p.y, p.m - 1, p.d)], date };
+ }
+
+ if (period === "week") {
+ const p = parseYmd(date);
+ if (!p) return { ok: false, error: `invalid week date "${date}" (expected YYYY-MM-DD)` };
+ // ISO week: Monday-based. JS getDay() is 0=Sun..6=Sat.
+ const base = new Date(p.y, p.m - 1, p.d);
+ const offset = (base.getDay() + 6) % 7; // days since Monday
+ const monStart = new Date(p.y, p.m - 1, p.d - offset); // local midnight of that week's Monday (may fall in the previous month)
+ const start = monStart.getTime();
+ const end = new Date(
+ monStart.getFullYear(),
+ monStart.getMonth(),
+ monStart.getDate() + 7,
+ ).getTime(); // local midnight of the following Monday, built from calendar fields rather than a fixed ms offset
+ const dayKeys: string[] = [];
+ for (let i = 0; i < 7; i++) {
+ dayKeys.push(dayKey(monStart.getFullYear(), monStart.getMonth(), monStart.getDate() + i)); // Monday through Sunday, in order
+ }
+ return { ok: true, start, end, dayKeys, date };
+ }
+
+ // month
+ const p = parseYm(date);
+ if (!p) return { ok: false, error: `invalid month date "${date}" (expected YYYY-MM)` };
+ const start = new Date(p.y, p.m - 1, 1).getTime(); // local midnight on the 1st of the month
+ const end = new Date(p.y, p.m, 1).getTime(); // local midnight on the 1st of the next month (p.m is 1-based, so it is the next month's index)
+ const lastDay = new Date(p.y, p.m, 0).getDate(); // day 0 of the next month is the last day of this month
+ const dayKeys: string[] = [];
+ for (let d = 1; d <= lastDay; d++) {
+ dayKeys.push(dayKey(p.y, p.m - 1, d));
+ }
+ return { ok: true, start, end, dayKeys, date };
+}
diff --git a/packages/throughput-store/src/service.ts b/packages/throughput-store/src/service.ts
new file mode 100644
index 0000000..d3a2184
--- /dev/null
+++ b/packages/throughput-store/src/service.ts
@@ -0,0 +1,5 @@
+import { defineService } from "@dispatch/kernel";
+import type { ThroughputStore } from "./store.js";
+
+/** Typed service handle for the throughput store (record + aggregate). */
+export const throughputStoreHandle = defineService<ThroughputStore>("throughput-store/store");
diff --git a/packages/throughput-store/src/store.test.ts b/packages/throughput-store/src/store.test.ts
new file mode 100644
index 0000000..e81201a
--- /dev/null
+++ b/packages/throughput-store/src/store.test.ts
@@ -0,0 +1,72 @@
+import type { StorageNamespace } from "@dispatch/kernel";
+import { describe, expect, it } from "vitest";
+import { dayKeyOf } from "./period.js";
+import { createThroughputStore, ThroughputQueryError } from "./store.js";
+
+function memStorage(): StorageNamespace {
+ const map = new Map<string, string>(); // one private Map per call, so each store under test starts empty
+ return {
+ get: async (k) => map.get(k) ?? null, // a missing key reads as null
+ set: async (k, v) => {
+ map.set(k, v);
+ },
+ delete: async (k) => {
+ map.delete(k);
+ },
+ has: async (k) => map.has(k),
+ keys: async (prefix) =>
+ [...map.keys()].filter((k) => (prefix === undefined ? true : k.startsWith(prefix))), // no prefix → every key; otherwise a startsWith match
+ };
+}
+
+let id = 0;
+const store = () => createThroughputStore({ storage: memStorage(), newId: () => `id${id++}` });
+
+describe("ThroughputStore", () => {
+ it("records a sample and aggregates it for that day", async () => {
+ const s = store();
+ const ts = new Date(2026, 5, 10, 12, 0, 0).getTime();
+ await s.record({ model: "claude/haiku", ts, outputTokens: 300, genMs: 1500 });
+
+ const report = await s.aggregate({ period: "day", date: dayKeyOf(ts) });
+ expect(report.models).toHaveLength(1);
+ expect(report.models[0]).toMatchObject({
+ model: "claude/haiku",
+ totalOutputTokens: 300,
+ totalGenMs: 1500,
+ turns: 1,
+ tokensPerSecond: 200, // 300 / 1.5s
+ });
+ });
+
+ it("does not lose concurrent samples (single-set writes)", async () => {
+ const s = store();
+ const ts = new Date(2026, 5, 10, 9, 0, 0).getTime();
+ await Promise.all([
+ s.record({ model: "m", ts, outputTokens: 100, genMs: 1000 }),
+ s.record({ model: "m", ts, outputTokens: 100, genMs: 1000 }),
+ s.record({ model: "m", ts, outputTokens: 100, genMs: 1000 }),
+ ]);
+ const report = await s.aggregate({ period: "day", date: dayKeyOf(ts) });
+ expect(report.models[0]?.turns).toBe(3);
+ expect(report.models[0]?.totalOutputTokens).toBe(300);
+ });
+
+ it("aggregates multiple days within a week", async () => {
+ const s = store();
+ const mon = new Date(2026, 5, 8, 10, 0, 0).getTime(); // Mon 2026-06-08
+ const wed = new Date(2026, 5, 10, 10, 0, 0).getTime();
+ await s.record({ model: "m", ts: mon, outputTokens: 100, genMs: 1000 });
+ await s.record({ model: "m", ts: wed, outputTokens: 200, genMs: 1000 });
+
+ const report = await s.aggregate({ period: "week", date: dayKeyOf(wed) });
+ expect(report.models[0]?.turns).toBe(2);
+ expect(report.models[0]?.totalOutputTokens).toBe(300);
+ });
+
+ it("throws ThroughputQueryError on a malformed date", async () => {
+ await expect(store().aggregate({ period: "day", date: "garbage" })).rejects.toBeInstanceOf(
+ ThroughputQueryError,
+ );
+ });
+});
diff --git a/packages/throughput-store/src/store.ts b/packages/throughput-store/src/store.ts
new file mode 100644
index 0000000..94675b1
--- /dev/null
+++ b/packages/throughput-store/src/store.ts
@@ -0,0 +1,83 @@
+import type { Logger, StorageNamespace } from "@dispatch/kernel";
+import { aggregateSamples, type ModelThroughput, type ThroughputSample } from "./aggregate.js";
+import { dayKeyOf, type Period, resolvePeriod } from "./period.js";
+
+export interface ThroughputReport {
+ readonly period: Period;
+ readonly date: string;
+ /** Inclusive start, epoch-ms. */
+ readonly start: number;
+ /** Exclusive end, epoch-ms. */
+ readonly end: number;
+ readonly models: readonly ModelThroughput[];
+}
+
+export interface ThroughputQuery {
+ readonly period: Period;
+ readonly date: string;
+}
+
+/**
+ * Cross-conversation, per-model throughput store. Records one sample per
+ * completed turn and aggregates token-weighted tok/s over a day/week/month.
+ * Persistence is the injected KV namespace; samples are bucketed by local day
+ * key so a period query addresses only its own day buckets.
+ */
+export interface ThroughputStore {
+ readonly record: (sample: ThroughputSample) => Promise<void>;
+ readonly aggregate: (query: ThroughputQuery) => Promise<ThroughputReport>;
+}
+
+export interface ThroughputStoreDeps {
+ readonly storage: StorageNamespace;
+ readonly logger?: Logger;
+ /** Injectable unique-id generator (default crypto.randomUUID). */
+ readonly newId?: () => string;
+}
+
+/**
+ * Thrown when a query's `(period, date)` is malformed.
+ *
+ * A bare `Error` subclass: it adds no fields and does not override `name`;
+ * the message is whatever text is passed to the constructor.
+ */
+export class ThroughputQueryError extends Error {}
+
+const SAMPLE_PREFIX = "sample";
+
+export function createThroughputStore(deps: ThroughputStoreDeps): ThroughputStore { // note: deps.logger is accepted but not used anywhere in this function
+ const newId = deps.newId ?? (() => crypto.randomUUID()); // id suffix keeps keys unique even for identical timestamps
+
+ return {
+ async record(sample) {
+ // Per-sample key under the local-day bucket → write is a single set
+ // (no read-modify-write, so concurrent turns can't lose a sample).
+ const day = dayKeyOf(sample.ts);
+ const key = `${SAMPLE_PREFIX}:${day}:${sample.ts}:${newId()}`; // layout: sample:<YYYY-MM-DD>:<ts>:<id>
+ await deps.storage.set(key, JSON.stringify(sample));
+ },
+
+ async aggregate(query) {
+ const resolved = resolvePeriod(query.period, query.date);
+ if (!resolved.ok) throw new ThroughputQueryError(resolved.error); // malformed (period, date) surfaces as a typed error
+
+ const samples: ThroughputSample[] = [];
+ for (const day of resolved.dayKeys) {
+ const keys = await deps.storage.keys(`${SAMPLE_PREFIX}:${day}:`); // prefix scan of one day bucket; the trailing ":" ends the day segment
+ for (const k of keys) {
+ const raw = await deps.storage.get(k); // one sequential read per key
+ if (raw === null) continue; // listed key had no value by the time it was read
+ try {
+ samples.push(JSON.parse(raw) as ThroughputSample); // NOTE(review): the parsed value is cast, not validated — a well-formed JSON row with wrong field types would flow into the totals
+ } catch {
+ // Skip a malformed row rather than failing the whole query.
+ }
+ }
+ }
+
+ const models = aggregateSamples(samples, resolved.start, resolved.end); // samples are filtered again by ts against [start, end)
+ return {
+ period: query.period,
+ date: resolved.date,
+ start: resolved.start,
+ end: resolved.end,
+ models,
+ };
+ },
+ };
+}
diff --git a/packages/throughput-store/tsconfig.json b/packages/throughput-store/tsconfig.json
new file mode 100644
index 0000000..ff99a43
--- /dev/null
+++ b/packages/throughput-store/tsconfig.json
@@ -0,0 +1,6 @@
+{
+ "extends": "../../tsconfig.base.json",
+ "compilerOptions": { "rootDir": "src", "outDir": "dist", "composite": true },
+ "include": ["src/**/*.ts"],
+ "references": [{ "path": "../kernel" }]
+}
diff --git a/packages/transport-contract/src/index.ts b/packages/transport-contract/src/index.ts
index bc01ca6..5b59a2d 100644
--- a/packages/transport-contract/src/index.ts
+++ b/packages/transport-contract/src/index.ts
@@ -111,6 +111,49 @@ export interface ConversationMetricsResponse {
readonly turns: readonly TurnMetrics[];
}
+/** The aggregation window for `GET /metrics/throughput`. */
+export type ThroughputPeriod = "day" | "week" | "month";
+
+/**
+ * One model's throughput over a period. `tokensPerSecond` is the TOKEN-WEIGHTED
+ * average — `Σ(output tokens) / Σ(generation seconds)` across the period's
+ * turns — so larger turns count proportionally more than smaller ones.
+ * Generation time is the model's pure decode time (it excludes tool-execution
+ * waits).
+ */
+export interface ThroughputModelStat {
+ /** The model name in `<credentialName>/<model>` form (as selected). */
+ readonly model: string;
+ /** Token-weighted average tokens/second over the period. */
+ readonly tokensPerSecond: number;
+ /** Total output tokens generated across the period's turns. */
+ readonly totalOutputTokens: number;
+ /** Total pure generation time across the period's turns, in milliseconds. */
+ readonly totalGenMs: number;
+ /** Number of turns that contributed. */
+ readonly turns: number;
+}
+
+/**
+ * Response body for
+ * `GET /metrics/throughput?period=day|week|month&date=<...>`.
+ *
+ * `date` is `YYYY-MM-DD` for day/week (week = the ISO Mon–Sun week containing
+ * that date) and `YYYY-MM` for month. Boundaries are computed in the server's
+ * local timezone; `start`/`end` are the resolved half-open `[start, end)` range
+ * in epoch-ms. `models` lists every model active in the window, sorted by
+ * `tokensPerSecond` descending.
+ */
+export interface ThroughputResponse {
+ readonly period: ThroughputPeriod;
+ readonly date: string;
+ /** Inclusive start of the window, epoch-ms. */
+ readonly start: number;
+ /** Exclusive end of the window, epoch-ms. */
+ readonly end: number;
+ readonly models: readonly ThroughputModelStat[];
+}
+
// ─── WebSocket chat ops ───────────────────────────────────────────────────────
// The persistent WS connection multiplexes chat ops (below) with surface ops
// (`@dispatch/ui-contract`). The unified unions at the bottom compose both. Chat
diff --git a/packages/transport-http/package.json b/packages/transport-http/package.json
index 5b6846f..d439fe4 100644
--- a/packages/transport-http/package.json
+++ b/packages/transport-http/package.json
@@ -10,6 +10,7 @@
"@dispatch/credential-store": "workspace:*",
"@dispatch/kernel": "workspace:*",
"@dispatch/session-orchestrator": "workspace:*",
+ "@dispatch/throughput-store": "workspace:*",
"@dispatch/transport-contract": "workspace:*",
"hono": "^4.0.0"
}
diff --git a/packages/transport-http/src/app.test.ts b/packages/transport-http/src/app.test.ts
index 0a6c5b0..e634e19 100644
--- a/packages/transport-http/src/app.test.ts
+++ b/packages/transport-http/src/app.test.ts
@@ -1,8 +1,33 @@
-import type { AgentEvent, Logger, StepId, StoredChunk, TurnMetrics } from "@dispatch/kernel";
+import type {
+ AgentEvent,
+ Logger,
+ StepId,
+ StorageNamespace,
+ StoredChunk,
+ TurnMetrics,
+} from "@dispatch/kernel";
+import { createThroughputStore, dayKeyOf } from "@dispatch/throughput-store";
+import type { ThroughputResponse } from "@dispatch/transport-contract";
import { describe, expect, it } from "vitest";
import { createApp } from "./app.js";
import type { ConversationStore, CredentialStore, SessionOrchestrator } from "./seam.js";
+function createMemStorage(): StorageNamespace {
+ const map = new Map<string, string>(); // one private Map per call, so each test gets an empty namespace
+ return {
+ get: async (k) => map.get(k) ?? null, // a missing key reads as null
+ set: async (k, v) => {
+ map.set(k, v);
+ },
+ delete: async (k) => {
+ map.delete(k);
+ },
+ has: async (k) => map.has(k),
+ keys: async (prefix) =>
+ [...map.keys()].filter((k) => (prefix === undefined ? true : k.startsWith(prefix))), // no prefix → every key; otherwise a startsWith match
+ };
+}
+
interface CapturedLog {
readonly level: "debug" | "info" | "warn" | "error";
readonly msg: string;
@@ -739,3 +764,114 @@ describe("CORS", () => {
expect(res.headers.get("Access-Control-Allow-Headers")).toContain("Content-Type");
});
});
+
+describe("throughput recording + GET /metrics/throughput", () => {
+ const ts = new Date(2026, 5, 10, 12, 0, 0).getTime();
+ const day = dayKeyOf(ts);
+
+ function appWith(
+ throughputStore: ReturnType<typeof createThroughputStore>,
+ events: AgentEvent[],
+ ) {
+ return createApp({
+ conversationStore: createFakeConversationStore(),
+ orchestrator: createFakeOrchestrator(events),
+ credentialStore: createFakeCredentialStore([]),
+ throughputStore,
+ now: () => ts,
+ });
+ }
+
+ async function postChat(app: ReturnType<typeof createApp>, body: Record<string, unknown>) {
+ return app.request("/chat", {
+ method: "POST",
+ headers: { "Content-Type": "application/json" },
+ body: JSON.stringify(body),
+ });
+ }
+
+ it("records a per-model sample from a turn and aggregates it (token-weighted tok/s)", async () => {
+ const store = createThroughputStore({ storage: createMemStorage() });
+ const events: AgentEvent[] = [
+ {
+ type: "step-complete",
+ conversationId: "c1",
+ turnId: "t1",
+ stepId: "t1#0" as StepId,
+ genTotalMs: 2000,
+ },
+ {
+ type: "done",
+ conversationId: "c1",
+ turnId: "t1",
+ reason: "stop",
+ usage: { inputTokens: 10, outputTokens: 400 },
+ },
+ ];
+ const app = appWith(store, events);
+
+ const chat = await postChat(app, {
+ conversationId: "c1",
+ message: "hi",
+ model: "claude/haiku",
+ });
+ expect(chat.status).toBe(200);
+
+ const res = await app.request(`/metrics/throughput?period=day&date=${day}`);
+ expect(res.status).toBe(200);
+ const report = (await res.json()) as ThroughputResponse;
+ expect(report.period).toBe("day");
+ expect(report.models).toHaveLength(1);
+ expect(report.models[0]).toMatchObject({
+ model: "claude/haiku",
+ totalOutputTokens: 400,
+ totalGenMs: 2000,
+ tokensPerSecond: 200, // 400 tokens / 2s
+ turns: 1,
+ });
+ });
+
+ it("does not record a sample when no model is selected", async () => {
+ const store = createThroughputStore({ storage: createMemStorage() });
+ const events: AgentEvent[] = [
+ {
+ type: "step-complete",
+ conversationId: "c1",
+ turnId: "t1",
+ stepId: "t1#0" as StepId,
+ genTotalMs: 2000,
+ },
+ {
+ type: "done",
+ conversationId: "c1",
+ turnId: "t1",
+ reason: "stop",
+ usage: { inputTokens: 1, outputTokens: 5 },
+ },
+ ];
+ const app = appWith(store, events);
+
+ await postChat(app, { conversationId: "c1", message: "hi" }); // no model
+ const res = await app.request(`/metrics/throughput?period=day&date=${day}`);
+ const report = (await res.json()) as { models: unknown[] };
+ expect(report.models).toEqual([]);
+ });
+
+ it("returns 400 for an invalid period", async () => {
+ const app = appWith(createThroughputStore({ storage: createMemStorage() }), []);
+ const res = await app.request("/metrics/throughput?period=year&date=2026");
+ expect(res.status).toBe(400);
+ });
+
+ it("returns 400 for a malformed date", async () => {
+ const app = appWith(createThroughputStore({ storage: createMemStorage() }), []);
+ const res = await app.request("/metrics/throughput?period=day&date=nope");
+ expect(res.status).toBe(400);
+ });
+
+ it("returns 400 when date is missing", async () => {
+ const app = appWith(createThroughputStore({ storage: createMemStorage() }), []);
+ const res = await app.request("/metrics/throughput?period=day");
+ expect(res.status).toBe(400);
+ });
+});
diff --git a/packages/transport-http/src/app.ts b/packages/transport-http/src/app.ts
index 4002e23..3c9ae85 100644
--- a/packages/transport-http/src/app.ts
+++ b/packages/transport-http/src/app.ts
@@ -3,6 +3,7 @@ import type {
ConversationHistoryResponse,
ConversationMetricsResponse,
ModelsResponse,
+ ThroughputResponse,
} from "@dispatch/transport-contract";
import { Hono } from "hono";
import { cors } from "hono/cors";
@@ -13,14 +14,24 @@ import {
parseSinceSeq,
serializeEventLine,
} from "./logic.js";
-import type { ConversationStore, CredentialStore, SessionOrchestrator } from "./seam.js";
+import {
+ type ConversationStore,
+ type CredentialStore,
+ type SessionOrchestrator,
+ ThroughputQueryError,
+ type ThroughputStore,
+} from "./seam.js";
export interface CreateServerOptions {
readonly conversationStore: ConversationStore;
readonly orchestrator: SessionOrchestrator;
readonly credentialStore: CredentialStore;
+ /** Optional — defaults to a no-op store (recording disabled, empty reports). */
+ readonly throughputStore?: ThroughputStore;
readonly logger?: Logger;
readonly generateId?: () => string;
+ /** Injectable clock for sample timestamps (default Date.now). */
+ readonly now?: () => number;
}
const noopLogger: Logger = {
@@ -45,10 +56,44 @@ const noopLogger: Logger = {
},
};
+const noopThroughputStore: ThroughputStore = { // fallback used by createApp when opts.throughputStore is not provided
+ record: async () => {}, // discards every sample
+ aggregate: async (q) => ({ period: q.period, date: q.date, start: 0, end: 0, models: [] }), // echoes the query with zeroed bounds and no models
+};
+
export function createApp(opts: CreateServerOptions): Hono {
const app = new Hono();
const log = opts.logger ?? noopLogger;
const generateId = opts.generateId ?? (() => crypto.randomUUID());
+ const now = opts.now ?? (() => Date.now());
+ const throughputStore = opts.throughputStore ?? noopThroughputStore;
+
+ // Records one throughput sample for a finished turn. Generation time is the
+ // sum of every step-complete genTotalMs; output tokens come from the last
+ // done event that carries usage. Best-effort: a store failure is caught and
+ // logged as a warning instead of propagating to the caller.
+ async function recordThroughput(
+   turnEvents: readonly AgentEvent[],
+   model: string | undefined,
+ ): Promise<void> {
+   if (model === undefined) return; // no model selected → nothing to attribute
+   let genMs = 0;
+   let outputTokens: number | undefined; // stays undefined until a done event reports usage
+   for (const e of turnEvents) {
+     if (e.type === "step-complete" && e.genTotalMs !== undefined) genMs += e.genTotalMs;
+     if (e.type === "done" && e.usage !== undefined) outputTokens = e.usage.outputTokens;
+   }
+   if (genMs <= 0) return; // no generation time → can't compute tok/s
+   if (outputTokens === undefined) return; // usage never reported → token count is unknown, not zero
+   try {
+     await throughputStore.record({ model, ts: now(), outputTokens, genMs });
+     const tokensPerSecond = Math.round((outputTokens / (genMs / 1000)) * 100) / 100;
+     log.info("throughput: turn recorded", { model, outputTokens, genMs, tokensPerSecond });
+   } catch (err) {
+     const error = err instanceof Error ? err.message : String(err);
+     log.warn("throughput: failed to record sample", { error });
+   }
+ }
app.use(
"*",
@@ -174,6 +219,11 @@ export function createApp(opts: CreateServerOptions): Hono {
await streamReady;
await orchestratorPromise.catch(() => {});
+ // Record a per-model throughput sample for this turn. Generation time is
+ // the PURE decode time — the sum of per-step genTotalMs (excludes tool
+ // waits) — and tokens are the turn's aggregate output tokens.
+ await recordThroughput(events, model);
+
const ndjson = events.map(serializeEventLine).join("");
return c.text(ndjson, 200, {
@@ -182,5 +232,28 @@ export function createApp(opts: CreateServerOptions): Hono {
});
});
+ // Per-model tok/s report for ?period=day|week|month&date=<...>; 400 on a bad query, 502 on any other store failure.
+ app.get("/metrics/throughput", async (c) => {
+   const period = c.req.query("period");
+   const date = c.req.query("date");
+   if (period !== "day" && period !== "week" && period !== "month") {
+     return c.json({ error: "query param 'period' must be one of: day, week, month" }, 400);
+   }
+   if (date === undefined || date === "") {
+     return c.json({ error: "query param 'date' is required" }, 400);
+   }
+   try {
+     // Typed against the wire contract: if the store's report shape ever
+     // drifts from ThroughputResponse, this assignment fails to compile.
+     const body: ThroughputResponse = await throughputStore.aggregate({ period, date });
+     return c.json(body);
+   } catch (err) {
+     if (err instanceof ThroughputQueryError) return c.json({ error: err.message }, 400);
+     const error = err instanceof Error ? err.message : String(err); // log the message string, as the record-failure warning does
+     log.error("throughput: aggregate failed", { error });
+     return c.json({ error: "Failed to aggregate throughput" }, 502);
+   }
+ });
+
return app;
}
diff --git a/packages/transport-http/src/extension.ts b/packages/transport-http/src/extension.ts
index adbf87f..4abd7aa 100644
--- a/packages/transport-http/src/extension.ts
+++ b/packages/transport-http/src/extension.ts
@@ -4,6 +4,7 @@ import {
conversationStoreHandle,
credentialStoreHandle,
sessionOrchestratorHandle,
+ throughputStoreHandle,
} from "./seam.js";
export const manifest: Manifest = {
@@ -12,9 +13,11 @@ export const manifest: Manifest = {
version: "0.0.0",
apiVersion: "^0.1.0",
trust: "bundled",
- dependsOn: ["conversation-store", "credential-store", "session-orchestrator"],
+ dependsOn: ["conversation-store", "credential-store", "session-orchestrator", "throughput-store"],
capabilities: { network: true },
- contributes: { routes: ["/chat", "/conversations/:id", "/health", "/models"] },
+ contributes: {
+ routes: ["/chat", "/conversations/:id", "/health", "/models", "/metrics/throughput"],
+ },
activation: "eager",
};
@@ -32,12 +35,14 @@ export function createTransportHttpExtension(): Extension & {
const conversationStore = host.getService(conversationStoreHandle);
const orchestrator = host.getService(sessionOrchestratorHandle);
const credentialStore = host.getService(credentialStoreHandle);
+ const throughputStore = host.getService(throughputStoreHandle);
const logger = host.logger;
const app = createApp({
conversationStore,
orchestrator,
credentialStore,
+ throughputStore,
logger,
});
diff --git a/packages/transport-http/src/seam.ts b/packages/transport-http/src/seam.ts
index c7bfb74..7dbaa1b 100644
--- a/packages/transport-http/src/seam.ts
+++ b/packages/transport-http/src/seam.ts
@@ -4,3 +4,5 @@ export type { CredentialStore } from "@dispatch/credential-store";
export { credentialStoreHandle } from "@dispatch/credential-store";
export type { SessionOrchestrator } from "@dispatch/session-orchestrator";
export { sessionOrchestratorHandle } from "@dispatch/session-orchestrator";
+export type { ThroughputStore } from "@dispatch/throughput-store";
+export { ThroughputQueryError, throughputStoreHandle } from "@dispatch/throughput-store";
diff --git a/packages/transport-http/tsconfig.json b/packages/transport-http/tsconfig.json
index bcbf86a..fc29c8c 100644
--- a/packages/transport-http/tsconfig.json
+++ b/packages/transport-http/tsconfig.json
@@ -7,6 +7,7 @@
{ "path": "../credential-store" },
{ "path": "../kernel" },
{ "path": "../session-orchestrator" },
+ { "path": "../throughput-store" },
{ "path": "../transport-contract" }
]
}
diff --git a/tsconfig.json b/tsconfig.json
index 58ec820..68d373c 100644
--- a/tsconfig.json
+++ b/tsconfig.json
@@ -13,6 +13,7 @@
{ "path": "./packages/provider-openai-compat" },
{ "path": "./packages/credential-store" },
{ "path": "./packages/conversation-store" },
+ { "path": "./packages/throughput-store" },
{ "path": "./packages/session-orchestrator" },
{ "path": "./packages/transport-http" },
{ "path": "./packages/tool-read-file" },