From b734eb96bf0af267fdfbef85df51940ca0b4e8c7 Mon Sep 17 00:00:00 2001 From: Adam Malczewski Date: Tue, 2 Jun 2026 13:08:36 +0900 Subject: feat: persist per-tab token/cache usage across reload Persist usage as invisible type:"usage" chunk rows (side channel): - core: add "usage" ChunkType + UsageData; exclude usage rows from getChunksForTab/getTotalChunkCount; add getUsageStatsForTab aggregate (exported from barrel); defensive skip in groupRowsToMessages. - api: agent-manager accumulates per-attempt usageRows and flushes them in the same atomic appendChunks call as the turn's content (discarded on a superseded fallback attempt). GET /tabs enriches rows with usageStats. - frontend: hydrateFromBackend seeds cacheStats from usageStats (reload only; no re-seed on statuses reconnect, so no double-count with live events). Tests: core DB-backed usage persistence/aggregate; api usage-row-per-event + fallback discard; routes GET /tabs usageStats; frontend hydrate seed + no-double-count + live-accumulation-after-seed. 495 -> 509 passing. --- packages/api/src/agent-manager.ts | 27 ++- packages/api/src/routes/tabs.ts | 6 +- packages/api/tests/agent-manager.test.ts | 190 +++++++++++++++- packages/api/tests/routes.test.ts | 82 ++++++- packages/core/src/chunks/transform.ts | 5 + packages/core/src/db/chunks.ts | 90 +++++++- packages/core/src/index.ts | 1 + packages/core/src/types/index.ts | 28 ++- packages/core/tests/db/chunks.db.test.ts | 336 +++++++++++++++++++++++++++++ packages/frontend/src/lib/tabs.svelte.ts | 7 + packages/frontend/tests/chat-store.test.ts | 186 ++++++++++++++++ 11 files changed, 944 insertions(+), 14 deletions(-) create mode 100644 packages/core/tests/db/chunks.db.test.ts diff --git a/packages/api/src/agent-manager.ts b/packages/api/src/agent-manager.ts index 5a0ffdf..1db9a04 100644 --- a/packages/api/src/agent-manager.ts +++ b/packages/api/src/agent-manager.ts @@ -54,6 +54,7 @@ import { TaskList, toAvailableSubagents, toAvailableUserAgents, + type UsageData, validateConfig, } from "@dispatch/core"; import type { PermissionManager } from "./permission-manager.js"; @@ -1477,6 +1478,10 @@ export class AgentManager { // turn (text / thinking / tool-batch / error / system), folded from // the stream via the shared `appendEventToChunks` helper. const chunks: Chunk[] = []; + // Per-attempt usage accumulator. Reset each fallback attempt so a + // superseded (rate-limited) attempt's usage is discarded alongside its + // `chunks`. One `usage` event → one UsageData row. + const usageRows: UsageData[] = []; const assistantId = crypto.randomUUID(); let assistantPersisted = false; tabAgent.currentChunks = chunks; @@ -1487,8 +1492,17 @@ export class AgentManager { // `tool-batch` into separate `tool_call` + `tool_result` rows and // tags every row with `turn_id` + derived `step`. const flushAssistant = (): void => { - if (assistantPersisted || chunks.length === 0) return; - appendChunks(tabId, explodeTurn(turnId, chunks)); + if (assistantPersisted) return; + // Append usage as extra drafts in the SAME appendChunks call as the + // turn's content rows: one atomic write, one fsync, contiguous seqs. + // Usage rows are an invisible side channel (excluded from + // getChunksForTab); `step` is cosmetic for usage (never grouped). + const drafts = explodeTurn(turnId, chunks); + for (const u of usageRows) { + drafts.push({ turnId, step: 0, role: "assistant", type: "usage", data: u }); + } + if (drafts.length === 0) return; + appendChunks(tabId, drafts); assistantPersisted = true; }; @@ -1542,6 +1556,15 @@ export class AgentManager { allOutput += event.delta; } + // Capture per-step usage as a side-channel row to persist with the + // turn (one row per `usage` event). The live `this.emit(event)` + // above still drives in-session accumulation; this is the reload- + // persistence path. `appendEventToChunks` intentionally ignores + // `usage`, so it never becomes message content. + if (event.type === "usage") { + usageRows.push({ ...event.usage }); + } + // Route every content-bearing event through the shared helper. // `appendEventToChunks` ignores lifecycle events (status / done // / task-list-update / tab-created / message-* / etc), so it's diff --git a/packages/api/src/routes/tabs.ts b/packages/api/src/routes/tabs.ts index b1e9659..f52ee99 100644 --- a/packages/api/src/routes/tabs.ts +++ b/packages/api/src/routes/tabs.ts @@ -6,6 +6,7 @@ import { getSetting, getTab, getTotalChunkCount, + getUsageStatsForTab, groupRowsToMessages, listOpenTabs, setSetting, @@ -27,7 +28,10 @@ export function setTabsAgentManager( } tabsRoutes.get("/", (c) => { - const tabs = listOpenTabs(); + // Enrich each tab with its persisted usage aggregate so the frontend can + // seed `cacheStats` on reload without an extra round-trip. N small indexed + // queries — fine for tab counts. + const tabs = listOpenTabs().map((t) => ({ ...t, usageStats: getUsageStatsForTab(t.id) })); return c.json({ tabs }); }); diff --git a/packages/api/tests/agent-manager.test.ts b/packages/api/tests/agent-manager.test.ts index b9b4510..95fb558 100644 --- a/packages/api/tests/agent-manager.test.ts +++ b/packages/api/tests/agent-manager.test.ts @@ -91,6 +91,19 @@ function setFakeSetting(key: string, value: string): void { fakeSettings.set(key, value); } +// Capture every appendChunks(tabId, drafts) call so tests can assert what got +// persisted (e.g. usage side-channel rows). The real explodeTurn is mocked to +// return [], so content drafts are empty here; usage rows are pushed directly +// by processMessage's flushAssistant, making them the visible drafts. +interface AppendChunksCall { + tabId: string; + drafts: Array<{ turnId: string; step: number; role: string; type: string; data: unknown }>; +} +const appendChunksCalls: AppendChunksCall[] = []; +function resetAppendChunksCalls(): void { + appendChunksCalls.length = 0; +} + // Allow tests to swap in a custom `run` generator (e.g. to simulate // a fallback failure mid-stream). Returning to undefined restores // the default. @@ -345,7 +358,8 @@ vi.mock("@dispatch/core", () => ({ getSetting(key: string) { return fakeSettings.get(key) ?? null; }, - appendChunks() { + appendChunks(tabId: string, drafts: AppendChunksCall["drafts"]) { + appendChunksCalls.push({ tabId, drafts: [...drafts] }); return []; }, explodeUserText() { @@ -406,6 +420,7 @@ describe("AgentManager", () => { resetFakeSettings(); setRunImpl(null); appendEventToChunksSpy.mockClear(); + resetAppendChunksCalls(); }); it("initial status is idle", () => { @@ -1331,4 +1346,177 @@ describe("AgentManager", () => { expect(tools).not.toContain("read_tab"); }); }); + + // ─── Usage side-channel persistence ────────────────────────────── + // + // `usage` AgentEvents (one per LLM round-trip) are persisted as invisible + // `type:"usage"` chunk rows so per-tab token/cache telemetry survives a + // reload. They ride the SAME atomic appendChunks call as the turn's content + // rows (one fsync, contiguous seqs). A superseded fallback attempt's usage is + // discarded with its `chunks` (per-attempt accumulator). + describe("usage persistence", () => { + it("writes one usage row per usage event emitted during a turn", async () => { + const manager = new AgentManager(); + setRunImpl(async function* () { + yield { type: "status", status: "running" } as const; + yield { + type: "usage", + usage: { inputTokens: 1000, outputTokens: 40, cacheReadTokens: 0, cacheWriteTokens: 900 }, + } as const; + yield { type: "text-delta", delta: "step two" } as const; + yield { + type: "usage", + usage: { + inputTokens: 1200, + outputTokens: 60, + cacheReadTokens: 1000, + cacheWriteTokens: 100, + }, + } as const; + yield { + type: "done", + message: { role: "assistant", chunks: [{ type: "text", text: "step two" }] }, + } as const; + yield { type: "status", status: "idle" } as const; + }); + + await manager.processMessage("tab-usage-rows", "go"); + + const usageDrafts = appendChunksCalls + .flatMap((c) => c.drafts) + .filter((d) => d.type === "usage"); + expect(usageDrafts).toHaveLength(2); + // One row per event, role=assistant, step cosmetic (0). + expect(usageDrafts.every((d) => d.role === "assistant" && d.step === 0)).toBe(true); + expect(usageDrafts[0]?.data).toEqual({ + inputTokens: 1000, + outputTokens: 40, + cacheReadTokens: 0, + cacheWriteTokens: 900, + }); + expect(usageDrafts[1]?.data).toEqual({ + inputTokens: 1200, + outputTokens: 60, + cacheReadTokens: 1000, + cacheWriteTokens: 100, + }); + }); + + it("emits usage rows in the SAME appendChunks call as the turn's content (one atomic write)", async () => { + const manager = new AgentManager(); + setRunImpl(async function* () { + yield { type: "status", status: "running" } as const; + yield { type: "text-delta", delta: "hi" } as const; + yield { + type: "usage", + usage: { inputTokens: 5, outputTokens: 1, cacheReadTokens: 2, cacheWriteTokens: 3 }, + } as const; + yield { + type: "done", + message: { role: "assistant", chunks: [{ type: "text", text: "hi" }] }, + } as const; + yield { type: "status", status: "idle" } as const; + }); + + await manager.processMessage("tab-usage-atomic", "go"); + + // Exactly one appendChunks call carries the usage draft (the flush). The + // user-message append and any system-row appends carry no usage rows. + const callsWithUsage = appendChunksCalls.filter((c) => + c.drafts.some((d) => d.type === "usage"), + ); + expect(callsWithUsage).toHaveLength(1); + expect(callsWithUsage[0]?.tabId).toBe("tab-usage-atomic"); + }); + + it("discards a superseded (rate-limited) attempt's usage on fallback", async () => { + const manager = new AgentManager(); + // Inject a minimal model registry so the rate-limit fallback path is + // taken (real `processMessage` requires modelRegistry + a resolved + // keyId + a next fallback entry to retry). + const markKeyExhausted = vi.fn(); + ( + manager as unknown as { + modelRegistry: { + getKeys(): Array<{ definition: Record }>; + markKeyExhausted(): void; + }; + } + ).modelRegistry = { + getKeys: () => [ + { + definition: { + id: "k1", + provider: "openai-compatible", + env: "ENV1", + base_url: "http://x", + }, + }, + { + definition: { + id: "k2", + provider: "openai-compatible", + env: "ENV2", + base_url: "http://y", + }, + }, + ], + markKeyExhausted, + }; + + let attempt = 0; + setRunImpl(async function* () { + attempt++; + yield { type: "status", status: "running" } as const; + if (attempt === 1) { + // Attempt 1 emits usage then rate-limits — its usage must be dropped. + yield { + type: "usage", + usage: { inputTokens: 999, outputTokens: 9, cacheReadTokens: 0, cacheWriteTokens: 0 }, + } as const; + yield { type: "error", error: "rate limit exceeded (status=429)" } as const; + return; + } + // Attempt 2 succeeds — only its usage should persist. + yield { + type: "usage", + usage: { inputTokens: 222, outputTokens: 22, cacheReadTokens: 100, cacheWriteTokens: 5 }, + } as const; + yield { + type: "done", + message: { role: "assistant", chunks: [{ type: "text", text: "recovered" }] }, + } as const; + yield { type: "status", status: "idle" } as const; + }); + + const agentModels = [ + { key_id: "k1", model_id: "m1" }, + { key_id: "k2", model_id: "m2" }, + ]; + await manager.processMessage( + "tab-usage-fallback", + "go", + undefined, + undefined, + undefined, + undefined, + agentModels, + ); + + expect(attempt).toBe(2); // confirm the fallback retry actually happened + expect(markKeyExhausted).toHaveBeenCalled(); + + const usageDrafts = appendChunksCalls + .flatMap((c) => c.drafts) + .filter((d) => d.type === "usage"); + // Only attempt 2's usage survives. + expect(usageDrafts).toHaveLength(1); + expect(usageDrafts[0]?.data).toEqual({ + inputTokens: 222, + outputTokens: 22, + cacheReadTokens: 100, + cacheWriteTokens: 5, + }); + }); + }); }); diff --git a/packages/api/tests/routes.test.ts b/packages/api/tests/routes.test.ts index c768cee..ad6d5b1 100644 --- a/packages/api/tests/routes.test.ts +++ b/packages/api/tests/routes.test.ts @@ -1,6 +1,24 @@ import type { ToolDefinition } from "@dispatch/core"; import { describe, expect, it, vi } from "vitest"; +// Seedable backing stores for the tabs route (GET /tabs enrichment). Declared +// before vi.mock so the hoisted factory closure can reference them; populated +// per-test. +interface FakeOpenTab { + id: string; + title: string; + keyId: string | null; + modelId: string | null; + parentTabId: string | null; + status: string; + isOpen: boolean; + position: number; + createdAt: number; + updatedAt: number; +} +const fakeOpenTabs: FakeOpenTab[] = []; +const fakeUsageStats = new Map(); + // Mock @dispatch/core's Agent to avoid real LLM calls vi.mock("@dispatch/core", () => ({ Agent: class MockAgent { @@ -170,7 +188,7 @@ vi.mock("@dispatch/core", () => ({ return null; }, listOpenTabs() { - return []; + return [...fakeOpenTabs]; }, resolveTabPrefix() { return { status: "none" }; @@ -230,6 +248,9 @@ vi.mock("@dispatch/core", () => ({ getTotalChunkCount() { return 0; }, + getUsageStatsForTab(tabId: string) { + return fakeUsageStats.get(tabId) ?? null; + }, appendEventToChunks(_chunks: unknown[], _event: unknown) { // no-op stub }, @@ -413,6 +434,65 @@ describe("POST /chat", () => { }); }); +describe("GET /tabs", () => { + it("enriches each open tab with its persisted usageStats aggregate", async () => { + fakeOpenTabs.length = 0; + fakeUsageStats.clear(); + fakeOpenTabs.push({ + id: "tab-u", + title: "Has usage", + keyId: null, + modelId: null, + parentTabId: null, + status: "idle", + isOpen: true, + position: 0, + createdAt: 0, + updatedAt: 0, + }); + fakeOpenTabs.push({ + id: "tab-none", + title: "No usage", + keyId: null, + modelId: null, + parentTabId: null, + status: "idle", + isOpen: true, + position: 1, + createdAt: 0, + updatedAt: 0, + }); + fakeUsageStats.set("tab-u", { + inputTokens: 2200, + outputTokens: 100, + cacheReadTokens: 1000, + cacheWriteTokens: 1000, + requests: 2, + last: { inputTokens: 1200, outputTokens: 60, cacheReadTokens: 1000, cacheWriteTokens: 100 }, + }); + + const res = await app.request("/tabs"); + expect(res.status).toBe(200); + const body = await res.json(); + expect(Array.isArray(body.tabs)).toBe(true); + const tabU = body.tabs.find((t: { id: string }) => t.id === "tab-u"); + const tabNone = body.tabs.find((t: { id: string }) => t.id === "tab-none"); + expect(tabU.usageStats).toEqual({ + inputTokens: 2200, + outputTokens: 100, + cacheReadTokens: 1000, + cacheWriteTokens: 1000, + requests: 2, + last: { inputTokens: 1200, outputTokens: 60, cacheReadTokens: 1000, cacheWriteTokens: 100 }, + }); + // A tab with no usage rows surfaces null (not undefined/missing). + expect(tabNone.usageStats).toBeNull(); + + fakeOpenTabs.length = 0; + fakeUsageStats.clear(); + }); +}); + describe("GET /tabs/:id/chunks", () => { it("returns the raw chunk window shape { chunks, total, oldestSeq }", async () => { const res = await app.request("/tabs/tab-x/chunks?limit=50"); diff --git a/packages/core/src/chunks/transform.ts b/packages/core/src/chunks/transform.ts index a4c6fc8..e8f4a18 100644 --- a/packages/core/src/chunks/transform.ts +++ b/packages/core/src/chunks/transform.ts @@ -209,6 +209,11 @@ export function groupRowsToMessages(rows: ChunkRow[]): MessageRow[] { continue; } + // Usage rows are an invisible side channel (persisted for the backend + // aggregate only). They're already query-excluded from getChunksForTab, + // so this is defensive insurance: never let one leak into render grouping. + if (row.type === "usage") continue; + // assistant / tool rows → part of the current assistant message const c = ensureAssistant(row); switch (row.type) { diff --git a/packages/core/src/db/chunks.ts b/packages/core/src/db/chunks.ts index 077259d..509cbbd 100644 --- a/packages/core/src/db/chunks.ts +++ b/packages/core/src/db/chunks.ts @@ -5,7 +5,7 @@ import { groupRowsToMessages, type MessageRow, } from "../chunks/transform.js"; -import type { ChunkData, ChunkRow, ChunkRowDraft, TextData } from "../types/index.js"; +import type { ChunkData, ChunkRow, ChunkRowDraft, TextData, UsageData } from "../types/index.js"; import { getDatabase } from "./index.js"; // Re-export the DB-free transforms so existing barrel consumers @@ -101,7 +101,7 @@ export function getChunksForTab( const db = getDatabase(); if (!options) { const rows = db - .query("SELECT * FROM chunks WHERE tab_id = $tabId ORDER BY seq ASC") + .query("SELECT * FROM chunks WHERE tab_id = $tabId AND type != 'usage' ORDER BY seq ASC") .all({ $tabId: tabId }) as Array>; return rows.map(mapRow); } @@ -110,24 +110,28 @@ export function getChunksForTab( if (limit !== undefined) { const rows = db .query( - "SELECT * FROM chunks WHERE tab_id = $tabId AND seq < $before ORDER BY seq DESC LIMIT $limit", + "SELECT * FROM chunks WHERE tab_id = $tabId AND type != 'usage' AND seq < $before ORDER BY seq DESC LIMIT $limit", ) .all({ $tabId: tabId, $before: before, $limit: limit }) as Array>; return rows.map(mapRow).reverse(); } const rows = db - .query("SELECT * FROM chunks WHERE tab_id = $tabId AND seq < $before ORDER BY seq DESC") + .query( + "SELECT * FROM chunks WHERE tab_id = $tabId AND type != 'usage' AND seq < $before ORDER BY seq DESC", + ) .all({ $tabId: tabId, $before: before }) as Array>; return rows.map(mapRow).reverse(); } if (limit !== undefined) { const rows = db - .query("SELECT * FROM chunks WHERE tab_id = $tabId ORDER BY seq DESC LIMIT $limit") + .query( + "SELECT * FROM chunks WHERE tab_id = $tabId AND type != 'usage' ORDER BY seq DESC LIMIT $limit", + ) .all({ $tabId: tabId, $limit: limit }) as Array>; return rows.map(mapRow).reverse(); } const rows = db - .query("SELECT * FROM chunks WHERE tab_id = $tabId ORDER BY seq ASC") + .query("SELECT * FROM chunks WHERE tab_id = $tabId AND type != 'usage' ORDER BY seq ASC") .all({ $tabId: tabId }) as Array>; return rows.map(mapRow); } @@ -145,11 +149,83 @@ export function getMessagesForTab(tabId: string): MessageRow[] { export function getTotalChunkCount(tabId: string): number { const db = getDatabase(); const row = db - .query("SELECT COUNT(*) as count FROM chunks WHERE tab_id = $tabId") + .query("SELECT COUNT(*) as count FROM chunks WHERE tab_id = $tabId AND type != 'usage'") .get({ $tabId: tabId }) as { count: number } | null; return row?.count ?? 0; } +/** + * Aggregate per-tab token/cache usage across ALL persisted `usage` chunk rows. + * + * Usage rows are written as an invisible side channel (one row per `usage` + * AgentEvent) and are query-excluded from `getChunksForTab`/`getTotalChunkCount`, + * so this aggregate is the read path. Because it sums server-side over every + * row, it stays complete even after the frontend evicts/pages out old turns + * (eviction is in-memory only). The return shape is structurally identical to + * the frontend `CacheStats`, so reload can seed it directly. + * + * - cumulative `inputTokens`/`outputTokens`/`cacheReadTokens`/`cacheWriteTokens` + * = SUM over all usage rows; + * - `requests` = COUNT of usage rows; + * - `last` = the highest-seq usage row's split (most recent request); + * - `null` when the tab has no usage rows. + * + * Sums in JS after selecting the rows (mirroring `mapRow`) to avoid relying on + * `json_extract` over the freeform `data_json`. + */ +export function getUsageStatsForTab(tabId: string): { + inputTokens: number; + outputTokens: number; + cacheReadTokens: number; + cacheWriteTokens: number; + requests: number; + last: { + inputTokens: number; + outputTokens: number; + cacheReadTokens: number; + cacheWriteTokens: number; + } | null; +} | null { + const db = getDatabase(); + const rows = db + .query("SELECT data_json FROM chunks WHERE tab_id = $tabId AND type = 'usage' ORDER BY seq ASC") + .all({ $tabId: tabId }) as Array<{ data_json: string }>; + if (rows.length === 0) return null; + + let inputTokens = 0; + let outputTokens = 0; + let cacheReadTokens = 0; + let cacheWriteTokens = 0; + let last: UsageData | null = null; + for (const row of rows) { + let u: UsageData; + try { + u = JSON.parse(row.data_json) as UsageData; + } catch { + continue; + } + inputTokens += u.inputTokens ?? 0; + outputTokens += u.outputTokens ?? 0; + cacheReadTokens += u.cacheReadTokens ?? 0; + cacheWriteTokens += u.cacheWriteTokens ?? 0; + last = { + inputTokens: u.inputTokens ?? 0, + outputTokens: u.outputTokens ?? 0, + cacheReadTokens: u.cacheReadTokens ?? 0, + cacheWriteTokens: u.cacheWriteTokens ?? 0, + }; + } + + return { + inputTokens, + outputTokens, + cacheReadTokens, + cacheWriteTokens, + requests: rows.length, + last, + }; +} + export function clearChunksForTab(tabId: string): void { const db = getDatabase(); db.query("DELETE FROM chunks WHERE tab_id = $tabId").run({ $tabId: tabId }); diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts index 327b0a5..47f9218 100644 --- a/packages/core/src/index.ts +++ b/packages/core/src/index.ts @@ -37,6 +37,7 @@ export { getChunksForTab, getMessagesForTab, getTotalChunkCount, + getUsageStatsForTab, groupRowsToMessages, type MessageRow, } from "./db/chunks.js"; diff --git a/packages/core/src/types/index.ts b/packages/core/src/types/index.ts index ced3dc2..abade27 100644 --- a/packages/core/src/types/index.ts +++ b/packages/core/src/types/index.ts @@ -90,7 +90,14 @@ export interface ChatMessage { export type ChunkRole = "user" | "assistant" | "tool" | "system"; /** Discriminator for a persisted chunk row's payload. */ -export type ChunkType = "text" | "thinking" | "tool_call" | "tool_result" | "error" | "system"; +export type ChunkType = + | "text" + | "thinking" + | "tool_call" + | "tool_result" + | "error" + | "system" + | "usage"; export interface TextData { text: string; @@ -119,6 +126,22 @@ export interface SystemData { kind: SystemChunkKind; text: string; } +/** + * Per-request token usage persisted as a SIDE-CHANNEL chunk row (one row per + * `usage` AgentEvent, i.e. one per LLM round-trip). These rows are deliberately + * EXCLUDED from `getChunksForTab`/`getTotalChunkCount` so they never enter the + * render, pagination, eviction, or agent-history-rebuild paths — they exist + * only to feed the backend aggregate `getUsageStatsForTab`, which seeds the + * frontend's `cacheStats` on reload. `inputTokens` is the TOTAL prompt + * (cached + fresh); `cacheReadTokens`/`cacheWriteTokens` are Anthropic's + * prompt-cache split. Mirrors the `usage` AgentEvent payload. + */ +export interface UsageData { + inputTokens: number; + outputTokens: number; + cacheReadTokens: number; + cacheWriteTokens: number; +} export type ChunkData = | TextData @@ -126,7 +149,8 @@ export type ChunkData = | ToolCallData | ToolResultData | ErrorData - | SystemData; + | SystemData + | UsageData; /** * A persisted chunk row — the append-only unit of conversation storage and diff --git a/packages/core/tests/db/chunks.db.test.ts b/packages/core/tests/db/chunks.db.test.ts new file mode 100644 index 0000000..4f7d517 --- /dev/null +++ b/packages/core/tests/db/chunks.db.test.ts @@ -0,0 +1,336 @@ +import { beforeAll, beforeEach, describe, expect, it, vi } from "vitest"; +import type { ChunkRowDraft, UsageData } from "../../src/types/index.js"; + +/** + * Internal row shape — matches the production `chunks` table columns. + * Kept loose at the `query()` boundary to mirror bun:sqlite's dynamic + * return type. + */ +interface ChunkRecord { + id: string; + tab_id: string; + seq: number; + turn_id: string; + step: number; + role: string; + type: string; + data_json: string; + created_at: number; +} + +/** + * In-memory fake of `bun:sqlite`'s Database implementing only the queries + * `chunks.ts` actually issues. Same approach as `tabs.test.ts`: match exact + * normalized query strings as fixed branches (no SQL parser), so a query-string + * change fails loudly as "unsupported" instead of silently returning wrong data. + * + * This lets the DB-backed `getChunksForTab` / `getTotalChunkCount` / + * `getUsageStatsForTab` logic run under vitest, where `bun:sqlite` can't load. + */ +class FakeDatabase { + rows: ChunkRecord[] = []; + private idCounter = 0; + + query(sql: string): { + all: (params?: Record) => unknown[]; + get: (params?: Record) => unknown; + run: (params?: Record) => void; + } { + return { + all: (params) => this.execSelect(sql, params), + get: (params) => this.execSelect(sql, params)[0] ?? null, + run: (params) => { + this.execMutation(sql, params); + }, + }; + } + + /** bun:sqlite's `db.transaction(fn)` returns a callable that runs `fn`. */ + transaction(fn: () => void): () => void { + return () => { + fn(); + }; + } + + private execSelect(sql: string, params?: Record): unknown[] { + const norm = sql.replace(/\s+/g, " ").trim(); + const tabId = params?.$tabId as string | undefined; + const forTab = this.rows.filter((r) => r.tab_id === tabId); + const visible = forTab.filter((r) => r.type !== "usage"); + + // appendChunks: next-seq lookup (counts ALL rows, incl. usage) + if (norm === "SELECT COALESCE(MAX(seq), -1) as max_seq FROM chunks WHERE tab_id = $tabId") { + const seqs = forTab.map((r) => r.seq); + return [{ max_seq: seqs.length > 0 ? Math.max(...seqs) : -1 }]; + } + + // getChunksForTab — no options (usage excluded) + if ( + norm === "SELECT * FROM chunks WHERE tab_id = $tabId AND type != 'usage' ORDER BY seq ASC" + ) { + return [...visible].sort((a, b) => a.seq - b.seq); + } + + // getChunksForTab — before + limit (usage excluded) + if ( + norm === + "SELECT * FROM chunks WHERE tab_id = $tabId AND type != 'usage' AND seq < $before ORDER BY seq DESC LIMIT $limit" + ) { + const before = params?.$before as number; + const limit = params?.$limit as number; + return visible + .filter((r) => r.seq < before) + .sort((a, b) => b.seq - a.seq) + .slice(0, limit); + } + + // getChunksForTab — before only (usage excluded) + if ( + norm === + "SELECT * FROM chunks WHERE tab_id = $tabId AND type != 'usage' AND seq < $before ORDER BY seq DESC" + ) { + const before = params?.$before as number; + return visible.filter((r) => r.seq < before).sort((a, b) => b.seq - a.seq); + } + + // getChunksForTab — limit only (usage excluded) + if ( + norm === + "SELECT * FROM chunks WHERE tab_id = $tabId AND type != 'usage' ORDER BY seq DESC LIMIT $limit" + ) { + const limit = params?.$limit as number; + return [...visible].sort((a, b) => b.seq - a.seq).slice(0, limit); + } + + // getTotalChunkCount (usage excluded) + if (norm === "SELECT COUNT(*) as count FROM chunks WHERE tab_id = $tabId AND type != 'usage'") { + return [{ count: visible.length }]; + } + + // getUsageStatsForTab: usage rows only, in seq order + if ( + norm === + "SELECT data_json FROM chunks WHERE tab_id = $tabId AND type = 'usage' ORDER BY seq ASC" + ) { + return forTab + .filter((r) => r.type === "usage") + .sort((a, b) => a.seq - b.seq) + .map((r) => ({ data_json: r.data_json })); + } + + throw new Error(`FakeDatabase: unsupported SELECT: ${norm}`); + } + + private execMutation(sql: string, params?: Record): void { + const norm = sql.replace(/\s+/g, " ").trim(); + + // appendChunks: single-row insert + if ( + norm === + "INSERT INTO chunks (id, tab_id, seq, turn_id, step, role, type, data_json, created_at) VALUES ($id, $tabId, $seq, $turnId, $step, $role, $type, $dataJson, $now)" + ) { + this.rows.push({ + id: (params?.$id as string) ?? `c${this.idCounter++}`, + tab_id: params?.$tabId as string, + seq: params?.$seq as number, + turn_id: params?.$turnId as string, + step: (params?.$step as number) ?? 0, + role: params?.$role as string, + type: params?.$type as string, + data_json: params?.$dataJson as string, + created_at: (params?.$now as number) ?? 0, + }); + return; + } + + throw new Error(`FakeDatabase: unsupported mutation: ${norm}`); + } +} + +let fakeDb: FakeDatabase; + +vi.mock("../../src/db/index.js", () => ({ + getDatabase: vi.fn(() => fakeDb), +})); + +const { appendChunks, getChunksForTab, getTotalChunkCount, getUsageStatsForTab } = await import( + "../../src/db/chunks.js" +); + +function usageDraft(turnId: string, u: UsageData): ChunkRowDraft { + return { turnId, step: 0, role: "assistant", type: "usage", data: u }; +} + +beforeAll(() => { + fakeDb = new FakeDatabase(); +}); + +beforeEach(() => { + fakeDb.rows = []; +}); + +// --------------------------------------------------------------------------- +// usage chunk persistence + side-channel invariants +// --------------------------------------------------------------------------- +describe("usage chunk rows (DB-backed)", () => { + const TAB = "tab-usage"; + + it("persists usage rows alongside content rows with contiguous seqs", () => { + appendChunks(TAB, [ + { turnId: "t1", step: 0, role: "user", type: "text", data: { text: "hi" } }, + { turnId: "t1", step: 0, role: "assistant", type: "text", data: { text: "yo" } }, + usageDraft("t1", { + inputTokens: 100, + outputTokens: 10, + cacheReadTokens: 0, + cacheWriteTokens: 90, + }), + ]); + // All three rows landed with contiguous seqs. + expect(fakeDb.rows.map((r) => r.seq)).toEqual([0, 1, 2]); + expect(fakeDb.rows.map((r) => r.type)).toEqual(["text", "text", "usage"]); + }); + + it("excludes usage rows from getChunksForTab (all variants)", () => { + appendChunks(TAB, [ + { turnId: "t1", step: 0, role: "user", type: "text", data: { text: "q" } }, + usageDraft("t1", { + inputTokens: 100, + outputTokens: 10, + cacheReadTokens: 0, + cacheWriteTokens: 90, + }), + { turnId: "t1", step: 0, role: "assistant", type: "text", data: { text: "a" } }, + usageDraft("t1", { + inputTokens: 200, + outputTokens: 20, + cacheReadTokens: 150, + cacheWriteTokens: 0, + }), + ]); + + // no options + const all = getChunksForTab(TAB); + expect(all.every((r) => r.type !== "usage")).toBe(true); + expect(all.map((r) => r.type)).toEqual(["text", "text"]); + + // limit only + const limited = getChunksForTab(TAB, { limit: 10 }); + expect(limited.every((r) => r.type !== "usage")).toBe(true); + expect(limited).toHaveLength(2); + + // before only — `before` is a seq cursor; usage seqs must never surface + const before = getChunksForTab(TAB, { before: 100 }); + expect(before.every((r) => r.type !== "usage")).toBe(true); + expect(before).toHaveLength(2); + + // before + limit + const bl = getChunksForTab(TAB, { before: 100, limit: 10 }); + expect(bl.every((r) => r.type !== "usage")).toBe(true); + expect(bl).toHaveLength(2); + }); + + it("excludes usage rows from getTotalChunkCount", () => { + appendChunks(TAB, [ + { turnId: "t1", step: 0, role: "user", type: "text", data: { text: "q" } }, + { turnId: "t1", step: 0, role: "assistant", type: "text", data: { text: "a" } }, + usageDraft("t1", { + inputTokens: 100, + outputTokens: 10, + cacheReadTokens: 0, + cacheWriteTokens: 90, + }), + ]); + // 3 rows total, but only 2 visible. + expect(getTotalChunkCount(TAB)).toBe(2); + }); +}); + +// --------------------------------------------------------------------------- +// getUsageStatsForTab — backend aggregate +// --------------------------------------------------------------------------- +describe("getUsageStatsForTab", () => { + const TAB = "tab-agg"; + + it("returns null when the tab has no usage rows", () => { + appendChunks(TAB, [ + { turnId: "t1", step: 0, role: "assistant", type: "text", data: { text: "a" } }, + ]); + expect(getUsageStatsForTab(TAB)).toBeNull(); + }); + + it("sums cumulative tokens, counts requests, and reports the last request's split", () => { + appendChunks(TAB, [ + usageDraft("t1", { + inputTokens: 1000, + outputTokens: 40, + cacheReadTokens: 0, + cacheWriteTokens: 900, + }), + usageDraft("t1", { + inputTokens: 1200, + outputTokens: 60, + cacheReadTokens: 1000, + cacheWriteTokens: 100, + }), + ]); + + const stats = getUsageStatsForTab(TAB); + expect(stats).not.toBeNull(); + expect(stats?.requests).toBe(2); + expect(stats?.inputTokens).toBe(2200); + expect(stats?.outputTokens).toBe(100); + expect(stats?.cacheReadTokens).toBe(1000); + expect(stats?.cacheWriteTokens).toBe(1000); + // `last` = the most recent (highest-seq) usage row. + expect(stats?.last).toEqual({ + inputTokens: 1200, + outputTokens: 60, + cacheReadTokens: 1000, + cacheWriteTokens: 100, + }); + }); + + it("is structurally identical to the frontend CacheStats shape (seeds directly)", () => { + appendChunks(TAB, [ + usageDraft("t1", { + inputTokens: 5, + outputTokens: 1, + cacheReadTokens: 2, + cacheWriteTokens: 3, + }), + ]); + const stats = getUsageStatsForTab(TAB); + expect(Object.keys(stats ?? {}).sort()).toEqual( + [ + "cacheReadTokens", + "cacheWriteTokens", + "inputTokens", + "last", + "outputTokens", + "requests", + ].sort(), + ); + }); + + it("is scoped per tab", () => { + appendChunks("tab-a", [ + usageDraft("t1", { + inputTokens: 10, + outputTokens: 1, + cacheReadTokens: 0, + cacheWriteTokens: 0, + }), + ]); + appendChunks("tab-b", [ + usageDraft("t2", { + inputTokens: 20, + outputTokens: 2, + cacheReadTokens: 0, + cacheWriteTokens: 0, + }), + ]); + expect(getUsageStatsForTab("tab-a")?.inputTokens).toBe(10); + expect(getUsageStatsForTab("tab-b")?.inputTokens).toBe(20); + }); +}); diff --git a/packages/frontend/src/lib/tabs.svelte.ts b/packages/frontend/src/lib/tabs.svelte.ts index 3fd7e5f..54f17d9 100644 --- a/packages/frontend/src/lib/tabs.svelte.ts +++ b/packages/frontend/src/lib/tabs.svelte.ts @@ -755,6 +755,12 @@ export function createTabStore() { keyId?: string | null; modelId?: string | null; parentTabId?: string | null; + // Backend usage aggregate (GET /tabs). Structurally identical to + // CacheStats, so it seeds `cacheStats` directly on reload. Seeding + // happens ONLY here (hydrate runs when tabs.length === 0, i.e. a true + // reload) — never on `statuses` reconnect or `turn-sealed` — so the + // persisted aggregate and in-session live `usage` events never overlap. + usageStats?: CacheStats | null; }> = []; try { const res = await fetch(`${config.apiBase}/tabs`); @@ -843,6 +849,7 @@ export function createTabStore() { chunkLimit: appSettings.chunkLimit, oldestLoadedSeq: win.oldestSeq, totalChunks: win.total, + cacheStats: row.usageStats ?? undefined, }; }); diff --git a/packages/frontend/tests/chat-store.test.ts b/packages/frontend/tests/chat-store.test.ts index 33a9f69..4e9691f 100644 --- a/packages/frontend/tests/chat-store.test.ts +++ b/packages/frontend/tests/chat-store.test.ts @@ -1005,6 +1005,192 @@ describe("hydrateFromBackend", () => { expect(tC?.renderGroups.length).toBe(0); expect(tC?.agentStatus).toBe("idle"); }); + + // ─── usage persistence: seed cacheStats from the backend aggregate ── + it("seeds cacheStats from a tab's usageStats on hydrate (reload persistence)", async () => { + const usageStats = { + inputTokens: 2200, + outputTokens: 100, + cacheReadTokens: 1000, + cacheWriteTokens: 1000, + requests: 2, + last: { inputTokens: 1200, outputTokens: 60, cacheReadTokens: 1000, cacheWriteTokens: 100 }, + }; + vi.stubGlobal( + "fetch", + vi.fn((url: string) => { + if (url.endsWith("/tabs")) { + return Promise.resolve({ + ok: true, + json: () => + Promise.resolve({ + tabs: [ + { + id: "tu", + title: "Has usage", + keyId: null, + modelId: null, + parentTabId: null, + usageStats, + }, + { + id: "tn", + title: "No usage", + keyId: null, + modelId: null, + parentTabId: null, + usageStats: null, + }, + ], + }), + }); + } + if (url.endsWith("/status")) { + return Promise.resolve({ ok: true, json: () => Promise.resolve({ statuses: {} }) }); + } + if (url.split("?")[0]?.endsWith("/tabs/tu/chunks")) { + return Promise.resolve({ + ok: true, + json: () => Promise.resolve({ chunks: [], total: 0, oldestSeq: null }), + }); + } + if (url.split("?")[0]?.endsWith("/tabs/tn/chunks")) { + return Promise.resolve({ + ok: true, + json: () => Promise.resolve({ chunks: [], total: 0, oldestSeq: null }), + }); + } + return Promise.reject(new Error(`unexpected fetch ${url}`)); + }), + ); + + const store = createTabStore(); + const n = await store.hydrateFromBackend(); + expect(n).toBe(2); + // Tab with persisted usage → cacheStats seeded directly from the aggregate. + expect(store.tabs.find((t) => t.id === "tu")?.cacheStats).toEqual(usageStats); + // Tab with null usageStats → cacheStats stays undefined (no usage yet). + expect(store.tabs.find((t) => t.id === "tn")?.cacheStats).toBeUndefined(); + }); + + it("does not re-seed or double-count cacheStats on a statuses reconnect after hydrate", async () => { + const usageStats = { + inputTokens: 1000, + outputTokens: 40, + cacheReadTokens: 0, + cacheWriteTokens: 900, + requests: 1, + last: { inputTokens: 1000, outputTokens: 40, cacheReadTokens: 0, cacheWriteTokens: 900 }, + }; + vi.stubGlobal( + "fetch", + vi.fn((url: string) => { + if (url.endsWith("/tabs")) { + return Promise.resolve({ + ok: true, + json: () => + Promise.resolve({ + tabs: [ + { + id: "tr", + title: "Reconnect", + keyId: null, + modelId: null, + parentTabId: null, + usageStats, + }, + ], + }), + }); + } + if (url.endsWith("/status")) { + return Promise.resolve({ ok: true, json: () => Promise.resolve({ statuses: {} }) }); + } + if (url.split("?")[0]?.endsWith("/tabs/tr/chunks")) { + return Promise.resolve({ + ok: true, + json: () => Promise.resolve({ chunks: [], total: 0, oldestSeq: null }), + }); + } + return Promise.reject(new Error(`unexpected fetch ${url}`)); + }), + ); + + const store = createTabStore(); + await store.hydrateFromBackend(); + expect(store.tabs.find((t) => t.id === "tr")?.cacheStats).toEqual(usageStats); + + // A WS reconnect snapshot must NOT touch cacheStats (in-session live + // `usage` events own the running totals; the aggregate seed is reload-only). + store.handleEvent({ type: "statuses", statuses: { tr: { status: "idle" } } }); + expect(store.tabs.find((t) => t.id === "tr")?.cacheStats).toEqual(usageStats); + }); + + it("keeps accumulating live usage events after a hydrate seed", async () => { + const usageStats = { + inputTokens: 1000, + outputTokens: 40, + cacheReadTokens: 0, + cacheWriteTokens: 900, + requests: 1, + last: { inputTokens: 1000, outputTokens: 40, cacheReadTokens: 0, cacheWriteTokens: 900 }, + }; + vi.stubGlobal( + "fetch", + vi.fn((url: string) => { + if (url.endsWith("/tabs")) { + return Promise.resolve({ + ok: true, + json: () => + Promise.resolve({ + tabs: [ + { + id: "tl", + title: "Live after hydrate", + keyId: null, + modelId: null, + parentTabId: null, + usageStats, + }, + ], + }), + }); + } + if (url.endsWith("/status")) { + return Promise.resolve({ ok: true, json: () => Promise.resolve({ statuses: {} }) }); + } + if (url.split("?")[0]?.endsWith("/tabs/tl/chunks")) { + return Promise.resolve({ + ok: true, + json: () => Promise.resolve({ chunks: [], total: 0, oldestSeq: null }), + }); + } + return Promise.reject(new Error(`unexpected fetch ${url}`)); + }), + ); + + const store = createTabStore(); + await store.hydrateFromBackend(); + + // A new in-session usage event folds ON TOP of the seeded aggregate. + store.handleEvent({ + type: "usage", + tabId: "tl", + usage: { inputTokens: 200, outputTokens: 10, cacheReadTokens: 150, cacheWriteTokens: 0 }, + }); + const stats = store.tabs.find((t) => t.id === "tl")?.cacheStats; + expect(stats?.requests).toBe(2); + expect(stats?.inputTokens).toBe(1200); + expect(stats?.outputTokens).toBe(50); + expect(stats?.cacheReadTokens).toBe(150); + expect(stats?.cacheWriteTokens).toBe(900); + expect(stats?.last).toEqual({ + inputTokens: 200, + outputTokens: 10, + cacheReadTokens: 150, + cacheWriteTokens: 0, + }); + }); }); // ─── statuses WS event with the wider TabStatusSnapshot shape ─── -- cgit v1.2.3