diff options
| author | Adam Malczewski <[email protected]> | 2026-06-10 10:16:20 +0900 |
|---|---|---|
| committer | Adam Malczewski <[email protected]> | 2026-06-10 10:16:20 +0900 |
| commit | 80e14ab59732aabbf06035d13138500f133e921d (patch) | |
| tree | be3c1c118ceeb207d7ff218aa5d7ace06e683710 | |
| parent | 5ff460688519e48fd0bfab893ebaed4258dee789 (diff) | |
| download | dispatch-80e14ab59732aabbf06035d13138500f133e921d.tar.gz dispatch-80e14ab59732aabbf06035d13138500f133e921d.zip | |
feat: per-model throughput (tok/s) tracking + metrics endpoint
New throughput-store extension records one token-weighted sample per turn
(model, output tokens, pure generation time = Σ step genTotalMs) into a
day-bucketed KV store, and aggregates per-model tok/s = Σtokens / Σgen-seconds
over a day/week/month (server-local boundaries; week = ISO Mon–Sun).
transport-http records a sample per turn (logged) and serves
GET /metrics/throughput?period=day|week|month&date=<...>. The response is typed
as transport-contract's ThroughputResponse, so store/wire drift is a compile
error. Pure period + aggregate logic fully unit-tested.
23 files changed, 802 insertions, 6 deletions
@@ -54,6 +54,7 @@ "@dispatch/storage-sqlite": "workspace:*", "@dispatch/surface-loaded-extensions": "workspace:*", "@dispatch/surface-registry": "workspace:*", + "@dispatch/throughput-store": "workspace:*", "@dispatch/tool-read-file": "workspace:*", "@dispatch/transport-http": "workspace:*", "@dispatch/transport-ws": "workspace:*", @@ -122,6 +123,13 @@ "@dispatch/ui-contract": "workspace:*", }, }, + "packages/throughput-store": { + "name": "@dispatch/throughput-store", + "version": "0.0.0", + "dependencies": { + "@dispatch/kernel": "workspace:*", + }, + }, "packages/tool-read-file": { "name": "@dispatch/tool-read-file", "version": "0.0.0", @@ -142,7 +150,7 @@ }, "packages/transport-contract": { "name": "@dispatch/transport-contract", - "version": "0.1.0", + "version": "0.4.0", "dependencies": { "@dispatch/ui-contract": "workspace:*", "@dispatch/wire": "workspace:*", @@ -156,6 +164,7 @@ "@dispatch/credential-store": "workspace:*", "@dispatch/kernel": "workspace:*", "@dispatch/session-orchestrator": "workspace:*", + "@dispatch/throughput-store": "workspace:*", "@dispatch/transport-contract": "workspace:*", "hono": "^4.0.0", }, @@ -177,7 +186,7 @@ }, "packages/wire": { "name": "@dispatch/wire", - "version": "0.1.0", + "version": "0.4.0", }, }, "packages": { @@ -225,6 +234,8 @@ "@dispatch/surface-registry": ["@dispatch/surface-registry@workspace:packages/surface-registry"], + "@dispatch/throughput-store": ["@dispatch/throughput-store@workspace:packages/throughput-store"], + "@dispatch/tool-read-file": ["@dispatch/tool-read-file@workspace:packages/tool-read-file"], "@dispatch/trace-replay": ["@dispatch/trace-replay@workspace:packages/trace-replay"], diff --git a/packages/host-bin/package.json b/packages/host-bin/package.json index 65e986d..ccaddd2 100644 --- a/packages/host-bin/package.json +++ b/packages/host-bin/package.json @@ -11,6 +11,7 @@ "@dispatch/credential-store": "workspace:*", "@dispatch/provider-openai-compat": "workspace:*", "@dispatch/session-orchestrator": "workspace:*", + "@dispatch/throughput-store": "workspace:*", "@dispatch/transport-http": "workspace:*", "@dispatch/tool-read-file": "workspace:*", "@dispatch/journal-sink": "workspace:*", diff --git a/packages/host-bin/src/main.ts b/packages/host-bin/src/main.ts index ec51d11..7f219d1 100644 --- a/packages/host-bin/src/main.ts +++ b/packages/host-bin/src/main.ts @@ -23,6 +23,7 @@ import { extension as sessionOrchestratorExt } from "@dispatch/session-orchestra import { createSqliteStorage, extension as storageSqliteExt } from "@dispatch/storage-sqlite"; import { createLoadedExtensionsExtension } from "@dispatch/surface-loaded-extensions"; import { createSurfaceRegistryExtension } from "@dispatch/surface-registry"; +import { extension as throughputStoreExt } from "@dispatch/throughput-store"; import { extension as toolReadFileExt } from "@dispatch/tool-read-file"; import { createTransportHttpExtension } from "@dispatch/transport-http"; import { createTransportWsExtension } from "@dispatch/transport-ws"; @@ -62,6 +63,7 @@ const CORE_EXTENSIONS: readonly Extension[] = [ authApikeyExt, providerOpenaiCompatExt, toolReadFileExt, + throughputStoreExt, sessionOrchestratorExt, createTransportHttpExtension(), // Surface extensions — dependency order: surface-registry first, then consumers. diff --git a/packages/host-bin/tsconfig.json b/packages/host-bin/tsconfig.json index 3630394..53762c7 100644 --- a/packages/host-bin/tsconfig.json +++ b/packages/host-bin/tsconfig.json @@ -8,6 +8,7 @@ { "path": "../surface-loaded-extensions" }, { "path": "../surface-registry" }, { "path": "../tool-read-file" }, + { "path": "../throughput-store" }, { "path": "../transport-http" }, { "path": "../transport-ws" } ] diff --git a/packages/throughput-store/package.json b/packages/throughput-store/package.json new file mode 100644 index 0000000..9624c7c --- /dev/null +++ b/packages/throughput-store/package.json @@ -0,0 +1,11 @@ +{ + "name": "@dispatch/throughput-store", + "version": "0.0.0", + "type": "module", + "private": true, + "main": "dist/index.js", + "types": "dist/index.d.ts", + "dependencies": { + "@dispatch/kernel": "workspace:*" + } +} diff --git a/packages/throughput-store/src/aggregate.test.ts b/packages/throughput-store/src/aggregate.test.ts new file mode 100644 index 0000000..27b800c --- /dev/null +++ b/packages/throughput-store/src/aggregate.test.ts @@ -0,0 +1,50 @@ +import { describe, expect, it } from "vitest"; +import { aggregateSamples, type ThroughputSample } from "./aggregate.js"; + +const S = (model: string, ts: number, outputTokens: number, genMs: number): ThroughputSample => ({ + model, + ts, + outputTokens, + genMs, +}); + +describe("aggregateSamples", () => { + it("token-weights tok/s (Σtokens / Σgen-seconds), so large turns dominate", () => { + const samples = [ + S("claude/haiku", 100, 1000, 10_000), // 100 tok/s, big turn + S("claude/haiku", 200, 10, 1000), // 10 tok/s, small turn + ]; + const [row] = aggregateSamples(samples, 0, 1000); + expect(row?.model).toBe("claude/haiku"); + // 1010 tokens / 11 s = 91.82, NOT the simple mean (55) + expect(row?.tokensPerSecond).toBeCloseTo(91.82, 1); + expect(row?.totalOutputTokens).toBe(1010); + expect(row?.totalGenMs).toBe(11_000); + expect(row?.turns).toBe(2); + }); + + it("excludes samples outside the [start, end) range", () => { + const samples = [S("m", 50, 100, 1000), S("m", 500, 100, 1000), S("m", 1500, 999, 1000)]; + const [row] = aggregateSamples(samples, 100, 1000); + expect(row?.turns).toBe(1); // only ts=500 is in [100, 1000) + expect(row?.totalOutputTokens).toBe(100); + }); + + it("groups by model and sorts by tok/s descending", () => { + const samples = [ + S("slow", 10, 50, 5000), // 10 tok/s + S("fast", 10, 500, 1000), // 500 tok/s + ]; + const rows = aggregateSamples(samples, 0, 100); + expect(rows.map((r) => r.model)).toEqual(["fast", "slow"]); + }); + + it("reports 0 tok/s when generation time is 0 (avoids divide-by-zero)", () => { + const [row] = aggregateSamples([S("m", 10, 100, 0)], 0, 100); + expect(row?.tokensPerSecond).toBe(0); + }); + + it("returns an empty list when no samples match", () => { + expect(aggregateSamples([], 0, 100)).toEqual([]); + }); +}); diff --git a/packages/throughput-store/src/aggregate.ts b/packages/throughput-store/src/aggregate.ts new file mode 100644 index 0000000..2437d9f --- /dev/null +++ b/packages/throughput-store/src/aggregate.ts @@ -0,0 +1,72 @@ +/** + * Pure throughput aggregation. + * + * Per model, tokens-per-second is the TOKEN-WEIGHTED average: + * + * tok/s = Σ(output_tokens) / Σ(generation_seconds) + * + * i.e. total tokens over total generation time across the period's turns. This + * makes a turn that generated more tokens count proportionally more than a small + * turn — large turns dominate, exactly as intended. Generation time is the pure + * decode time (excludes tool-execution waits). + */ + +export interface ThroughputSample { + readonly model: string; + /** Epoch-ms the turn completed. */ + readonly ts: number; + /** Output tokens generated in the turn. */ + readonly outputTokens: number; + /** Pure generation time for the turn (ms), summed across its steps. */ + readonly genMs: number; +} + +export interface ModelThroughput { + readonly model: string; + /** Token-weighted average tokens/second over the period. */ + readonly tokensPerSecond: number; + readonly totalOutputTokens: number; + readonly totalGenMs: number; + /** Number of turns that contributed. */ + readonly turns: number; +} + +/** + * Aggregate samples within the half-open range `[start, end)` into per-model + * throughput, sorted by tok/s descending (ties broken by model name). + */ +export function aggregateSamples( + samples: readonly ThroughputSample[], + start: number, + end: number, +): ModelThroughput[] { + const byModel = new Map<string, { tokens: number; genMs: number; turns: number }>(); + + for (const s of samples) { + if (s.ts < start || s.ts >= end) continue; + const acc = byModel.get(s.model) ?? { tokens: 0, genMs: 0, turns: 0 }; + acc.tokens += s.outputTokens; + acc.genMs += s.genMs; + acc.turns += 1; + byModel.set(s.model, acc); + } + + const result: ModelThroughput[] = []; + for (const [model, acc] of byModel) { + const tokensPerSecond = acc.genMs > 0 ? round2(acc.tokens / (acc.genMs / 1000)) : 0; + result.push({ + model, + tokensPerSecond, + totalOutputTokens: acc.tokens, + totalGenMs: acc.genMs, + turns: acc.turns, + }); + } + + result.sort((a, b) => b.tokensPerSecond - a.tokensPerSecond || a.model.localeCompare(b.model)); + return result; +} + +function round2(n: number): number { + return Math.round(n * 100) / 100; +} diff --git a/packages/throughput-store/src/extension.ts b/packages/throughput-store/src/extension.ts new file mode 100644 index 0000000..01d1549 --- /dev/null +++ b/packages/throughput-store/src/extension.ts @@ -0,0 +1,24 @@ +import type { Extension, HostAPI, Manifest } from "@dispatch/kernel"; +import { throughputStoreHandle } from "./service.js"; +import { createThroughputStore } from "./store.js"; + +export const manifest: Manifest = { + id: "throughput-store", + name: "Throughput Store", + version: "0.0.0", + apiVersion: "^0.1.0", + trust: "bundled", + capabilities: { db: true }, + contributes: { services: ["throughput-store/store"] }, + activation: "eager", +}; + +export const extension: Extension = { + manifest, + activate: (host: HostAPI) => { + const storage = host.storage("throughput-store"); + const store = createThroughputStore({ storage, logger: host.logger }); + host.provideService(throughputStoreHandle, store); + host.logger.info("throughput-store: registered"); + }, +}; diff --git a/packages/throughput-store/src/index.ts b/packages/throughput-store/src/index.ts new file mode 100644 index 0000000..24ebaba --- /dev/null +++ b/packages/throughput-store/src/index.ts @@ -0,0 +1,16 @@ +export { + aggregateSamples, + type ModelThroughput, + type ThroughputSample, +} from "./aggregate.js"; +export { extension, manifest } from "./extension.js"; +export { dayKeyOf, type Period, resolvePeriod } from "./period.js"; +export { throughputStoreHandle } from "./service.js"; +export { + createThroughputStore, + type ThroughputQuery, + ThroughputQueryError, + type ThroughputReport, + type ThroughputStore, + type ThroughputStoreDeps, +} from "./store.js"; diff --git a/packages/throughput-store/src/period.test.ts b/packages/throughput-store/src/period.test.ts new file mode 100644 index 0000000..b39437c --- /dev/null +++ b/packages/throughput-store/src/period.test.ts @@ -0,0 +1,67 @@ +import { describe, expect, it } from "vitest"; +import { dayKeyOf, resolvePeriod } from "./period.js"; + +describe("dayKeyOf", () => { + it("formats a local YYYY-MM-DD key", () => { + // Build a local-midnight timestamp so the key is timezone-stable. + const ts = new Date(2026, 5, 10).getTime(); + expect(dayKeyOf(ts)).toBe("2026-06-10"); + }); +}); + +describe("resolvePeriod day", () => { + it("spans a single local day", () => { + const r = resolvePeriod("day", "2026-06-10"); + expect(r.ok).toBe(true); + if (!r.ok) return; + expect(r.dayKeys).toEqual(["2026-06-10"]); + expect(r.start).toBe(new Date(2026, 5, 10).getTime()); + expect(r.end).toBe(new Date(2026, 5, 11).getTime()); + }); + + it("rejects malformed / impossible dates", () => { + expect(resolvePeriod("day", "2026-13-01").ok).toBe(false); + expect(resolvePeriod("day", "2026-02-30").ok).toBe(false); + expect(resolvePeriod("day", "nope").ok).toBe(false); + expect(resolvePeriod("day", "2026-06").ok).toBe(false); + }); +}); + +describe("resolvePeriod week", () => { + it("spans the Monday–Sunday ISO week containing the date", () => { + const r = resolvePeriod("week", "2026-06-10"); + expect(r.ok).toBe(true); + if (!r.ok) return; + expect(r.dayKeys).toHaveLength(7); + // start is a Monday (local) + expect(new Date(r.start).getDay()).toBe(1); + // the queried date falls within the week + expect(r.dayKeys).toContain("2026-06-10"); + // end is exactly 7 local days after start + expect(new Date(r.end).getDay()).toBe(1); + }); +}); + +describe("resolvePeriod month", () => { + it("spans a full calendar month", () => { + const r = resolvePeriod("month", "2026-06"); + expect(r.ok).toBe(true); + if (!r.ok) return; + expect(r.dayKeys).toHaveLength(30); // June has 30 days + expect(r.dayKeys[0]).toBe("2026-06-01"); + expect(r.dayKeys[29]).toBe("2026-06-30"); + expect(r.start).toBe(new Date(2026, 5, 1).getTime()); + expect(r.end).toBe(new Date(2026, 6, 1).getTime()); + }); + + it("handles February length", () => { + const r = resolvePeriod("month", "2026-02"); + expect(r.ok).toBe(true); + if (!r.ok) return; + expect(r.dayKeys).toHaveLength(28); + }); + + it("rejects a YYYY-MM-DD date for month", () => { + expect(resolvePeriod("month", "2026-06-10").ok).toBe(false); + }); +}); diff --git a/packages/throughput-store/src/period.ts b/packages/throughput-store/src/period.ts new file mode 100644 index 0000000..d8225f8 --- /dev/null +++ b/packages/throughput-store/src/period.ts @@ -0,0 +1,113 @@ +/** + * Pure period math for throughput aggregation. + * + * All boundaries are computed in the SERVER'S LOCAL timezone (a single-operator + * self-hosted assumption): + * - day: `YYYY-MM-DD` → [local midnight, next local midnight) + * - week: `YYYY-MM-DD` → the ISO week (Mon–Sun) CONTAINING that date + * - month: `YYYY-MM` → [first of month, first of next month) + * + * A resolved period yields the half-open `[start, end)` epoch-ms range plus the + * list of local `YYYY-MM-DD` day keys it spans (used to address the per-day + * sample buckets in storage). + */ + +export type Period = "day" | "week" | "month"; + +export interface ResolvedPeriod { + readonly ok: true; + /** Inclusive start, epoch-ms (local midnight). */ + readonly start: number; + /** Exclusive end, epoch-ms (local midnight). */ + readonly end: number; + /** Local `YYYY-MM-DD` day keys this period spans, in order. */ + readonly dayKeys: readonly string[]; + /** The normalized input date string. */ + readonly date: string; +} + +export interface PeriodError { + readonly ok: false; + readonly error: string; +} + +function pad2(n: number): string { + return n < 10 ? `0${n}` : String(n); +} + +/** Local `YYYY-MM-DD` key for an epoch-ms timestamp. */ +export function dayKeyOf(ts: number): string { + const d = new Date(ts); + return `${d.getFullYear()}-${pad2(d.getMonth() + 1)}-${pad2(d.getDate())}`; +} + +/** Local `YYYY-MM-DD` key for a local calendar date. */ +function dayKey(year: number, monthIndex: number, day: number): string { + const d = new Date(year, monthIndex, day); + return `${d.getFullYear()}-${pad2(d.getMonth() + 1)}-${pad2(d.getDate())}`; +} + +function parseYmd(date: string): { y: number; m: number; d: number } | null { + if (!/^\d{4}-\d{2}-\d{2}$/.test(date)) return null; + const [y, m, d] = date.split("-").map(Number) as [number, number, number]; + if (m < 1 || m > 12 || d < 1 || d > 31) return null; + // Reject impossible dates (e.g. 2026-02-30) by round-tripping through Date. + const probe = new Date(y, m - 1, d); + if (probe.getFullYear() !== y || probe.getMonth() !== m - 1 || probe.getDate() !== d) { + return null; + } + return { y, m, d }; +} + +function parseYm(date: string): { y: number; m: number } | null { + if (!/^\d{4}-\d{2}$/.test(date)) return null; + const [y, m] = date.split("-").map(Number) as [number, number]; + if (m < 1 || m > 12) return null; + return { y, m }; +} + +/** + * Resolve a `(period, date)` pair into a concrete `[start, end)` range + the day + * keys it covers. Returns a `PeriodError` for malformed input. + */ +export function resolvePeriod(period: Period, date: string): ResolvedPeriod | PeriodError { + if (period === "day") { + const p = parseYmd(date); + if (!p) return { ok: false, error: `invalid day date "${date}" (expected YYYY-MM-DD)` }; + const start = new Date(p.y, p.m - 1, p.d).getTime(); + const end = new Date(p.y, p.m - 1, p.d + 1).getTime(); + return { ok: true, start, end, dayKeys: [dayKey(p.y, p.m - 1, p.d)], date }; + } + + if (period === "week") { + const p = parseYmd(date); + if (!p) return { ok: false, error: `invalid week date "${date}" (expected YYYY-MM-DD)` }; + // ISO week: Monday-based. JS getDay() is 0=Sun..6=Sat. + const base = new Date(p.y, p.m - 1, p.d); + const offset = (base.getDay() + 6) % 7; // days since Monday + const monStart = new Date(p.y, p.m - 1, p.d - offset); + const start = monStart.getTime(); + const end = new Date( + monStart.getFullYear(), + monStart.getMonth(), + monStart.getDate() + 7, + ).getTime(); + const dayKeys: string[] = []; + for (let i = 0; i < 7; i++) { + dayKeys.push(dayKey(monStart.getFullYear(), monStart.getMonth(), monStart.getDate() + i)); + } + return { ok: true, start, end, dayKeys, date }; + } + + // month + const p = parseYm(date); + if (!p) return { ok: false, error: `invalid month date "${date}" (expected YYYY-MM)` }; + const start = new Date(p.y, p.m - 1, 1).getTime(); + const end = new Date(p.y, p.m, 1).getTime(); + const lastDay = new Date(p.y, p.m, 0).getDate(); + const dayKeys: string[] = []; + for (let d = 1; d <= lastDay; d++) { + dayKeys.push(dayKey(p.y, p.m - 1, d)); + } + return { ok: true, start, end, dayKeys, date }; +} diff --git a/packages/throughput-store/src/service.ts b/packages/throughput-store/src/service.ts new file mode 100644 index 0000000..d3a2184 --- /dev/null +++ b/packages/throughput-store/src/service.ts @@ -0,0 +1,5 @@ +import { defineService } from "@dispatch/kernel"; +import type { ThroughputStore } from "./store.js"; + +/** Typed service handle for the throughput store (record + aggregate). */ +export const throughputStoreHandle = defineService<ThroughputStore>("throughput-store/store"); diff --git a/packages/throughput-store/src/store.test.ts b/packages/throughput-store/src/store.test.ts new file mode 100644 index 0000000..e81201a --- /dev/null +++ b/packages/throughput-store/src/store.test.ts @@ -0,0 +1,72 @@ +import type { StorageNamespace } from "@dispatch/kernel"; +import { describe, expect, it } from "vitest"; +import { dayKeyOf } from "./period.js"; +import { createThroughputStore, ThroughputQueryError } from "./store.js"; + +function memStorage(): StorageNamespace { + const map = new Map<string, string>(); + return { + get: async (k) => map.get(k) ?? null, + set: async (k, v) => { + map.set(k, v); + }, + delete: async (k) => { + map.delete(k); + }, + has: async (k) => map.has(k), + keys: async (prefix) => + [...map.keys()].filter((k) => (prefix === undefined ? true : k.startsWith(prefix))), + }; +} + +let id = 0; +const store = () => createThroughputStore({ storage: memStorage(), newId: () => `id${id++}` }); + +describe("ThroughputStore", () => { + it("records a sample and aggregates it for that day", async () => { + const s = store(); + const ts = new Date(2026, 5, 10, 12, 0, 0).getTime(); + await s.record({ model: "claude/haiku", ts, outputTokens: 300, genMs: 1500 }); + + const report = await s.aggregate({ period: "day", date: dayKeyOf(ts) }); + expect(report.models).toHaveLength(1); + expect(report.models[0]).toMatchObject({ + model: "claude/haiku", + totalOutputTokens: 300, + totalGenMs: 1500, + turns: 1, + tokensPerSecond: 200, // 300 / 1.5s + }); + }); + + it("does not lose concurrent samples (single-set writes)", async () => { + const s = store(); + const ts = new Date(2026, 5, 10, 9, 0, 0).getTime(); + await Promise.all([ + s.record({ model: "m", ts, outputTokens: 100, genMs: 1000 }), + s.record({ model: "m", ts, outputTokens: 100, genMs: 1000 }), + s.record({ model: "m", ts, outputTokens: 100, genMs: 1000 }), + ]); + const report = await s.aggregate({ period: "day", date: dayKeyOf(ts) }); + expect(report.models[0]?.turns).toBe(3); + expect(report.models[0]?.totalOutputTokens).toBe(300); + }); + + it("aggregates multiple days within a week", async () => { + const s = store(); + const mon = new Date(2026, 5, 8, 10, 0, 0).getTime(); // Mon 2026-06-08 + const wed = new Date(2026, 5, 10, 10, 0, 0).getTime(); + await s.record({ model: "m", ts: mon, outputTokens: 100, genMs: 1000 }); + await s.record({ model: "m", ts: wed, outputTokens: 200, genMs: 1000 }); + + const report = await s.aggregate({ period: "week", date: dayKeyOf(wed) }); + expect(report.models[0]?.turns).toBe(2); + expect(report.models[0]?.totalOutputTokens).toBe(300); + }); + + it("throws ThroughputQueryError on a malformed date", async () => { + await expect(store().aggregate({ period: "day", date: "garbage" })).rejects.toBeInstanceOf( + ThroughputQueryError, + ); + }); +}); diff --git a/packages/throughput-store/src/store.ts b/packages/throughput-store/src/store.ts new file mode 100644 index 0000000..94675b1 --- /dev/null +++ b/packages/throughput-store/src/store.ts @@ -0,0 +1,83 @@ +import type { Logger, StorageNamespace } from "@dispatch/kernel"; +import { aggregateSamples, type ModelThroughput, type ThroughputSample } from "./aggregate.js"; +import { dayKeyOf, type Period, resolvePeriod } from "./period.js"; + +export interface ThroughputReport { + readonly period: Period; + readonly date: string; + /** Inclusive start, epoch-ms. */ + readonly start: number; + /** Exclusive end, epoch-ms. */ + readonly end: number; + readonly models: readonly ModelThroughput[]; +} + +export interface ThroughputQuery { + readonly period: Period; + readonly date: string; +} + +/** + * Cross-conversation, per-model throughput store. Records one sample per + * completed turn and aggregates token-weighted tok/s over a day/week/month. + * Persistence is the injected KV namespace; samples are bucketed by local day + * key so a period query addresses only its own day buckets. + */ +export interface ThroughputStore { + readonly record: (sample: ThroughputSample) => Promise<void>; + readonly aggregate: (query: ThroughputQuery) => Promise<ThroughputReport>; +} + +export interface ThroughputStoreDeps { + readonly storage: StorageNamespace; + readonly logger?: Logger; + /** Injectable unique-id generator (default crypto.randomUUID). */ + readonly newId?: () => string; +} + +/** Thrown when a query's `(period, date)` is malformed. */ +export class ThroughputQueryError extends Error {} + +const SAMPLE_PREFIX = "sample"; + +export function createThroughputStore(deps: ThroughputStoreDeps): ThroughputStore { + const newId = deps.newId ?? (() => crypto.randomUUID()); + + return { + async record(sample) { + // Per-sample key under the local-day bucket → write is a single set + // (no read-modify-write, so concurrent turns can't lose a sample). + const day = dayKeyOf(sample.ts); + const key = `${SAMPLE_PREFIX}:${day}:${sample.ts}:${newId()}`; + await deps.storage.set(key, JSON.stringify(sample)); + }, + + async aggregate(query) { + const resolved = resolvePeriod(query.period, query.date); + if (!resolved.ok) throw new ThroughputQueryError(resolved.error); + + const samples: ThroughputSample[] = []; + for (const day of resolved.dayKeys) { + const keys = await deps.storage.keys(`${SAMPLE_PREFIX}:${day}:`); + for (const k of keys) { + const raw = await deps.storage.get(k); + if (raw === null) continue; + try { + samples.push(JSON.parse(raw) as ThroughputSample); + } catch { + // Skip a malformed row rather than failing the whole query. + } + } + } + + const models = aggregateSamples(samples, resolved.start, resolved.end); + return { + period: query.period, + date: resolved.date, + start: resolved.start, + end: resolved.end, + models, + }; + }, + }; +} diff --git a/packages/throughput-store/tsconfig.json b/packages/throughput-store/tsconfig.json new file mode 100644 index 0000000..ff99a43 --- /dev/null +++ b/packages/throughput-store/tsconfig.json @@ -0,0 +1,6 @@ +{ + "extends": "../../tsconfig.base.json", + "compilerOptions": { "rootDir": "src", "outDir": "dist", "composite": true }, + "include": ["src/**/*.ts"], + "references": [{ "path": "../kernel" }] +} diff --git a/packages/transport-contract/src/index.ts b/packages/transport-contract/src/index.ts index bc01ca6..5b59a2d 100644 --- a/packages/transport-contract/src/index.ts +++ b/packages/transport-contract/src/index.ts @@ -111,6 +111,49 @@ export interface ConversationMetricsResponse { readonly turns: readonly TurnMetrics[]; } +/** The aggregation window for `GET /metrics/throughput`. */ +export type ThroughputPeriod = "day" | "week" | "month"; + +/** + * One model's throughput over a period. `tokensPerSecond` is the TOKEN-WEIGHTED + * average — `Σ(output tokens) / Σ(generation seconds)` across the period's + * turns — so larger turns count proportionally more than smaller ones. + * Generation time is the model's pure decode time (it excludes tool-execution + * waits). + */ +export interface ThroughputModelStat { + /** The model name in `<credentialName>/<model>` form (as selected). */ + readonly model: string; + /** Token-weighted average tokens/second over the period. */ + readonly tokensPerSecond: number; + /** Total output tokens generated across the period's turns. */ + readonly totalOutputTokens: number; + /** Total pure generation time across the period's turns, in milliseconds. */ + readonly totalGenMs: number; + /** Number of turns that contributed. */ + readonly turns: number; +} + +/** + * Response body for + * `GET /metrics/throughput?period=day|week|month&date=<...>`. + * + * `date` is `YYYY-MM-DD` for day/week (week = the ISO Mon–Sun week containing + * that date) and `YYYY-MM` for month. Boundaries are computed in the server's + * local timezone; `start`/`end` are the resolved half-open `[start, end)` range + * in epoch-ms. `models` lists every model active in the window, sorted by + * `tokensPerSecond` descending. + */ +export interface ThroughputResponse { + readonly period: ThroughputPeriod; + readonly date: string; + /** Inclusive start of the window, epoch-ms. */ + readonly start: number; + /** Exclusive end of the window, epoch-ms. */ + readonly end: number; + readonly models: readonly ThroughputModelStat[]; +} + // ─── WebSocket chat ops ─────────────────────────────────────────────────────── // The persistent WS connection multiplexes chat ops (below) with surface ops // (`@dispatch/ui-contract`). The unified unions at the bottom compose both. Chat diff --git a/packages/transport-http/package.json b/packages/transport-http/package.json index 5b6846f..d439fe4 100644 --- a/packages/transport-http/package.json +++ b/packages/transport-http/package.json @@ -10,6 +10,7 @@ "@dispatch/credential-store": "workspace:*", "@dispatch/kernel": "workspace:*", "@dispatch/session-orchestrator": "workspace:*", + "@dispatch/throughput-store": "workspace:*", "@dispatch/transport-contract": "workspace:*", "hono": "^4.0.0" } diff --git a/packages/transport-http/src/app.test.ts b/packages/transport-http/src/app.test.ts index 0a6c5b0..e634e19 100644 --- a/packages/transport-http/src/app.test.ts +++ b/packages/transport-http/src/app.test.ts @@ -1,8 +1,33 @@ -import type { AgentEvent, Logger, StepId, StoredChunk, TurnMetrics } from "@dispatch/kernel"; +import type { + AgentEvent, + Logger, + StepId, + StorageNamespace, + StoredChunk, + TurnMetrics, +} from "@dispatch/kernel"; +import { createThroughputStore, dayKeyOf } from "@dispatch/throughput-store"; +import type { ThroughputResponse } from "@dispatch/transport-contract"; import { describe, expect, it } from "vitest"; import { createApp } from "./app.js"; import type { ConversationStore, CredentialStore, SessionOrchestrator } from "./seam.js"; +function createMemStorage(): StorageNamespace { + const map = new Map<string, string>(); + return { + get: async (k) => map.get(k) ?? null, + set: async (k, v) => { + map.set(k, v); + }, + delete: async (k) => { + map.delete(k); + }, + has: async (k) => map.has(k), + keys: async (prefix) => + [...map.keys()].filter((k) => (prefix === undefined ? true : k.startsWith(prefix))), + }; +} + interface CapturedLog { readonly level: "debug" | "info" | "warn" | "error"; readonly msg: string; @@ -739,3 +764,114 @@ describe("CORS", () => { expect(res.headers.get("Access-Control-Allow-Headers")).toContain("Content-Type"); }); }); + +describe("throughput recording + GET /metrics/throughput", () => { + const ts = new Date(2026, 5, 10, 12, 0, 0).getTime(); + const day = dayKeyOf(ts); + + function appWith( + throughputStore: ReturnType<typeof createThroughputStore>, + events: AgentEvent[], + ) { + return createApp({ + conversationStore: createFakeConversationStore(), + orchestrator: createFakeOrchestrator(events), + credentialStore: createFakeCredentialStore([]), + throughputStore, + now: () => ts, + }); + } + + async function postChat(app: ReturnType<typeof createApp>, body: Record<string, unknown>) { + return app.request("/chat", { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify(body), + }); + } + + it("records a per-model sample from a turn and aggregates it (token-weighted tok/s)", async () => { + const store = createThroughputStore({ storage: createMemStorage() }); + const events: AgentEvent[] = [ + { + type: "step-complete", + conversationId: "c1", + turnId: "t1", + stepId: "t1#0" as StepId, + genTotalMs: 2000, + }, + { + type: "done", + conversationId: "c1", + turnId: "t1", + reason: "stop", + usage: { inputTokens: 10, outputTokens: 400 }, + }, + ]; + const app = appWith(store, events); + + const chat = await postChat(app, { + conversationId: "c1", + message: "hi", + model: "claude/haiku", + }); + expect(chat.status).toBe(200); + + const res = await app.request(`/metrics/throughput?period=day&date=${day}`); + expect(res.status).toBe(200); + const report = (await res.json()) as ThroughputResponse; + expect(report.period).toBe("day"); + expect(report.models).toHaveLength(1); + expect(report.models[0]).toMatchObject({ + model: "claude/haiku", + totalOutputTokens: 400, + totalGenMs: 2000, + tokensPerSecond: 200, // 400 tokens / 2s + turns: 1, + }); + }); + + it("does not record a sample when no model is selected", async () => { + const store = createThroughputStore({ storage: createMemStorage() }); + const events: AgentEvent[] = [ + { + type: "step-complete", + conversationId: "c1", + turnId: "t1", + stepId: "t1#0" as StepId, + genTotalMs: 2000, + }, + { + type: "done", + conversationId: "c1", + turnId: "t1", + reason: "stop", + usage: { inputTokens: 1, outputTokens: 5 }, + }, + ]; + const app = appWith(store, events); + + await postChat(app, { conversationId: "c1", message: "hi" }); // no model + const res = await app.request(`/metrics/throughput?period=day&date=${day}`); + const report = (await res.json()) as { models: unknown[] }; + expect(report.models).toEqual([]); + }); + + it("returns 400 for an invalid period", async () => { + const app = appWith(createThroughputStore({ storage: createMemStorage() }), []); + const res = await app.request("/metrics/throughput?period=year&date=2026"); + expect(res.status).toBe(400); + }); + + it("returns 400 for a malformed date", async () => { + const app = appWith(createThroughputStore({ storage: createMemStorage() }), []); + const res = await app.request("/metrics/throughput?period=day&date=nope"); + expect(res.status).toBe(400); + }); + + it("returns 400 when date is missing", async () => { + const app = appWith(createThroughputStore({ storage: createMemStorage() }), []); + const res = await app.request("/metrics/throughput?period=day"); + expect(res.status).toBe(400); + }); +}); diff --git a/packages/transport-http/src/app.ts b/packages/transport-http/src/app.ts index 4002e23..3c9ae85 100644 --- a/packages/transport-http/src/app.ts +++ b/packages/transport-http/src/app.ts @@ -3,6 +3,7 @@ import type { ConversationHistoryResponse, ConversationMetricsResponse, ModelsResponse, + ThroughputResponse, } from "@dispatch/transport-contract"; import { Hono } from "hono"; import { cors } from "hono/cors"; @@ -13,14 +14,24 @@ import { parseSinceSeq, serializeEventLine, } from "./logic.js"; -import type { ConversationStore, CredentialStore, SessionOrchestrator } from "./seam.js"; +import { + type ConversationStore, + type CredentialStore, + type SessionOrchestrator, + ThroughputQueryError, + type ThroughputStore, +} from "./seam.js"; export interface CreateServerOptions { readonly conversationStore: ConversationStore; readonly orchestrator: SessionOrchestrator; readonly credentialStore: CredentialStore; + /** Optional — defaults to a no-op store (recording disabled, empty reports). */ + readonly throughputStore?: ThroughputStore; readonly logger?: Logger; readonly generateId?: () => string; + /** Injectable clock for sample timestamps (default Date.now). */ + readonly now?: () => number; } const noopLogger: Logger = { @@ -45,10 +56,44 @@ const noopLogger: Logger = { }, }; +const noopThroughputStore: ThroughputStore = { + record: async () => {}, + aggregate: async (q) => ({ period: q.period, date: q.date, start: 0, end: 0, models: [] }), +}; + export function createApp(opts: CreateServerOptions): Hono { const app = new Hono(); const log = opts.logger ?? noopLogger; const generateId = opts.generateId ?? (() => crypto.randomUUID()); + const now = opts.now ?? (() => Date.now()); + const throughputStore = opts.throughputStore ?? noopThroughputStore; + + async function recordThroughput( + turnEvents: readonly AgentEvent[], + model: string | undefined, + ): Promise<void> { + if (model === undefined) return; // no model selected → nothing to attribute + let genMs = 0; + let outputTokens = 0; + for (const e of turnEvents) { + if (e.type === "step-complete" && e.genTotalMs !== undefined) genMs += e.genTotalMs; + if (e.type === "done" && e.usage !== undefined) outputTokens = e.usage.outputTokens; + } + if (genMs <= 0) return; // no generation time → can't compute tok/s + try { + await throughputStore.record({ model, ts: now(), outputTokens, genMs }); + log.info("throughput: turn recorded", { + model, + outputTokens, + genMs, + tokensPerSecond: Math.round((outputTokens / (genMs / 1000)) * 100) / 100, + }); + } catch (err) { + log.warn("throughput: failed to record sample", { + error: err instanceof Error ? err.message : String(err), + }); + } + } app.use( "*", @@ -174,6 +219,11 @@ export function createApp(opts: CreateServerOptions): Hono { await streamReady; await orchestratorPromise.catch(() => {}); + // Record a per-model throughput sample for this turn. Generation time is + // the PURE decode time — the sum of per-step genTotalMs (excludes tool + // waits) — and tokens are the turn's aggregate output tokens. + await recordThroughput(events, model); + const ndjson = events.map(serializeEventLine).join(""); return c.text(ndjson, 200, { @@ -182,5 +232,28 @@ export function createApp(opts: CreateServerOptions): Hono { }); }); + app.get("/metrics/throughput", async (c) => { + const period = c.req.query("period"); + const date = c.req.query("date"); + if (period !== "day" && period !== "week" && period !== "month") { + return c.json({ error: "query param 'period' must be one of: day, week, month" }, 400); + } + if (date === undefined || date === "") { + return c.json({ error: "query param 'date' is required" }, 400); + } + try { + // Typed against the wire contract: if the store's report shape ever + // drifts from ThroughputResponse, this assignment fails to compile. + const body: ThroughputResponse = await throughputStore.aggregate({ period, date }); + return c.json(body); + } catch (err) { + if (err instanceof ThroughputQueryError) { + return c.json({ error: err.message }, 400); + } + log.error("throughput: aggregate failed", { err }); + return c.json({ error: "Failed to aggregate throughput" }, 502); + } + }); + return app; } diff --git a/packages/transport-http/src/extension.ts b/packages/transport-http/src/extension.ts index adbf87f..4abd7aa 100644 --- a/packages/transport-http/src/extension.ts +++ b/packages/transport-http/src/extension.ts @@ -4,6 +4,7 @@ import { conversationStoreHandle, credentialStoreHandle, sessionOrchestratorHandle, + throughputStoreHandle, } from "./seam.js"; export const manifest: Manifest = { @@ -12,9 +13,11 @@ export const manifest: Manifest = { version: "0.0.0", apiVersion: "^0.1.0", trust: "bundled", - dependsOn: ["conversation-store", "credential-store", "session-orchestrator"], + dependsOn: ["conversation-store", "credential-store", "session-orchestrator", "throughput-store"], capabilities: { network: true }, - contributes: { routes: ["/chat", "/conversations/:id", "/health", "/models"] }, + contributes: { + routes: ["/chat", "/conversations/:id", "/health", "/models", "/metrics/throughput"], + }, activation: "eager", }; @@ -32,12 +35,14 @@ export function createTransportHttpExtension(): Extension & { const conversationStore = host.getService(conversationStoreHandle); const orchestrator = host.getService(sessionOrchestratorHandle); const credentialStore = host.getService(credentialStoreHandle); + const throughputStore = host.getService(throughputStoreHandle); const logger = host.logger; const app = createApp({ conversationStore, orchestrator, credentialStore, + throughputStore, logger, }); diff --git a/packages/transport-http/src/seam.ts b/packages/transport-http/src/seam.ts index c7bfb74..7dbaa1b 100644 --- a/packages/transport-http/src/seam.ts +++ b/packages/transport-http/src/seam.ts @@ -4,3 +4,5 @@ export type { CredentialStore } from "@dispatch/credential-store"; export { credentialStoreHandle } from "@dispatch/credential-store"; export type { SessionOrchestrator } from "@dispatch/session-orchestrator"; export { sessionOrchestratorHandle } from "@dispatch/session-orchestrator"; +export type { ThroughputStore } from "@dispatch/throughput-store"; +export { ThroughputQueryError, throughputStoreHandle } from "@dispatch/throughput-store"; diff --git a/packages/transport-http/tsconfig.json b/packages/transport-http/tsconfig.json index bcbf86a..fc29c8c 100644 --- a/packages/transport-http/tsconfig.json +++ b/packages/transport-http/tsconfig.json @@ -7,6 +7,7 @@ { "path": "../credential-store" }, { "path": "../kernel" }, { "path": "../session-orchestrator" }, + { "path": "../throughput-store" }, { "path": "../transport-contract" } ] } diff --git a/tsconfig.json b/tsconfig.json index 58ec820..68d373c 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -13,6 +13,7 @@ { "path": "./packages/provider-openai-compat" }, { "path": "./packages/credential-store" }, { "path": "./packages/conversation-store" }, + { "path": "./packages/throughput-store" }, { "path": "./packages/session-orchestrator" }, { "path": "./packages/transport-http" }, { "path": "./packages/tool-read-file" }, |
