diff options
| field | value | date |
|---|---|---|
| author | Adam Malczewski <[email protected]> | 2026-06-02 13:08:36 +0900 |
| committer | Adam Malczewski <[email protected]> | 2026-06-02 13:08:36 +0900 |
| commit | b734eb96bf0af267fdfbef85df51940ca0b4e8c7 (patch) | |
| tree | 3815dc9e569bb57384f53d9042288ff831f02e74 /packages/api | |
| parent | 3f629a8469fe483243671e1ca15582a111e96541 (diff) | |
| download | dispatch-b734eb96bf0af267fdfbef85df51940ca0b4e8c7.tar.gz dispatch-b734eb96bf0af267fdfbef85df51940ca0b4e8c7.zip | |
feat: persist per-tab token/cache usage across reload
Persist usage as invisible type:"usage" chunk rows (side channel):
- core: add "usage" ChunkType + UsageData; exclude usage rows from
getChunksForTab/getTotalChunkCount; add getUsageStatsForTab aggregate
(exported from barrel); defensive skip in groupRowsToMessages.
- api: agent-manager accumulates per-attempt usageRows and flushes them in
the same atomic appendChunks call as the turn's content (discarded on a
superseded fallback attempt). GET /tabs enriches each open tab with its
usageStats aggregate (null when the tab has no usage rows).
- frontend: hydrateFromBackend seeds cacheStats from usageStats (reload only;
no re-seed on statuses reconnect, so no double-count with live events).
Tests: core DB-backed usage persistence/aggregate; api usage-row-per-event +
fallback discard; routes GET /tabs usageStats; frontend hydrate seed +
no-double-count + live-accumulation-after-seed. 495 -> 509 passing.
Diffstat (limited to 'packages/api')
| -rw-r--r-- | packages/api/src/agent-manager.ts | 27 | ||||
| -rw-r--r-- | packages/api/src/routes/tabs.ts | 6 | ||||
| -rw-r--r-- | packages/api/tests/agent-manager.test.ts | 190 | ||||
| -rw-r--r-- | packages/api/tests/routes.test.ts | 82 |
4 files changed, 300 insertions, 5 deletions
diff --git a/packages/api/src/agent-manager.ts b/packages/api/src/agent-manager.ts index 5a0ffdf..1db9a04 100644 --- a/packages/api/src/agent-manager.ts +++ b/packages/api/src/agent-manager.ts @@ -54,6 +54,7 @@ import { TaskList, toAvailableSubagents, toAvailableUserAgents, + type UsageData, validateConfig, } from "@dispatch/core"; import type { PermissionManager } from "./permission-manager.js"; @@ -1477,6 +1478,10 @@ export class AgentManager { // turn (text / thinking / tool-batch / error / system), folded from // the stream via the shared `appendEventToChunks` helper. const chunks: Chunk[] = []; + // Per-attempt usage accumulator. Reset each fallback attempt so a + // superseded (rate-limited) attempt's usage is discarded alongside its + // `chunks`. One `usage` event → one UsageData row. + const usageRows: UsageData[] = []; const assistantId = crypto.randomUUID(); let assistantPersisted = false; tabAgent.currentChunks = chunks; @@ -1487,8 +1492,17 @@ export class AgentManager { // `tool-batch` into separate `tool_call` + `tool_result` rows and // tags every row with `turn_id` + derived `step`. const flushAssistant = (): void => { - if (assistantPersisted || chunks.length === 0) return; - appendChunks(tabId, explodeTurn(turnId, chunks)); + if (assistantPersisted) return; + // Append usage as extra drafts in the SAME appendChunks call as the + // turn's content rows: one atomic write, one fsync, contiguous seqs. + // Usage rows are an invisible side channel (excluded from + // getChunksForTab); `step` is cosmetic for usage (never grouped). 
+ const drafts = explodeTurn(turnId, chunks); + for (const u of usageRows) { + drafts.push({ turnId, step: 0, role: "assistant", type: "usage", data: u }); + } + if (drafts.length === 0) return; + appendChunks(tabId, drafts); assistantPersisted = true; }; @@ -1542,6 +1556,15 @@ export class AgentManager { allOutput += event.delta; } + // Capture per-step usage as a side-channel row to persist with the + // turn (one row per `usage` event). The live `this.emit(event)` + // above still drives in-session accumulation; this is the reload- + // persistence path. `appendEventToChunks` intentionally ignores + // `usage`, so it never becomes message content. + if (event.type === "usage") { + usageRows.push({ ...event.usage }); + } + // Route every content-bearing event through the shared helper. // `appendEventToChunks` ignores lifecycle events (status / done // / task-list-update / tab-created / message-* / etc), so it's diff --git a/packages/api/src/routes/tabs.ts b/packages/api/src/routes/tabs.ts index b1e9659..f52ee99 100644 --- a/packages/api/src/routes/tabs.ts +++ b/packages/api/src/routes/tabs.ts @@ -6,6 +6,7 @@ import { getSetting, getTab, getTotalChunkCount, + getUsageStatsForTab, groupRowsToMessages, listOpenTabs, setSetting, @@ -27,7 +28,10 @@ export function setTabsAgentManager( } tabsRoutes.get("/", (c) => { - const tabs = listOpenTabs(); + // Enrich each tab with its persisted usage aggregate so the frontend can + // seed `cacheStats` on reload without an extra round-trip. N small indexed + // queries — fine for tab counts. 
+ const tabs = listOpenTabs().map((t) => ({ ...t, usageStats: getUsageStatsForTab(t.id) })); return c.json({ tabs }); }); diff --git a/packages/api/tests/agent-manager.test.ts b/packages/api/tests/agent-manager.test.ts index b9b4510..95fb558 100644 --- a/packages/api/tests/agent-manager.test.ts +++ b/packages/api/tests/agent-manager.test.ts @@ -91,6 +91,19 @@ function setFakeSetting(key: string, value: string): void { fakeSettings.set(key, value); } +// Capture every appendChunks(tabId, drafts) call so tests can assert what got +// persisted (e.g. usage side-channel rows). The real explodeTurn is mocked to +// return [], so content drafts are empty here; usage rows are pushed directly +// by processMessage's flushAssistant, making them the visible drafts. +interface AppendChunksCall { + tabId: string; + drafts: Array<{ turnId: string; step: number; role: string; type: string; data: unknown }>; +} +const appendChunksCalls: AppendChunksCall[] = []; +function resetAppendChunksCalls(): void { + appendChunksCalls.length = 0; +} + // Allow tests to swap in a custom `run` generator (e.g. to simulate // a fallback failure mid-stream). Returning to undefined restores // the default. @@ -345,7 +358,8 @@ vi.mock("@dispatch/core", () => ({ getSetting(key: string) { return fakeSettings.get(key) ?? 
null; }, - appendChunks() { + appendChunks(tabId: string, drafts: AppendChunksCall["drafts"]) { + appendChunksCalls.push({ tabId, drafts: [...drafts] }); return []; }, explodeUserText() { @@ -406,6 +420,7 @@ describe("AgentManager", () => { resetFakeSettings(); setRunImpl(null); appendEventToChunksSpy.mockClear(); + resetAppendChunksCalls(); }); it("initial status is idle", () => { @@ -1331,4 +1346,177 @@ describe("AgentManager", () => { expect(tools).not.toContain("read_tab"); }); }); + + // ─── Usage side-channel persistence ────────────────────────────── + // + // `usage` AgentEvents (one per LLM round-trip) are persisted as invisible + // `type:"usage"` chunk rows so per-tab token/cache telemetry survives a + // reload. They ride the SAME atomic appendChunks call as the turn's content + // rows (one fsync, contiguous seqs). A superseded fallback attempt's usage is + // discarded with its `chunks` (per-attempt accumulator). + describe("usage persistence", () => { + it("writes one usage row per usage event emitted during a turn", async () => { + const manager = new AgentManager(); + setRunImpl(async function* () { + yield { type: "status", status: "running" } as const; + yield { + type: "usage", + usage: { inputTokens: 1000, outputTokens: 40, cacheReadTokens: 0, cacheWriteTokens: 900 }, + } as const; + yield { type: "text-delta", delta: "step two" } as const; + yield { + type: "usage", + usage: { + inputTokens: 1200, + outputTokens: 60, + cacheReadTokens: 1000, + cacheWriteTokens: 100, + }, + } as const; + yield { + type: "done", + message: { role: "assistant", chunks: [{ type: "text", text: "step two" }] }, + } as const; + yield { type: "status", status: "idle" } as const; + }); + + await manager.processMessage("tab-usage-rows", "go"); + + const usageDrafts = appendChunksCalls + .flatMap((c) => c.drafts) + .filter((d) => d.type === "usage"); + expect(usageDrafts).toHaveLength(2); + // One row per event, role=assistant, step cosmetic (0). 
+ expect(usageDrafts.every((d) => d.role === "assistant" && d.step === 0)).toBe(true); + expect(usageDrafts[0]?.data).toEqual({ + inputTokens: 1000, + outputTokens: 40, + cacheReadTokens: 0, + cacheWriteTokens: 900, + }); + expect(usageDrafts[1]?.data).toEqual({ + inputTokens: 1200, + outputTokens: 60, + cacheReadTokens: 1000, + cacheWriteTokens: 100, + }); + }); + + it("emits usage rows in the SAME appendChunks call as the turn's content (one atomic write)", async () => { + const manager = new AgentManager(); + setRunImpl(async function* () { + yield { type: "status", status: "running" } as const; + yield { type: "text-delta", delta: "hi" } as const; + yield { + type: "usage", + usage: { inputTokens: 5, outputTokens: 1, cacheReadTokens: 2, cacheWriteTokens: 3 }, + } as const; + yield { + type: "done", + message: { role: "assistant", chunks: [{ type: "text", text: "hi" }] }, + } as const; + yield { type: "status", status: "idle" } as const; + }); + + await manager.processMessage("tab-usage-atomic", "go"); + + // Exactly one appendChunks call carries the usage draft (the flush). The + // user-message append and any system-row appends carry no usage rows. + const callsWithUsage = appendChunksCalls.filter((c) => + c.drafts.some((d) => d.type === "usage"), + ); + expect(callsWithUsage).toHaveLength(1); + expect(callsWithUsage[0]?.tabId).toBe("tab-usage-atomic"); + }); + + it("discards a superseded (rate-limited) attempt's usage on fallback", async () => { + const manager = new AgentManager(); + // Inject a minimal model registry so the rate-limit fallback path is + // taken (real `processMessage` requires modelRegistry + a resolved + // keyId + a next fallback entry to retry). 
+ const markKeyExhausted = vi.fn(); + ( + manager as unknown as { + modelRegistry: { + getKeys(): Array<{ definition: Record<string, unknown> }>; + markKeyExhausted(): void; + }; + } + ).modelRegistry = { + getKeys: () => [ + { + definition: { + id: "k1", + provider: "openai-compatible", + env: "ENV1", + base_url: "http://x", + }, + }, + { + definition: { + id: "k2", + provider: "openai-compatible", + env: "ENV2", + base_url: "http://y", + }, + }, + ], + markKeyExhausted, + }; + + let attempt = 0; + setRunImpl(async function* () { + attempt++; + yield { type: "status", status: "running" } as const; + if (attempt === 1) { + // Attempt 1 emits usage then rate-limits — its usage must be dropped. + yield { + type: "usage", + usage: { inputTokens: 999, outputTokens: 9, cacheReadTokens: 0, cacheWriteTokens: 0 }, + } as const; + yield { type: "error", error: "rate limit exceeded (status=429)" } as const; + return; + } + // Attempt 2 succeeds — only its usage should persist. + yield { + type: "usage", + usage: { inputTokens: 222, outputTokens: 22, cacheReadTokens: 100, cacheWriteTokens: 5 }, + } as const; + yield { + type: "done", + message: { role: "assistant", chunks: [{ type: "text", text: "recovered" }] }, + } as const; + yield { type: "status", status: "idle" } as const; + }); + + const agentModels = [ + { key_id: "k1", model_id: "m1" }, + { key_id: "k2", model_id: "m2" }, + ]; + await manager.processMessage( + "tab-usage-fallback", + "go", + undefined, + undefined, + undefined, + undefined, + agentModels, + ); + + expect(attempt).toBe(2); // confirm the fallback retry actually happened + expect(markKeyExhausted).toHaveBeenCalled(); + + const usageDrafts = appendChunksCalls + .flatMap((c) => c.drafts) + .filter((d) => d.type === "usage"); + // Only attempt 2's usage survives. 
+ expect(usageDrafts).toHaveLength(1); + expect(usageDrafts[0]?.data).toEqual({ + inputTokens: 222, + outputTokens: 22, + cacheReadTokens: 100, + cacheWriteTokens: 5, + }); + }); + }); }); diff --git a/packages/api/tests/routes.test.ts b/packages/api/tests/routes.test.ts index c768cee..ad6d5b1 100644 --- a/packages/api/tests/routes.test.ts +++ b/packages/api/tests/routes.test.ts @@ -1,6 +1,24 @@ import type { ToolDefinition } from "@dispatch/core"; import { describe, expect, it, vi } from "vitest"; +// Seedable backing stores for the tabs route (GET /tabs enrichment). Declared +// before vi.mock so the hoisted factory closure can reference them; populated +// per-test. +interface FakeOpenTab { + id: string; + title: string; + keyId: string | null; + modelId: string | null; + parentTabId: string | null; + status: string; + isOpen: boolean; + position: number; + createdAt: number; + updatedAt: number; +} +const fakeOpenTabs: FakeOpenTab[] = []; +const fakeUsageStats = new Map<string, unknown>(); + // Mock @dispatch/core's Agent to avoid real LLM calls vi.mock("@dispatch/core", () => ({ Agent: class MockAgent { @@ -170,7 +188,7 @@ vi.mock("@dispatch/core", () => ({ return null; }, listOpenTabs() { - return []; + return [...fakeOpenTabs]; }, resolveTabPrefix() { return { status: "none" }; @@ -230,6 +248,9 @@ vi.mock("@dispatch/core", () => ({ getTotalChunkCount() { return 0; }, + getUsageStatsForTab(tabId: string) { + return fakeUsageStats.get(tabId) ?? 
null; + }, appendEventToChunks(_chunks: unknown[], _event: unknown) { // no-op stub }, @@ -413,6 +434,65 @@ describe("POST /chat", () => { }); }); +describe("GET /tabs", () => { + it("enriches each open tab with its persisted usageStats aggregate", async () => { + fakeOpenTabs.length = 0; + fakeUsageStats.clear(); + fakeOpenTabs.push({ + id: "tab-u", + title: "Has usage", + keyId: null, + modelId: null, + parentTabId: null, + status: "idle", + isOpen: true, + position: 0, + createdAt: 0, + updatedAt: 0, + }); + fakeOpenTabs.push({ + id: "tab-none", + title: "No usage", + keyId: null, + modelId: null, + parentTabId: null, + status: "idle", + isOpen: true, + position: 1, + createdAt: 0, + updatedAt: 0, + }); + fakeUsageStats.set("tab-u", { + inputTokens: 2200, + outputTokens: 100, + cacheReadTokens: 1000, + cacheWriteTokens: 1000, + requests: 2, + last: { inputTokens: 1200, outputTokens: 60, cacheReadTokens: 1000, cacheWriteTokens: 100 }, + }); + + const res = await app.request("/tabs"); + expect(res.status).toBe(200); + const body = await res.json(); + expect(Array.isArray(body.tabs)).toBe(true); + const tabU = body.tabs.find((t: { id: string }) => t.id === "tab-u"); + const tabNone = body.tabs.find((t: { id: string }) => t.id === "tab-none"); + expect(tabU.usageStats).toEqual({ + inputTokens: 2200, + outputTokens: 100, + cacheReadTokens: 1000, + cacheWriteTokens: 1000, + requests: 2, + last: { inputTokens: 1200, outputTokens: 60, cacheReadTokens: 1000, cacheWriteTokens: 100 }, + }); + // A tab with no usage rows surfaces null (not undefined/missing). + expect(tabNone.usageStats).toBeNull(); + + fakeOpenTabs.length = 0; + fakeUsageStats.clear(); + }); +}); + describe("GET /tabs/:id/chunks", () => { it("returns the raw chunk window shape { chunks, total, oldestSeq }", async () => { const res = await app.request("/tabs/tab-x/chunks?limit=50"); |
