1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
|
import type { Logger, StorageNamespace } from "@dispatch/kernel";
import { aggregateSamples, type ModelThroughput, type ThroughputSample } from "./aggregate.js";
import { dayKeyOf, type Period, resolvePeriod } from "./period.js";
/**
* Per-model throughput for one resolved reporting period, as returned by
* `ThroughputStore.aggregate`.
*/
export interface ThroughputReport {
/** Period granularity, echoed from the query. */
readonly period: Period;
/** Date as returned by `resolvePeriod` for the query (not necessarily the caller's raw string). */
readonly date: string;
/** Inclusive start, epoch-ms. */
readonly start: number;
/** Exclusive end, epoch-ms. */
readonly end: number;
/** Per-model results computed by `aggregateSamples` from the loaded samples and the [start, end) window. */
readonly models: readonly ModelThroughput[];
}
/**
* Input to `ThroughputStore.aggregate`. The `(period, date)` pair is validated
* by `resolvePeriod`; a malformed pair makes `aggregate` reject with
* `ThroughputQueryError`.
*/
export interface ThroughputQuery {
/** Period granularity to aggregate over. */
readonly period: Period;
/** Anchor date string; its accepted format is defined by `resolvePeriod` (not visible here). */
readonly date: string;
}
/**
* Cross-conversation, per-model throughput store. Records one sample per
* completed turn and aggregates token-weighted tok/s over a day/week/month.
* Persistence is the injected KV namespace; samples are bucketed by local day
* key so a period query addresses only its own day buckets.
*/
export interface ThroughputStore {
/**
* Persist one sample under its local-day bucket. Each sample gets its own
* key, so a write is a single `set` with no read-modify-write.
*/
readonly record: (sample: ThroughputSample) => Promise<void>;
/**
* Load the samples stored in every day bucket of the resolved period and
* aggregate them per model. Rejects with `ThroughputQueryError` when the
* query's `(period, date)` is malformed.
*/
readonly aggregate: (query: ThroughputQuery) => Promise<ThroughputReport>;
}
/** Dependencies injected into `createThroughputStore`. */
export interface ThroughputStoreDeps {
/** KV namespace holding one JSON-encoded entry per recorded sample. */
readonly storage: StorageNamespace;
/** Optional logger. NOTE(review): `createThroughputStore` does not currently read this field. */
readonly logger?: Logger;
/**
* Injectable unique-id generator (default crypto.randomUUID). The id is
* appended to each sample key so samples sharing a timestamp never collide.
*/
readonly newId?: () => string;
}
/**
 * Thrown (as a rejection of `ThroughputStore.aggregate`) when a query's
 * `(period, date)` is malformed. The message is the validation error
 * reported by `resolvePeriod`.
 */
export class ThroughputQueryError extends Error {
  // Give the subclass its own `name` so `err.name`, `String(err)` and log
  // output identify it instead of reporting a plain "Error".
  override name = "ThroughputQueryError";
}
/**
* Leading segment of every sample key. Keys have the form
* `sample:<dayKey>:<ts>:<id>`, so listing the prefix `sample:<dayKey>:`
* selects exactly one day's bucket.
*/
const SAMPLE_PREFIX = "sample";
/**
 * Build a `ThroughputStore` backed by the injected KV namespace.
 *
 * Each sample is stored as JSON under `sample:<dayKey>:<ts>:<id>`, where
 * `dayKey` is the sample's local day (`dayKeyOf`). A period query lists the
 * keys of each day bucket the period covers, then hands the decoded samples
 * to `aggregateSamples` together with the period's [start, end) window.
 *
 * @param deps - Storage namespace, optional logger (not read here), and an
 *   optional unique-id generator (defaults to `crypto.randomUUID`).
 * @returns A store whose `aggregate` rejects with `ThroughputQueryError` for a
 *   malformed `(period, date)`.
 */
export function createThroughputStore(deps: ThroughputStoreDeps): ThroughputStore {
  const newId = deps.newId ?? (() => crypto.randomUUID());

  /** Fetch and decode every sample row stored under one local-day bucket. */
  async function loadDay(day: string): Promise<ThroughputSample[]> {
    const keys = await deps.storage.keys(`${SAMPLE_PREFIX}:${day}:`);
    // The per-key reads are independent, so issue them together rather than
    // one round trip at a time. Promise.all preserves key order.
    const rows = await Promise.all(Array.from(keys, (k) => deps.storage.get(k)));
    const out: ThroughputSample[] = [];
    for (const raw of rows) {
      // A key may vanish between `keys` and `get`; treat that as absent.
      if (raw === null) continue;
      const sample = parseSampleRow(raw);
      if (sample !== undefined) out.push(sample);
    }
    return out;
  }

  return {
    async record(sample) {
      // Per-sample key under the local-day bucket → write is a single set
      // (no read-modify-write, so concurrent turns can't lose a sample).
      const day = dayKeyOf(sample.ts);
      const key = `${SAMPLE_PREFIX}:${day}:${sample.ts}:${newId()}`;
      await deps.storage.set(key, JSON.stringify(sample));
    },
    async aggregate(query) {
      const resolved = resolvePeriod(query.period, query.date);
      if (!resolved.ok) throw new ThroughputQueryError(resolved.error);
      const samples: ThroughputSample[] = [];
      // Day buckets are visited one after another; rows within a day are
      // read concurrently by `loadDay`.
      for (const day of resolved.dayKeys) {
        // Push in a loop rather than `push(...rows)` to avoid the spread
        // argument-count limit on very large buckets.
        for (const s of await loadDay(day)) samples.push(s);
      }
      const models = aggregateSamples(samples, resolved.start, resolved.end);
      return {
        period: query.period,
        date: resolved.date,
        start: resolved.start,
        end: resolved.end,
        models,
      };
    },
  };
}

/**
 * Decode one stored sample row, or return `undefined` if it is malformed.
 *
 * The row is rejected when it is not valid JSON, is not a non-null object,
 * or lacks a numeric `ts`. Only `ts` is checked structurally; the remaining
 * fields are trusted as written by `record`.
 * NOTE(review): consider a full schema check if rows can come from other writers.
 */
function parseSampleRow(raw: string): ThroughputSample | undefined {
  let parsed: unknown;
  try {
    parsed = JSON.parse(raw);
  } catch {
    // Skip a malformed row rather than failing the whole query.
    return undefined;
  }
  if (typeof parsed !== "object" || parsed === null) return undefined;
  if (typeof (parsed as { ts?: unknown }).ts !== "number") return undefined;
  return parsed as ThroughputSample;
}
|