summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--packages/api/src/agent-manager.ts27
-rw-r--r--packages/api/src/routes/tabs.ts6
-rw-r--r--packages/api/tests/agent-manager.test.ts190
-rw-r--r--packages/api/tests/routes.test.ts82
-rw-r--r--packages/core/src/chunks/transform.ts5
-rw-r--r--packages/core/src/db/chunks.ts90
-rw-r--r--packages/core/src/index.ts1
-rw-r--r--packages/core/src/types/index.ts28
-rw-r--r--packages/core/tests/db/chunks.db.test.ts336
-rw-r--r--packages/frontend/src/lib/tabs.svelte.ts7
-rw-r--r--packages/frontend/tests/chat-store.test.ts186
11 files changed, 944 insertions, 14 deletions
diff --git a/packages/api/src/agent-manager.ts b/packages/api/src/agent-manager.ts
index 5a0ffdf..1db9a04 100644
--- a/packages/api/src/agent-manager.ts
+++ b/packages/api/src/agent-manager.ts
@@ -54,6 +54,7 @@ import {
TaskList,
toAvailableSubagents,
toAvailableUserAgents,
+ type UsageData,
validateConfig,
} from "@dispatch/core";
import type { PermissionManager } from "./permission-manager.js";
@@ -1477,6 +1478,10 @@ export class AgentManager {
// turn (text / thinking / tool-batch / error / system), folded from
// the stream via the shared `appendEventToChunks` helper.
const chunks: Chunk[] = [];
+ // Per-attempt usage accumulator. Reset each fallback attempt so a
+ // superseded (rate-limited) attempt's usage is discarded alongside its
+ // `chunks`. One `usage` event → one UsageData row.
+ const usageRows: UsageData[] = [];
const assistantId = crypto.randomUUID();
let assistantPersisted = false;
tabAgent.currentChunks = chunks;
@@ -1487,8 +1492,17 @@ export class AgentManager {
// `tool-batch` into separate `tool_call` + `tool_result` rows and
// tags every row with `turn_id` + derived `step`.
const flushAssistant = (): void => {
- if (assistantPersisted || chunks.length === 0) return;
- appendChunks(tabId, explodeTurn(turnId, chunks));
+ if (assistantPersisted) return;
+ // Append usage as extra drafts in the SAME appendChunks call as the
+ // turn's content rows: one atomic write, one fsync, contiguous seqs.
+ // Usage rows are an invisible side channel (excluded from
+ // getChunksForTab); `step` is cosmetic for usage (never grouped).
+ const drafts = explodeTurn(turnId, chunks);
+ for (const u of usageRows) {
+ drafts.push({ turnId, step: 0, role: "assistant", type: "usage", data: u });
+ }
+ if (drafts.length === 0) return;
+ appendChunks(tabId, drafts);
assistantPersisted = true;
};
@@ -1542,6 +1556,15 @@ export class AgentManager {
allOutput += event.delta;
}
+ // Capture per-step usage as a side-channel row to persist with the
+ // turn (one row per `usage` event). The live `this.emit(event)`
+ // above still drives in-session accumulation; this is the reload-
+ // persistence path. `appendEventToChunks` intentionally ignores
+ // `usage`, so it never becomes message content.
+ if (event.type === "usage") {
+ usageRows.push({ ...event.usage });
+ }
+
// Route every content-bearing event through the shared helper.
// `appendEventToChunks` ignores lifecycle events (status / done
// / task-list-update / tab-created / message-* / etc), so it's
diff --git a/packages/api/src/routes/tabs.ts b/packages/api/src/routes/tabs.ts
index b1e9659..f52ee99 100644
--- a/packages/api/src/routes/tabs.ts
+++ b/packages/api/src/routes/tabs.ts
@@ -6,6 +6,7 @@ import {
getSetting,
getTab,
getTotalChunkCount,
+ getUsageStatsForTab,
groupRowsToMessages,
listOpenTabs,
setSetting,
@@ -27,7 +28,10 @@ export function setTabsAgentManager(
}
tabsRoutes.get("/", (c) => {
- const tabs = listOpenTabs();
+ // Enrich each tab with its persisted usage aggregate so the frontend can
+ // seed `cacheStats` on reload without an extra round-trip. N small indexed
+ // queries — fine for tab counts.
+ const tabs = listOpenTabs().map((t) => ({ ...t, usageStats: getUsageStatsForTab(t.id) }));
return c.json({ tabs });
});
diff --git a/packages/api/tests/agent-manager.test.ts b/packages/api/tests/agent-manager.test.ts
index b9b4510..95fb558 100644
--- a/packages/api/tests/agent-manager.test.ts
+++ b/packages/api/tests/agent-manager.test.ts
@@ -91,6 +91,19 @@ function setFakeSetting(key: string, value: string): void {
fakeSettings.set(key, value);
}
+// Capture every appendChunks(tabId, drafts) call so tests can assert what got
+// persisted (e.g. usage side-channel rows). The real explodeTurn is mocked to
+// return [], so content drafts are empty here; usage rows are pushed directly
+// by processMessage's flushAssistant, making them the visible drafts.
+interface AppendChunksCall {
+ tabId: string;
+ drafts: Array<{ turnId: string; step: number; role: string; type: string; data: unknown }>;
+}
+const appendChunksCalls: AppendChunksCall[] = [];
+function resetAppendChunksCalls(): void {
+ appendChunksCalls.length = 0;
+}
+
// Allow tests to swap in a custom `run` generator (e.g. to simulate
// a fallback failure mid-stream). Returning to undefined restores
// the default.
@@ -345,7 +358,8 @@ vi.mock("@dispatch/core", () => ({
getSetting(key: string) {
return fakeSettings.get(key) ?? null;
},
- appendChunks() {
+ appendChunks(tabId: string, drafts: AppendChunksCall["drafts"]) {
+ appendChunksCalls.push({ tabId, drafts: [...drafts] });
return [];
},
explodeUserText() {
@@ -406,6 +420,7 @@ describe("AgentManager", () => {
resetFakeSettings();
setRunImpl(null);
appendEventToChunksSpy.mockClear();
+ resetAppendChunksCalls();
});
it("initial status is idle", () => {
@@ -1331,4 +1346,177 @@ describe("AgentManager", () => {
expect(tools).not.toContain("read_tab");
});
});
+
+ // ─── Usage side-channel persistence ──────────────────────────────
+ //
+ // `usage` AgentEvents (one per LLM round-trip) are persisted as invisible
+ // `type:"usage"` chunk rows so per-tab token/cache telemetry survives a
+ // reload. They ride the SAME atomic appendChunks call as the turn's content
+ // rows (one fsync, contiguous seqs). A superseded fallback attempt's usage is
+ // discarded with its `chunks` (per-attempt accumulator).
+ describe("usage persistence", () => {
+ it("writes one usage row per usage event emitted during a turn", async () => {
+ const manager = new AgentManager();
+ setRunImpl(async function* () {
+ yield { type: "status", status: "running" } as const;
+ yield {
+ type: "usage",
+ usage: { inputTokens: 1000, outputTokens: 40, cacheReadTokens: 0, cacheWriteTokens: 900 },
+ } as const;
+ yield { type: "text-delta", delta: "step two" } as const;
+ yield {
+ type: "usage",
+ usage: {
+ inputTokens: 1200,
+ outputTokens: 60,
+ cacheReadTokens: 1000,
+ cacheWriteTokens: 100,
+ },
+ } as const;
+ yield {
+ type: "done",
+ message: { role: "assistant", chunks: [{ type: "text", text: "step two" }] },
+ } as const;
+ yield { type: "status", status: "idle" } as const;
+ });
+
+ await manager.processMessage("tab-usage-rows", "go");
+
+ const usageDrafts = appendChunksCalls
+ .flatMap((c) => c.drafts)
+ .filter((d) => d.type === "usage");
+ expect(usageDrafts).toHaveLength(2);
+ // One row per event, role=assistant, step cosmetic (0).
+ expect(usageDrafts.every((d) => d.role === "assistant" && d.step === 0)).toBe(true);
+ expect(usageDrafts[0]?.data).toEqual({
+ inputTokens: 1000,
+ outputTokens: 40,
+ cacheReadTokens: 0,
+ cacheWriteTokens: 900,
+ });
+ expect(usageDrafts[1]?.data).toEqual({
+ inputTokens: 1200,
+ outputTokens: 60,
+ cacheReadTokens: 1000,
+ cacheWriteTokens: 100,
+ });
+ });
+
+ it("emits usage rows in the SAME appendChunks call as the turn's content (one atomic write)", async () => {
+ const manager = new AgentManager();
+ setRunImpl(async function* () {
+ yield { type: "status", status: "running" } as const;
+ yield { type: "text-delta", delta: "hi" } as const;
+ yield {
+ type: "usage",
+ usage: { inputTokens: 5, outputTokens: 1, cacheReadTokens: 2, cacheWriteTokens: 3 },
+ } as const;
+ yield {
+ type: "done",
+ message: { role: "assistant", chunks: [{ type: "text", text: "hi" }] },
+ } as const;
+ yield { type: "status", status: "idle" } as const;
+ });
+
+ await manager.processMessage("tab-usage-atomic", "go");
+
+ // Exactly one appendChunks call carries the usage draft (the flush). The
+ // user-message append and any system-row appends carry no usage rows.
+ const callsWithUsage = appendChunksCalls.filter((c) =>
+ c.drafts.some((d) => d.type === "usage"),
+ );
+ expect(callsWithUsage).toHaveLength(1);
+ expect(callsWithUsage[0]?.tabId).toBe("tab-usage-atomic");
+ });
+
+ it("discards a superseded (rate-limited) attempt's usage on fallback", async () => {
+ const manager = new AgentManager();
+ // Inject a minimal model registry so the rate-limit fallback path is
+ // taken (real `processMessage` requires modelRegistry + a resolved
+ // keyId + a next fallback entry to retry).
+ const markKeyExhausted = vi.fn();
+ (
+ manager as unknown as {
+ modelRegistry: {
+ getKeys(): Array<{ definition: Record<string, unknown> }>;
+ markKeyExhausted(): void;
+ };
+ }
+ ).modelRegistry = {
+ getKeys: () => [
+ {
+ definition: {
+ id: "k1",
+ provider: "openai-compatible",
+ env: "ENV1",
+ base_url: "http://x",
+ },
+ },
+ {
+ definition: {
+ id: "k2",
+ provider: "openai-compatible",
+ env: "ENV2",
+ base_url: "http://y",
+ },
+ },
+ ],
+ markKeyExhausted,
+ };
+
+ let attempt = 0;
+ setRunImpl(async function* () {
+ attempt++;
+ yield { type: "status", status: "running" } as const;
+ if (attempt === 1) {
+ // Attempt 1 emits usage then rate-limits — its usage must be dropped.
+ yield {
+ type: "usage",
+ usage: { inputTokens: 999, outputTokens: 9, cacheReadTokens: 0, cacheWriteTokens: 0 },
+ } as const;
+ yield { type: "error", error: "rate limit exceeded (status=429)" } as const;
+ return;
+ }
+ // Attempt 2 succeeds — only its usage should persist.
+ yield {
+ type: "usage",
+ usage: { inputTokens: 222, outputTokens: 22, cacheReadTokens: 100, cacheWriteTokens: 5 },
+ } as const;
+ yield {
+ type: "done",
+ message: { role: "assistant", chunks: [{ type: "text", text: "recovered" }] },
+ } as const;
+ yield { type: "status", status: "idle" } as const;
+ });
+
+ const agentModels = [
+ { key_id: "k1", model_id: "m1" },
+ { key_id: "k2", model_id: "m2" },
+ ];
+ await manager.processMessage(
+ "tab-usage-fallback",
+ "go",
+ undefined,
+ undefined,
+ undefined,
+ undefined,
+ agentModels,
+ );
+
+ expect(attempt).toBe(2); // confirm the fallback retry actually happened
+ expect(markKeyExhausted).toHaveBeenCalled();
+
+ const usageDrafts = appendChunksCalls
+ .flatMap((c) => c.drafts)
+ .filter((d) => d.type === "usage");
+ // Only attempt 2's usage survives.
+ expect(usageDrafts).toHaveLength(1);
+ expect(usageDrafts[0]?.data).toEqual({
+ inputTokens: 222,
+ outputTokens: 22,
+ cacheReadTokens: 100,
+ cacheWriteTokens: 5,
+ });
+ });
+ });
});
diff --git a/packages/api/tests/routes.test.ts b/packages/api/tests/routes.test.ts
index c768cee..ad6d5b1 100644
--- a/packages/api/tests/routes.test.ts
+++ b/packages/api/tests/routes.test.ts
@@ -1,6 +1,24 @@
import type { ToolDefinition } from "@dispatch/core";
import { describe, expect, it, vi } from "vitest";
+// Seedable backing stores for the tabs route (GET /tabs enrichment). Declared
+// before vi.mock so the hoisted factory closure can reference them; populated
+// per-test.
+interface FakeOpenTab {
+ id: string;
+ title: string;
+ keyId: string | null;
+ modelId: string | null;
+ parentTabId: string | null;
+ status: string;
+ isOpen: boolean;
+ position: number;
+ createdAt: number;
+ updatedAt: number;
+}
+const fakeOpenTabs: FakeOpenTab[] = [];
+const fakeUsageStats = new Map<string, unknown>();
+
// Mock @dispatch/core's Agent to avoid real LLM calls
vi.mock("@dispatch/core", () => ({
Agent: class MockAgent {
@@ -170,7 +188,7 @@ vi.mock("@dispatch/core", () => ({
return null;
},
listOpenTabs() {
- return [];
+ return [...fakeOpenTabs];
},
resolveTabPrefix() {
return { status: "none" };
@@ -230,6 +248,9 @@ vi.mock("@dispatch/core", () => ({
getTotalChunkCount() {
return 0;
},
+ getUsageStatsForTab(tabId: string) {
+ return fakeUsageStats.get(tabId) ?? null;
+ },
appendEventToChunks(_chunks: unknown[], _event: unknown) {
// no-op stub
},
@@ -413,6 +434,65 @@ describe("POST /chat", () => {
});
});
+describe("GET /tabs", () => {
+	it("enriches each open tab with its persisted usageStats aggregate", async () => {
+		// Single source of truth for the seeded aggregate and the expected response.
+		const usage = {
+			inputTokens: 2200,
+			outputTokens: 100,
+			cacheReadTokens: 1000,
+			cacheWriteTokens: 1000,
+			requests: 2,
+			last: { inputTokens: 1200, outputTokens: 60, cacheReadTokens: 1000, cacheWriteTokens: 100 },
+		};
+		fakeOpenTabs.length = 0;
+		fakeUsageStats.clear();
+		// The stores are module-level state shared with the hoisted vi.mock factory,
+		// so cleanup lives in `finally`: a failed assertion must not leak the seeded
+		// tabs/usage into tests that run after this one.
+		try {
+			fakeOpenTabs.push({
+				id: "tab-u",
+				title: "Has usage",
+				keyId: null,
+				modelId: null,
+				parentTabId: null,
+				status: "idle",
+				isOpen: true,
+				position: 0,
+				createdAt: 0,
+				updatedAt: 0,
+			});
+			fakeOpenTabs.push({
+				id: "tab-none",
+				title: "No usage",
+				keyId: null,
+				modelId: null,
+				parentTabId: null,
+				status: "idle",
+				isOpen: true,
+				position: 1,
+				createdAt: 0,
+				updatedAt: 0,
+			});
+			// Seed a copy so the assertion compares against an untouched literal.
+			fakeUsageStats.set("tab-u", { ...usage, last: { ...usage.last } });
+			const res = await app.request("/tabs");
+			expect(res.status).toBe(200);
+			const body = await res.json();
+			expect(Array.isArray(body.tabs)).toBe(true);
+			const tabU = body.tabs.find((t: { id: string }) => t.id === "tab-u");
+			const tabNone = body.tabs.find((t: { id: string }) => t.id === "tab-none");
+			expect(tabU.usageStats).toEqual(usage);
+			// A tab with no usage rows surfaces null (not undefined/missing).
+			expect(tabNone.usageStats).toBeNull();
+		} finally {
+			fakeOpenTabs.length = 0;
+			fakeUsageStats.clear();
+		}
+	});
+});
+
describe("GET /tabs/:id/chunks", () => {
it("returns the raw chunk window shape { chunks, total, oldestSeq }", async () => {
const res = await app.request("/tabs/tab-x/chunks?limit=50");
diff --git a/packages/core/src/chunks/transform.ts b/packages/core/src/chunks/transform.ts
index a4c6fc8..e8f4a18 100644
--- a/packages/core/src/chunks/transform.ts
+++ b/packages/core/src/chunks/transform.ts
@@ -209,6 +209,11 @@ export function groupRowsToMessages(rows: ChunkRow[]): MessageRow[] {
continue;
}
+ // Usage rows are an invisible side channel (persisted for the backend
+ // aggregate only). They're already query-excluded from getChunksForTab,
+ // so this is defensive insurance: never let one leak into render grouping.
+ if (row.type === "usage") continue;
+
// assistant / tool rows → part of the current assistant message
const c = ensureAssistant(row);
switch (row.type) {
diff --git a/packages/core/src/db/chunks.ts b/packages/core/src/db/chunks.ts
index 077259d..509cbbd 100644
--- a/packages/core/src/db/chunks.ts
+++ b/packages/core/src/db/chunks.ts
@@ -5,7 +5,7 @@ import {
groupRowsToMessages,
type MessageRow,
} from "../chunks/transform.js";
-import type { ChunkData, ChunkRow, ChunkRowDraft, TextData } from "../types/index.js";
+import type { ChunkData, ChunkRow, ChunkRowDraft, TextData, UsageData } from "../types/index.js";
import { getDatabase } from "./index.js";
// Re-export the DB-free transforms so existing barrel consumers
@@ -101,7 +101,7 @@ export function getChunksForTab(
const db = getDatabase();
if (!options) {
const rows = db
- .query("SELECT * FROM chunks WHERE tab_id = $tabId ORDER BY seq ASC")
+ .query("SELECT * FROM chunks WHERE tab_id = $tabId AND type != 'usage' ORDER BY seq ASC")
.all({ $tabId: tabId }) as Array<Record<string, unknown>>;
return rows.map(mapRow);
}
@@ -110,24 +110,28 @@ export function getChunksForTab(
if (limit !== undefined) {
const rows = db
.query(
- "SELECT * FROM chunks WHERE tab_id = $tabId AND seq < $before ORDER BY seq DESC LIMIT $limit",
+ "SELECT * FROM chunks WHERE tab_id = $tabId AND type != 'usage' AND seq < $before ORDER BY seq DESC LIMIT $limit",
)
.all({ $tabId: tabId, $before: before, $limit: limit }) as Array<Record<string, unknown>>;
return rows.map(mapRow).reverse();
}
const rows = db
- .query("SELECT * FROM chunks WHERE tab_id = $tabId AND seq < $before ORDER BY seq DESC")
+ .query(
+ "SELECT * FROM chunks WHERE tab_id = $tabId AND type != 'usage' AND seq < $before ORDER BY seq DESC",
+ )
.all({ $tabId: tabId, $before: before }) as Array<Record<string, unknown>>;
return rows.map(mapRow).reverse();
}
if (limit !== undefined) {
const rows = db
- .query("SELECT * FROM chunks WHERE tab_id = $tabId ORDER BY seq DESC LIMIT $limit")
+ .query(
+ "SELECT * FROM chunks WHERE tab_id = $tabId AND type != 'usage' ORDER BY seq DESC LIMIT $limit",
+ )
.all({ $tabId: tabId, $limit: limit }) as Array<Record<string, unknown>>;
return rows.map(mapRow).reverse();
}
const rows = db
- .query("SELECT * FROM chunks WHERE tab_id = $tabId ORDER BY seq ASC")
+ .query("SELECT * FROM chunks WHERE tab_id = $tabId AND type != 'usage' ORDER BY seq ASC")
.all({ $tabId: tabId }) as Array<Record<string, unknown>>;
return rows.map(mapRow);
}
@@ -145,11 +149,83 @@ export function getMessagesForTab(tabId: string): MessageRow[] {
export function getTotalChunkCount(tabId: string): number {
const db = getDatabase();
const row = db
- .query("SELECT COUNT(*) as count FROM chunks WHERE tab_id = $tabId")
+ .query("SELECT COUNT(*) as count FROM chunks WHERE tab_id = $tabId AND type != 'usage'")
.get({ $tabId: tabId }) as { count: number } | null;
return row?.count ?? 0;
}
+/**
+ * Aggregate per-tab token/cache usage across ALL persisted `usage` chunk rows.
+ *
+ * Usage rows are an invisible side channel (one row per `usage` AgentEvent),
+ * query-excluded from `getChunksForTab`/`getTotalChunkCount`, so this aggregate
+ * is the read path. It sums server-side over every row, so it stays complete
+ * even after the frontend evicts/pages out old turns. The return shape is
+ * structurally identical to the frontend `CacheStats`, so reload can seed it.
+ *
+ * - cumulative token fields = SUM over all well-formed usage rows;
+ * - `requests` = COUNT of well-formed usage rows (rows whose `data_json` does
+ *   not parse to a JSON object are skipped and NOT counted);
+ * - `last` = the highest-seq well-formed usage row's split;
+ * - `null` when the tab has no well-formed usage rows.
+ *
+ * Sums in JS after selecting the rows (mirroring `mapRow`) to avoid relying on
+ * `json_extract` over the freeform `data_json`. Non-finite or non-numeric
+ * token fields count as 0 so one bad row cannot poison the totals with NaN.
+ */
+export function getUsageStatsForTab(tabId: string): {
+	inputTokens: number;
+	outputTokens: number;
+	cacheReadTokens: number;
+	cacheWriteTokens: number;
+	requests: number;
+	last: {
+		inputTokens: number;
+		outputTokens: number;
+		cacheReadTokens: number;
+		cacheWriteTokens: number;
+	} | null;
+} | null {
+	const db = getDatabase();
+	const rows = db
+		.query("SELECT data_json FROM chunks WHERE tab_id = $tabId AND type = 'usage' ORDER BY seq ASC")
+		.all({ $tabId: tabId }) as Array<{ data_json: string }>;
+
+	// Coerce a parsed field to a safe addend: anything but a finite number is 0.
+	const num = (v: unknown): number => (typeof v === "number" && Number.isFinite(v) ? v : 0);
+
+	let inputTokens = 0;
+	let outputTokens = 0;
+	let cacheReadTokens = 0;
+	let cacheWriteTokens = 0;
+	let requests = 0;
+	let last: UsageData | null = null;
+	for (const row of rows) {
+		let parsed: unknown;
+		try {
+			parsed = JSON.parse(row.data_json);
+		} catch {
+			continue; // corrupt JSON: skip the row rather than fail the whole aggregate
+		}
+		// `JSON.parse("null")` / scalars / arrays are valid JSON but not a usage payload.
+		if (typeof parsed !== "object" || parsed === null || Array.isArray(parsed)) continue;
+		const u = parsed as Partial<Record<keyof UsageData, unknown>>;
+		last = {
+			inputTokens: num(u.inputTokens),
+			outputTokens: num(u.outputTokens),
+			cacheReadTokens: num(u.cacheReadTokens),
+			cacheWriteTokens: num(u.cacheWriteTokens),
+		};
+		inputTokens += last.inputTokens;
+		outputTokens += last.outputTokens;
+		cacheReadTokens += last.cacheReadTokens;
+		cacheWriteTokens += last.cacheWriteTokens;
+		requests++;
+	}
+	if (last === null) return null;
+	return { inputTokens, outputTokens, cacheReadTokens, cacheWriteTokens, requests, last };
+}
+
export function clearChunksForTab(tabId: string): void {
const db = getDatabase();
db.query("DELETE FROM chunks WHERE tab_id = $tabId").run({ $tabId: tabId });
diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts
index 327b0a5..47f9218 100644
--- a/packages/core/src/index.ts
+++ b/packages/core/src/index.ts
@@ -37,6 +37,7 @@ export {
getChunksForTab,
getMessagesForTab,
getTotalChunkCount,
+ getUsageStatsForTab,
groupRowsToMessages,
type MessageRow,
} from "./db/chunks.js";
diff --git a/packages/core/src/types/index.ts b/packages/core/src/types/index.ts
index ced3dc2..abade27 100644
--- a/packages/core/src/types/index.ts
+++ b/packages/core/src/types/index.ts
@@ -90,7 +90,14 @@ export interface ChatMessage {
export type ChunkRole = "user" | "assistant" | "tool" | "system";
/** Discriminator for a persisted chunk row's payload. */
-export type ChunkType = "text" | "thinking" | "tool_call" | "tool_result" | "error" | "system";
+export type ChunkType =
+ | "text"
+ | "thinking"
+ | "tool_call"
+ | "tool_result"
+ | "error"
+ | "system"
+ | "usage";
export interface TextData {
text: string;
@@ -119,6 +126,22 @@ export interface SystemData {
kind: SystemChunkKind;
text: string;
}
+/**
+ * Per-request token usage persisted as a SIDE-CHANNEL chunk row (one row per
+ * `usage` AgentEvent, i.e. one per LLM round-trip). These rows are deliberately
+ * EXCLUDED from `getChunksForTab`/`getTotalChunkCount` so they never enter the
+ * render, pagination, eviction, or agent-history-rebuild paths — they exist
+ * only to feed the backend aggregate `getUsageStatsForTab`, which seeds the
+ * frontend's `cacheStats` on reload. Mirrors the `usage` AgentEvent payload.
+ * All four counters are per-request values (not running totals); the
+ * cumulative sums are computed at read time by `getUsageStatsForTab`.
+ */
+export interface UsageData {
+inputTokens: number; // TOTAL prompt tokens for the request (cached + fresh)
+outputTokens: number;
+cacheReadTokens: number; // read side of Anthropic's prompt-cache split
+cacheWriteTokens: number; // write side of Anthropic's prompt-cache split
+}
export type ChunkData =
| TextData
@@ -126,7 +149,8 @@ export type ChunkData =
| ToolCallData
| ToolResultData
| ErrorData
- | SystemData;
+ | SystemData
+ | UsageData;
/**
* A persisted chunk row — the append-only unit of conversation storage and
diff --git a/packages/core/tests/db/chunks.db.test.ts b/packages/core/tests/db/chunks.db.test.ts
new file mode 100644
index 0000000..4f7d517
--- /dev/null
+++ b/packages/core/tests/db/chunks.db.test.ts
@@ -0,0 +1,336 @@
+import { beforeAll, beforeEach, describe, expect, it, vi } from "vitest";
+import type { ChunkRowDraft, UsageData } from "../../src/types/index.js";
+
+/**
+ * Internal row shape — matches the production `chunks` table columns.
+ * Kept loose at the `query()` boundary to mirror bun:sqlite's dynamic
+ * return type.
+ */
+interface ChunkRecord {
+ id: string;
+ tab_id: string;
+ seq: number;
+ turn_id: string;
+ step: number;
+ role: string;
+ type: string;
+ data_json: string;
+ created_at: number;
+}
+
+/**
+ * In-memory fake of `bun:sqlite`'s Database implementing only the queries
+ * `chunks.ts` actually issues. Same approach as `tabs.test.ts`: match exact
+ * normalized query strings as fixed branches (no SQL parser), so a query-string
+ * change fails loudly as "unsupported" instead of silently returning wrong data.
+ *
+ * This lets the DB-backed `getChunksForTab` / `getTotalChunkCount` /
+ * `getUsageStatsForTab` logic run under vitest, where `bun:sqlite` can't load.
+ */
+class FakeDatabase {
+ rows: ChunkRecord[] = [];
+ private idCounter = 0;
+
+ query(sql: string): {
+ all: (params?: Record<string, unknown>) => unknown[];
+ get: (params?: Record<string, unknown>) => unknown;
+ run: (params?: Record<string, unknown>) => void;
+ } {
+ return {
+ all: (params) => this.execSelect(sql, params),
+ get: (params) => this.execSelect(sql, params)[0] ?? null,
+ run: (params) => {
+ this.execMutation(sql, params);
+ },
+ };
+ }
+
+ /** bun:sqlite's `db.transaction(fn)` returns a callable that runs `fn`. */
+ transaction(fn: () => void): () => void {
+ return () => {
+ fn();
+ };
+ }
+
+ private execSelect(sql: string, params?: Record<string, unknown>): unknown[] {
+ const norm = sql.replace(/\s+/g, " ").trim();
+ const tabId = params?.$tabId as string | undefined;
+ const forTab = this.rows.filter((r) => r.tab_id === tabId);
+ const visible = forTab.filter((r) => r.type !== "usage");
+
+ // appendChunks: next-seq lookup (counts ALL rows, incl. usage)
+ if (norm === "SELECT COALESCE(MAX(seq), -1) as max_seq FROM chunks WHERE tab_id = $tabId") {
+ const seqs = forTab.map((r) => r.seq);
+ return [{ max_seq: seqs.length > 0 ? Math.max(...seqs) : -1 }];
+ }
+
+ // getChunksForTab — no options (usage excluded)
+ if (
+ norm === "SELECT * FROM chunks WHERE tab_id = $tabId AND type != 'usage' ORDER BY seq ASC"
+ ) {
+ return [...visible].sort((a, b) => a.seq - b.seq);
+ }
+
+ // getChunksForTab — before + limit (usage excluded)
+ if (
+ norm ===
+ "SELECT * FROM chunks WHERE tab_id = $tabId AND type != 'usage' AND seq < $before ORDER BY seq DESC LIMIT $limit"
+ ) {
+ const before = params?.$before as number;
+ const limit = params?.$limit as number;
+ return visible
+ .filter((r) => r.seq < before)
+ .sort((a, b) => b.seq - a.seq)
+ .slice(0, limit);
+ }
+
+ // getChunksForTab — before only (usage excluded)
+ if (
+ norm ===
+ "SELECT * FROM chunks WHERE tab_id = $tabId AND type != 'usage' AND seq < $before ORDER BY seq DESC"
+ ) {
+ const before = params?.$before as number;
+ return visible.filter((r) => r.seq < before).sort((a, b) => b.seq - a.seq);
+ }
+
+ // getChunksForTab — limit only (usage excluded)
+ if (
+ norm ===
+ "SELECT * FROM chunks WHERE tab_id = $tabId AND type != 'usage' ORDER BY seq DESC LIMIT $limit"
+ ) {
+ const limit = params?.$limit as number;
+ return [...visible].sort((a, b) => b.seq - a.seq).slice(0, limit);
+ }
+
+ // getTotalChunkCount (usage excluded)
+ if (norm === "SELECT COUNT(*) as count FROM chunks WHERE tab_id = $tabId AND type != 'usage'") {
+ return [{ count: visible.length }];
+ }
+
+ // getUsageStatsForTab: usage rows only, in seq order
+ if (
+ norm ===
+ "SELECT data_json FROM chunks WHERE tab_id = $tabId AND type = 'usage' ORDER BY seq ASC"
+ ) {
+ return forTab
+ .filter((r) => r.type === "usage")
+ .sort((a, b) => a.seq - b.seq)
+ .map((r) => ({ data_json: r.data_json }));
+ }
+
+ throw new Error(`FakeDatabase: unsupported SELECT: ${norm}`);
+ }
+
+ private execMutation(sql: string, params?: Record<string, unknown>): void {
+ const norm = sql.replace(/\s+/g, " ").trim();
+
+ // appendChunks: single-row insert
+ if (
+ norm ===
+ "INSERT INTO chunks (id, tab_id, seq, turn_id, step, role, type, data_json, created_at) VALUES ($id, $tabId, $seq, $turnId, $step, $role, $type, $dataJson, $now)"
+ ) {
+ this.rows.push({
+ id: (params?.$id as string) ?? `c${this.idCounter++}`,
+ tab_id: params?.$tabId as string,
+ seq: params?.$seq as number,
+ turn_id: params?.$turnId as string,
+ step: (params?.$step as number) ?? 0,
+ role: params?.$role as string,
+ type: params?.$type as string,
+ data_json: params?.$dataJson as string,
+ created_at: (params?.$now as number) ?? 0,
+ });
+ return;
+ }
+
+ throw new Error(`FakeDatabase: unsupported mutation: ${norm}`);
+ }
+}
+
+let fakeDb: FakeDatabase;
+
+vi.mock("../../src/db/index.js", () => ({
+ getDatabase: vi.fn(() => fakeDb),
+}));
+
+const { appendChunks, getChunksForTab, getTotalChunkCount, getUsageStatsForTab } = await import(
+ "../../src/db/chunks.js"
+);
+
+function usageDraft(turnId: string, u: UsageData): ChunkRowDraft {
+ return { turnId, step: 0, role: "assistant", type: "usage", data: u };
+}
+
+beforeAll(() => {
+ fakeDb = new FakeDatabase();
+});
+
+beforeEach(() => {
+ fakeDb.rows = [];
+});
+
+// ---------------------------------------------------------------------------
+// usage chunk persistence + side-channel invariants
+// ---------------------------------------------------------------------------
+describe("usage chunk rows (DB-backed)", () => {
+ const TAB = "tab-usage";
+
+ it("persists usage rows alongside content rows with contiguous seqs", () => {
+ appendChunks(TAB, [
+ { turnId: "t1", step: 0, role: "user", type: "text", data: { text: "hi" } },
+ { turnId: "t1", step: 0, role: "assistant", type: "text", data: { text: "yo" } },
+ usageDraft("t1", {
+ inputTokens: 100,
+ outputTokens: 10,
+ cacheReadTokens: 0,
+ cacheWriteTokens: 90,
+ }),
+ ]);
+ // All three rows landed with contiguous seqs.
+ expect(fakeDb.rows.map((r) => r.seq)).toEqual([0, 1, 2]);
+ expect(fakeDb.rows.map((r) => r.type)).toEqual(["text", "text", "usage"]);
+ });
+
+ it("excludes usage rows from getChunksForTab (all variants)", () => {
+ appendChunks(TAB, [
+ { turnId: "t1", step: 0, role: "user", type: "text", data: { text: "q" } },
+ usageDraft("t1", {
+ inputTokens: 100,
+ outputTokens: 10,
+ cacheReadTokens: 0,
+ cacheWriteTokens: 90,
+ }),
+ { turnId: "t1", step: 0, role: "assistant", type: "text", data: { text: "a" } },
+ usageDraft("t1", {
+ inputTokens: 200,
+ outputTokens: 20,
+ cacheReadTokens: 150,
+ cacheWriteTokens: 0,
+ }),
+ ]);
+
+ // no options
+ const all = getChunksForTab(TAB);
+ expect(all.every((r) => r.type !== "usage")).toBe(true);
+ expect(all.map((r) => r.type)).toEqual(["text", "text"]);
+
+ // limit only
+ const limited = getChunksForTab(TAB, { limit: 10 });
+ expect(limited.every((r) => r.type !== "usage")).toBe(true);
+ expect(limited).toHaveLength(2);
+
+ // before only — `before` is a seq cursor; usage seqs must never surface
+ const before = getChunksForTab(TAB, { before: 100 });
+ expect(before.every((r) => r.type !== "usage")).toBe(true);
+ expect(before).toHaveLength(2);
+
+ // before + limit
+ const bl = getChunksForTab(TAB, { before: 100, limit: 10 });
+ expect(bl.every((r) => r.type !== "usage")).toBe(true);
+ expect(bl).toHaveLength(2);
+ });
+
+ it("excludes usage rows from getTotalChunkCount", () => {
+ appendChunks(TAB, [
+ { turnId: "t1", step: 0, role: "user", type: "text", data: { text: "q" } },
+ { turnId: "t1", step: 0, role: "assistant", type: "text", data: { text: "a" } },
+ usageDraft("t1", {
+ inputTokens: 100,
+ outputTokens: 10,
+ cacheReadTokens: 0,
+ cacheWriteTokens: 90,
+ }),
+ ]);
+ // 3 rows total, but only 2 visible.
+ expect(getTotalChunkCount(TAB)).toBe(2);
+ });
+});
+
+// ---------------------------------------------------------------------------
+// getUsageStatsForTab — backend aggregate
+// ---------------------------------------------------------------------------
+describe("getUsageStatsForTab", () => {
+ const TAB = "tab-agg";
+
+ it("returns null when the tab has no usage rows", () => {
+ appendChunks(TAB, [
+ { turnId: "t1", step: 0, role: "assistant", type: "text", data: { text: "a" } },
+ ]);
+ expect(getUsageStatsForTab(TAB)).toBeNull();
+ });
+
+ it("sums cumulative tokens, counts requests, and reports the last request's split", () => {
+ appendChunks(TAB, [
+ usageDraft("t1", {
+ inputTokens: 1000,
+ outputTokens: 40,
+ cacheReadTokens: 0,
+ cacheWriteTokens: 900,
+ }),
+ usageDraft("t1", {
+ inputTokens: 1200,
+ outputTokens: 60,
+ cacheReadTokens: 1000,
+ cacheWriteTokens: 100,
+ }),
+ ]);
+
+ const stats = getUsageStatsForTab(TAB);
+ expect(stats).not.toBeNull();
+ expect(stats?.requests).toBe(2);
+ expect(stats?.inputTokens).toBe(2200);
+ expect(stats?.outputTokens).toBe(100);
+ expect(stats?.cacheReadTokens).toBe(1000);
+ expect(stats?.cacheWriteTokens).toBe(1000);
+ // `last` = the most recent (highest-seq) usage row.
+ expect(stats?.last).toEqual({
+ inputTokens: 1200,
+ outputTokens: 60,
+ cacheReadTokens: 1000,
+ cacheWriteTokens: 100,
+ });
+ });
+
+ it("is structurally identical to the frontend CacheStats shape (seeds directly)", () => {
+ appendChunks(TAB, [
+ usageDraft("t1", {
+ inputTokens: 5,
+ outputTokens: 1,
+ cacheReadTokens: 2,
+ cacheWriteTokens: 3,
+ }),
+ ]);
+ const stats = getUsageStatsForTab(TAB);
+ expect(Object.keys(stats ?? {}).sort()).toEqual(
+ [
+ "cacheReadTokens",
+ "cacheWriteTokens",
+ "inputTokens",
+ "last",
+ "outputTokens",
+ "requests",
+ ].sort(),
+ );
+ });
+
+ it("is scoped per tab", () => {
+ appendChunks("tab-a", [
+ usageDraft("t1", {
+ inputTokens: 10,
+ outputTokens: 1,
+ cacheReadTokens: 0,
+ cacheWriteTokens: 0,
+ }),
+ ]);
+ appendChunks("tab-b", [
+ usageDraft("t2", {
+ inputTokens: 20,
+ outputTokens: 2,
+ cacheReadTokens: 0,
+ cacheWriteTokens: 0,
+ }),
+ ]);
+ expect(getUsageStatsForTab("tab-a")?.inputTokens).toBe(10);
+ expect(getUsageStatsForTab("tab-b")?.inputTokens).toBe(20);
+ });
+});
diff --git a/packages/frontend/src/lib/tabs.svelte.ts b/packages/frontend/src/lib/tabs.svelte.ts
index 3fd7e5f..54f17d9 100644
--- a/packages/frontend/src/lib/tabs.svelte.ts
+++ b/packages/frontend/src/lib/tabs.svelte.ts
@@ -755,6 +755,12 @@ export function createTabStore() {
keyId?: string | null;
modelId?: string | null;
parentTabId?: string | null;
+ // Backend usage aggregate (GET /tabs). Structurally identical to
+ // CacheStats, so it seeds `cacheStats` directly on reload. Seeding
+ // happens ONLY here (hydrate runs when tabs.length === 0, i.e. a true
+ // reload) — never on `statuses` reconnect or `turn-sealed` — so the
+ // persisted aggregate and in-session live `usage` events never overlap.
+ usageStats?: CacheStats | null;
}> = [];
try {
const res = await fetch(`${config.apiBase}/tabs`);
@@ -843,6 +849,7 @@ export function createTabStore() {
chunkLimit: appSettings.chunkLimit,
oldestLoadedSeq: win.oldestSeq,
totalChunks: win.total,
+ cacheStats: row.usageStats ?? undefined,
};
});
diff --git a/packages/frontend/tests/chat-store.test.ts b/packages/frontend/tests/chat-store.test.ts
index 33a9f69..4e9691f 100644
--- a/packages/frontend/tests/chat-store.test.ts
+++ b/packages/frontend/tests/chat-store.test.ts
@@ -1005,6 +1005,192 @@ describe("hydrateFromBackend", () => {
expect(tC?.renderGroups.length).toBe(0);
expect(tC?.agentStatus).toBe("idle");
});
+
+ // ─── usage persistence: seed cacheStats from the backend aggregate ──
+ it("seeds cacheStats from a tab's usageStats on hydrate (reload persistence)", async () => {
+ const usageStats = { // aggregate served for tab "tu"; the assertion below expects it back unchanged
+ inputTokens: 2200,
+ outputTokens: 100,
+ cacheReadTokens: 1000,
+ cacheWriteTokens: 1000,
+ requests: 2,
+ last: { inputTokens: 1200, outputTokens: 60, cacheReadTokens: 1000, cacheWriteTokens: 100 },
+ };
+ vi.stubGlobal(
+ "fetch",
+ vi.fn((url: string) => { // fake fetch: routes on the URL suffix and rejects anything unexpected
+ if (url.endsWith("/tabs")) {
+ return Promise.resolve({
+ ok: true,
+ json: () =>
+ Promise.resolve({
+ tabs: [
+ {
+ id: "tu",
+ title: "Has usage",
+ keyId: null,
+ modelId: null,
+ parentTabId: null,
+ usageStats,
+ },
+ {
+ id: "tn",
+ title: "No usage",
+ keyId: null,
+ modelId: null,
+ parentTabId: null,
+ usageStats: null, // this tab carries no aggregate
+ },
+ ],
+ }),
+ });
+ }
+ if (url.endsWith("/status")) {
+ return Promise.resolve({ ok: true, json: () => Promise.resolve({ statuses: {} }) });
+ }
+ if (url.split("?")[0]?.endsWith("/tabs/tu/chunks")) { // query string is dropped before matching
+ return Promise.resolve({
+ ok: true,
+ json: () => Promise.resolve({ chunks: [], total: 0, oldestSeq: null }),
+ });
+ }
+ if (url.split("?")[0]?.endsWith("/tabs/tn/chunks")) {
+ return Promise.resolve({
+ ok: true,
+ json: () => Promise.resolve({ chunks: [], total: 0, oldestSeq: null }),
+ });
+ }
+ return Promise.reject(new Error(`unexpected fetch ${url}`));
+ }),
+ );
+
+ const store = createTabStore();
+ const n = await store.hydrateFromBackend();
+ expect(n).toBe(2); // matches the two tabs in the stubbed /tabs response
+ // Tab served with a usageStats aggregate → cacheStats equals that aggregate field for field.
+ expect(store.tabs.find((t) => t.id === "tu")?.cacheStats).toEqual(usageStats);
+ // Tab served with `usageStats: null` → cacheStats must be undefined, not null.
+ expect(store.tabs.find((t) => t.id === "tn")?.cacheStats).toBeUndefined();
+ });
+
+ it("does not re-seed or double-count cacheStats on a statuses reconnect after hydrate", async () => {
+ const usageStats = { // single-request aggregate; it must survive the `statuses` event untouched
+ inputTokens: 1000,
+ outputTokens: 40,
+ cacheReadTokens: 0,
+ cacheWriteTokens: 900,
+ requests: 1,
+ last: { inputTokens: 1000, outputTokens: 40, cacheReadTokens: 0, cacheWriteTokens: 900 },
+ };
+ vi.stubGlobal(
+ "fetch",
+ vi.fn((url: string) => { // fake fetch: routes on the URL suffix and rejects anything unexpected
+ if (url.endsWith("/tabs")) {
+ return Promise.resolve({
+ ok: true,
+ json: () =>
+ Promise.resolve({
+ tabs: [
+ {
+ id: "tr",
+ title: "Reconnect",
+ keyId: null,
+ modelId: null,
+ parentTabId: null,
+ usageStats,
+ },
+ ],
+ }),
+ });
+ }
+ if (url.endsWith("/status")) {
+ return Promise.resolve({ ok: true, json: () => Promise.resolve({ statuses: {} }) });
+ }
+ if (url.split("?")[0]?.endsWith("/tabs/tr/chunks")) { // query string is dropped before matching
+ return Promise.resolve({
+ ok: true,
+ json: () => Promise.resolve({ chunks: [], total: 0, oldestSeq: null }),
+ });
+ }
+ return Promise.reject(new Error(`unexpected fetch ${url}`));
+ }),
+ );
+
+ const store = createTabStore();
+ await store.hydrateFromBackend();
+ expect(store.tabs.find((t) => t.id === "tr")?.cacheStats).toEqual(usageStats); // baseline: seeded once by hydrate
+
+ // A WS reconnect snapshot must NOT touch cacheStats: live in-session `usage`
+ // events own the running totals, and the aggregate seed is reload-only.
+ store.handleEvent({ type: "statuses", statuses: { tr: { status: "idle" } } });
+ expect(store.tabs.find((t) => t.id === "tr")?.cacheStats).toEqual(usageStats); // same values as before the event
+ });
+
+ it("keeps accumulating live usage events after a hydrate seed", async () => {
+ const usageStats = { // seed aggregate; the live event below is expected to add to these totals
+ inputTokens: 1000,
+ outputTokens: 40,
+ cacheReadTokens: 0,
+ cacheWriteTokens: 900,
+ requests: 1,
+ last: { inputTokens: 1000, outputTokens: 40, cacheReadTokens: 0, cacheWriteTokens: 900 },
+ };
+ vi.stubGlobal(
+ "fetch",
+ vi.fn((url: string) => { // fake fetch: routes on the URL suffix and rejects anything unexpected
+ if (url.endsWith("/tabs")) {
+ return Promise.resolve({
+ ok: true,
+ json: () =>
+ Promise.resolve({
+ tabs: [
+ {
+ id: "tl",
+ title: "Live after hydrate",
+ keyId: null,
+ modelId: null,
+ parentTabId: null,
+ usageStats,
+ },
+ ],
+ }),
+ });
+ }
+ if (url.endsWith("/status")) {
+ return Promise.resolve({ ok: true, json: () => Promise.resolve({ statuses: {} }) });
+ }
+ if (url.split("?")[0]?.endsWith("/tabs/tl/chunks")) { // query string is dropped before matching
+ return Promise.resolve({
+ ok: true,
+ json: () => Promise.resolve({ chunks: [], total: 0, oldestSeq: null }),
+ });
+ }
+ return Promise.reject(new Error(`unexpected fetch ${url}`));
+ }),
+ );
+
+ const store = createTabStore();
+ await store.hydrateFromBackend();
+
+ // A new in-session usage event must be added ON TOP of the seeded aggregate, not replace it.
+ store.handleEvent({
+ type: "usage",
+ tabId: "tl",
+ usage: { inputTokens: 200, outputTokens: 10, cacheReadTokens: 150, cacheWriteTokens: 0 },
+ });
+ const stats = store.tabs.find((t) => t.id === "tl")?.cacheStats;
+ expect(stats?.requests).toBe(2); // 1 seeded + 1 live
+ expect(stats?.inputTokens).toBe(1200); // 1000 + 200
+ expect(stats?.outputTokens).toBe(50); // 40 + 10
+ expect(stats?.cacheReadTokens).toBe(150); // 0 + 150
+ expect(stats?.cacheWriteTokens).toBe(900); // 900 + 0
+ expect(stats?.last).toEqual({ // `last` is replaced by the live event's split, not summed
+ inputTokens: 200,
+ outputTokens: 10,
+ cacheReadTokens: 150,
+ cacheWriteTokens: 0,
+ });
+ });
});
// ─── statuses WS event with the wider TabStatusSnapshot shape ───