summaryrefslogtreecommitdiffhomepage
path: root/packages/api
diff options
context:
space:
mode:
authorAdam Malczewski <[email protected]>2026-06-02 13:08:36 +0900
committerAdam Malczewski <[email protected]>2026-06-02 13:08:36 +0900
commitb734eb96bf0af267fdfbef85df51940ca0b4e8c7 (patch)
tree3815dc9e569bb57384f53d9042288ff831f02e74 /packages/api
parent3f629a8469fe483243671e1ca15582a111e96541 (diff)
downloaddispatch-b734eb96bf0af267fdfbef85df51940ca0b4e8c7.tar.gz
dispatch-b734eb96bf0af267fdfbef85df51940ca0b4e8c7.zip
feat: persist per-tab token/cache usage across reload
Persist usage as invisible type:"usage" chunk rows (side channel): - core: add "usage" ChunkType + UsageData; exclude usage rows from getChunksForTab/getTotalChunkCount; add getUsageStatsForTab aggregate (exported from barrel); defensive skip in groupRowsToMessages. - api: agent-manager accumulates per-attempt usageRows and flushes them in the same atomic appendChunks call as the turn's content (discarded on a superseded fallback attempt). GET /tabs enriches rows with usageStats. - frontend: hydrateFromBackend seeds cacheStats from usageStats (reload only; no re-seed on statuses reconnect, so no double-count with live events). Tests: core DB-backed usage persistence/aggregate; api usage-row-per-event + fallback discard; routes GET /tabs usageStats; frontend hydrate seed + no-double-count + live-accumulation-after-seed. 495 -> 509 passing.
Diffstat (limited to 'packages/api')
-rw-r--r--packages/api/src/agent-manager.ts27
-rw-r--r--packages/api/src/routes/tabs.ts6
-rw-r--r--packages/api/tests/agent-manager.test.ts190
-rw-r--r--packages/api/tests/routes.test.ts82
4 files changed, 300 insertions, 5 deletions
diff --git a/packages/api/src/agent-manager.ts b/packages/api/src/agent-manager.ts
index 5a0ffdf..1db9a04 100644
--- a/packages/api/src/agent-manager.ts
+++ b/packages/api/src/agent-manager.ts
@@ -54,6 +54,7 @@ import {
TaskList,
toAvailableSubagents,
toAvailableUserAgents,
+ type UsageData,
validateConfig,
} from "@dispatch/core";
import type { PermissionManager } from "./permission-manager.js";
@@ -1477,6 +1478,10 @@ export class AgentManager {
// turn (text / thinking / tool-batch / error / system), folded from
// the stream via the shared `appendEventToChunks` helper.
const chunks: Chunk[] = [];
+ // Per-attempt usage accumulator. Reset each fallback attempt so a
+ // superseded (rate-limited) attempt's usage is discarded alongside its
+ // `chunks`. One `usage` event → one UsageData row.
+ const usageRows: UsageData[] = [];
const assistantId = crypto.randomUUID();
let assistantPersisted = false;
tabAgent.currentChunks = chunks;
@@ -1487,8 +1492,17 @@ export class AgentManager {
// `tool-batch` into separate `tool_call` + `tool_result` rows and
// tags every row with `turn_id` + derived `step`.
const flushAssistant = (): void => {
- if (assistantPersisted || chunks.length === 0) return;
- appendChunks(tabId, explodeTurn(turnId, chunks));
+ if (assistantPersisted) return;
+ // Append usage as extra drafts in the SAME appendChunks call as the
+ // turn's content rows: one atomic write, one fsync, contiguous seqs.
+ // Usage rows are an invisible side channel (excluded from
+ // getChunksForTab); `step` is cosmetic for usage (never grouped).
+ const drafts = explodeTurn(turnId, chunks);
+ for (const u of usageRows) {
+ drafts.push({ turnId, step: 0, role: "assistant", type: "usage", data: u });
+ }
+ if (drafts.length === 0) return;
+ appendChunks(tabId, drafts);
assistantPersisted = true;
};
@@ -1542,6 +1556,15 @@ export class AgentManager {
allOutput += event.delta;
}
+ // Capture per-step usage as a side-channel row to persist with the
+ // turn (one row per `usage` event). The live `this.emit(event)`
+ // above still drives in-session accumulation; this is the reload-
+ // persistence path. `appendEventToChunks` intentionally ignores
+ // `usage`, so it never becomes message content.
+ if (event.type === "usage") {
+ usageRows.push({ ...event.usage });
+ }
+
// Route every content-bearing event through the shared helper.
// `appendEventToChunks` ignores lifecycle events (status / done
// / task-list-update / tab-created / message-* / etc), so it's
diff --git a/packages/api/src/routes/tabs.ts b/packages/api/src/routes/tabs.ts
index b1e9659..f52ee99 100644
--- a/packages/api/src/routes/tabs.ts
+++ b/packages/api/src/routes/tabs.ts
@@ -6,6 +6,7 @@ import {
getSetting,
getTab,
getTotalChunkCount,
+ getUsageStatsForTab,
groupRowsToMessages,
listOpenTabs,
setSetting,
@@ -27,7 +28,10 @@ export function setTabsAgentManager(
}
tabsRoutes.get("/", (c) => {
- const tabs = listOpenTabs();
+ // Enrich each tab with its persisted usage aggregate so the frontend can
+ // seed `cacheStats` on reload without an extra round-trip. N small indexed
+ // queries — fine for tab counts.
+ const tabs = listOpenTabs().map((t) => ({ ...t, usageStats: getUsageStatsForTab(t.id) }));
return c.json({ tabs });
});
diff --git a/packages/api/tests/agent-manager.test.ts b/packages/api/tests/agent-manager.test.ts
index b9b4510..95fb558 100644
--- a/packages/api/tests/agent-manager.test.ts
+++ b/packages/api/tests/agent-manager.test.ts
@@ -91,6 +91,19 @@ function setFakeSetting(key: string, value: string): void {
fakeSettings.set(key, value);
}
+// Capture every appendChunks(tabId, drafts) call so tests can assert what got
+// persisted (e.g. usage side-channel rows). The real explodeTurn is mocked to
+// return [], so content drafts are empty here; usage rows are pushed directly
+// by processMessage's flushAssistant, making them the visible drafts.
+interface AppendChunksCall {
+ tabId: string;
+ drafts: Array<{ turnId: string; step: number; role: string; type: string; data: unknown }>;
+}
+const appendChunksCalls: AppendChunksCall[] = [];
+function resetAppendChunksCalls(): void {
+ appendChunksCalls.length = 0;
+}
+
// Allow tests to swap in a custom `run` generator (e.g. to simulate
// a fallback failure mid-stream). Returning to undefined restores
// the default.
@@ -345,7 +358,8 @@ vi.mock("@dispatch/core", () => ({
getSetting(key: string) {
return fakeSettings.get(key) ?? null;
},
- appendChunks() {
+ appendChunks(tabId: string, drafts: AppendChunksCall["drafts"]) {
+ appendChunksCalls.push({ tabId, drafts: [...drafts] });
return [];
},
explodeUserText() {
@@ -406,6 +420,7 @@ describe("AgentManager", () => {
resetFakeSettings();
setRunImpl(null);
appendEventToChunksSpy.mockClear();
+ resetAppendChunksCalls();
});
it("initial status is idle", () => {
@@ -1331,4 +1346,177 @@ describe("AgentManager", () => {
expect(tools).not.toContain("read_tab");
});
});
+
+ // ─── Usage side-channel persistence ──────────────────────────────
+ //
+ // `usage` AgentEvents (one per LLM round-trip) are persisted as invisible
+ // `type:"usage"` chunk rows so per-tab token/cache telemetry survives a
+ // reload. They ride the SAME atomic appendChunks call as the turn's content
+ // rows (one fsync, contiguous seqs). A superseded fallback attempt's usage is
+ // discarded with its `chunks` (per-attempt accumulator).
+ describe("usage persistence", () => {
+ it("writes one usage row per usage event emitted during a turn", async () => {
+ const manager = new AgentManager();
+ setRunImpl(async function* () {
+ yield { type: "status", status: "running" } as const;
+ yield {
+ type: "usage",
+ usage: { inputTokens: 1000, outputTokens: 40, cacheReadTokens: 0, cacheWriteTokens: 900 },
+ } as const;
+ yield { type: "text-delta", delta: "step two" } as const;
+ yield {
+ type: "usage",
+ usage: {
+ inputTokens: 1200,
+ outputTokens: 60,
+ cacheReadTokens: 1000,
+ cacheWriteTokens: 100,
+ },
+ } as const;
+ yield {
+ type: "done",
+ message: { role: "assistant", chunks: [{ type: "text", text: "step two" }] },
+ } as const;
+ yield { type: "status", status: "idle" } as const;
+ });
+
+ await manager.processMessage("tab-usage-rows", "go");
+
+ const usageDrafts = appendChunksCalls
+ .flatMap((c) => c.drafts)
+ .filter((d) => d.type === "usage");
+ expect(usageDrafts).toHaveLength(2);
+ // One row per event, role=assistant, step cosmetic (0).
+ expect(usageDrafts.every((d) => d.role === "assistant" && d.step === 0)).toBe(true);
+ expect(usageDrafts[0]?.data).toEqual({
+ inputTokens: 1000,
+ outputTokens: 40,
+ cacheReadTokens: 0,
+ cacheWriteTokens: 900,
+ });
+ expect(usageDrafts[1]?.data).toEqual({
+ inputTokens: 1200,
+ outputTokens: 60,
+ cacheReadTokens: 1000,
+ cacheWriteTokens: 100,
+ });
+ });
+
+ it("emits usage rows in the SAME appendChunks call as the turn's content (one atomic write)", async () => {
+ const manager = new AgentManager();
+ setRunImpl(async function* () {
+ yield { type: "status", status: "running" } as const;
+ yield { type: "text-delta", delta: "hi" } as const;
+ yield {
+ type: "usage",
+ usage: { inputTokens: 5, outputTokens: 1, cacheReadTokens: 2, cacheWriteTokens: 3 },
+ } as const;
+ yield {
+ type: "done",
+ message: { role: "assistant", chunks: [{ type: "text", text: "hi" }] },
+ } as const;
+ yield { type: "status", status: "idle" } as const;
+ });
+
+ await manager.processMessage("tab-usage-atomic", "go");
+
+ // Exactly one appendChunks call carries the usage draft (the flush). The
+ // user-message append and any system-row appends carry no usage rows.
+ const callsWithUsage = appendChunksCalls.filter((c) =>
+ c.drafts.some((d) => d.type === "usage"),
+ );
+ expect(callsWithUsage).toHaveLength(1);
+ expect(callsWithUsage[0]?.tabId).toBe("tab-usage-atomic");
+ });
+
+ it("discards a superseded (rate-limited) attempt's usage on fallback", async () => {
+ const manager = new AgentManager();
+ // Inject a minimal model registry so the rate-limit fallback path is
+ // taken (real `processMessage` requires modelRegistry + a resolved
+ // keyId + a next fallback entry to retry).
+ const markKeyExhausted = vi.fn();
+ (
+ manager as unknown as {
+ modelRegistry: {
+ getKeys(): Array<{ definition: Record<string, unknown> }>;
+ markKeyExhausted(): void;
+ };
+ }
+ ).modelRegistry = {
+ getKeys: () => [
+ {
+ definition: {
+ id: "k1",
+ provider: "openai-compatible",
+ env: "ENV1",
+ base_url: "http://x",
+ },
+ },
+ {
+ definition: {
+ id: "k2",
+ provider: "openai-compatible",
+ env: "ENV2",
+ base_url: "http://y",
+ },
+ },
+ ],
+ markKeyExhausted,
+ };
+
+ let attempt = 0;
+ setRunImpl(async function* () {
+ attempt++;
+ yield { type: "status", status: "running" } as const;
+ if (attempt === 1) {
+ // Attempt 1 emits usage then rate-limits — its usage must be dropped.
+ yield {
+ type: "usage",
+ usage: { inputTokens: 999, outputTokens: 9, cacheReadTokens: 0, cacheWriteTokens: 0 },
+ } as const;
+ yield { type: "error", error: "rate limit exceeded (status=429)" } as const;
+ return;
+ }
+ // Attempt 2 succeeds — only its usage should persist.
+ yield {
+ type: "usage",
+ usage: { inputTokens: 222, outputTokens: 22, cacheReadTokens: 100, cacheWriteTokens: 5 },
+ } as const;
+ yield {
+ type: "done",
+ message: { role: "assistant", chunks: [{ type: "text", text: "recovered" }] },
+ } as const;
+ yield { type: "status", status: "idle" } as const;
+ });
+
+ const agentModels = [
+ { key_id: "k1", model_id: "m1" },
+ { key_id: "k2", model_id: "m2" },
+ ];
+ await manager.processMessage(
+ "tab-usage-fallback",
+ "go",
+ undefined,
+ undefined,
+ undefined,
+ undefined,
+ agentModels,
+ );
+
+ expect(attempt).toBe(2); // confirm the fallback retry actually happened
+ expect(markKeyExhausted).toHaveBeenCalled();
+
+ const usageDrafts = appendChunksCalls
+ .flatMap((c) => c.drafts)
+ .filter((d) => d.type === "usage");
+ // Only attempt 2's usage survives.
+ expect(usageDrafts).toHaveLength(1);
+ expect(usageDrafts[0]?.data).toEqual({
+ inputTokens: 222,
+ outputTokens: 22,
+ cacheReadTokens: 100,
+ cacheWriteTokens: 5,
+ });
+ });
+ });
});
diff --git a/packages/api/tests/routes.test.ts b/packages/api/tests/routes.test.ts
index c768cee..ad6d5b1 100644
--- a/packages/api/tests/routes.test.ts
+++ b/packages/api/tests/routes.test.ts
@@ -1,6 +1,24 @@
import type { ToolDefinition } from "@dispatch/core";
import { describe, expect, it, vi } from "vitest";
+// Seedable backing stores for the tabs route (GET /tabs enrichment). Declared
+// before vi.mock so the hoisted factory closure can reference them; populated
+// per-test.
+interface FakeOpenTab {
+ id: string;
+ title: string;
+ keyId: string | null;
+ modelId: string | null;
+ parentTabId: string | null;
+ status: string;
+ isOpen: boolean;
+ position: number;
+ createdAt: number;
+ updatedAt: number;
+}
+const fakeOpenTabs: FakeOpenTab[] = [];
+const fakeUsageStats = new Map<string, unknown>();
+
// Mock @dispatch/core's Agent to avoid real LLM calls
vi.mock("@dispatch/core", () => ({
Agent: class MockAgent {
@@ -170,7 +188,7 @@ vi.mock("@dispatch/core", () => ({
return null;
},
listOpenTabs() {
- return [];
+ return [...fakeOpenTabs];
},
resolveTabPrefix() {
return { status: "none" };
@@ -230,6 +248,9 @@ vi.mock("@dispatch/core", () => ({
getTotalChunkCount() {
return 0;
},
+ getUsageStatsForTab(tabId: string) {
+ return fakeUsageStats.get(tabId) ?? null;
+ },
appendEventToChunks(_chunks: unknown[], _event: unknown) {
// no-op stub
},
@@ -413,6 +434,65 @@ describe("POST /chat", () => {
});
});
+describe("GET /tabs", () => {
+ it("enriches each open tab with its persisted usageStats aggregate", async () => {
+ fakeOpenTabs.length = 0;
+ fakeUsageStats.clear();
+ fakeOpenTabs.push({
+ id: "tab-u",
+ title: "Has usage",
+ keyId: null,
+ modelId: null,
+ parentTabId: null,
+ status: "idle",
+ isOpen: true,
+ position: 0,
+ createdAt: 0,
+ updatedAt: 0,
+ });
+ fakeOpenTabs.push({
+ id: "tab-none",
+ title: "No usage",
+ keyId: null,
+ modelId: null,
+ parentTabId: null,
+ status: "idle",
+ isOpen: true,
+ position: 1,
+ createdAt: 0,
+ updatedAt: 0,
+ });
+ fakeUsageStats.set("tab-u", {
+ inputTokens: 2200,
+ outputTokens: 100,
+ cacheReadTokens: 1000,
+ cacheWriteTokens: 1000,
+ requests: 2,
+ last: { inputTokens: 1200, outputTokens: 60, cacheReadTokens: 1000, cacheWriteTokens: 100 },
+ });
+
+ const res = await app.request("/tabs");
+ expect(res.status).toBe(200);
+ const body = await res.json();
+ expect(Array.isArray(body.tabs)).toBe(true);
+ const tabU = body.tabs.find((t: { id: string }) => t.id === "tab-u");
+ const tabNone = body.tabs.find((t: { id: string }) => t.id === "tab-none");
+ expect(tabU.usageStats).toEqual({
+ inputTokens: 2200,
+ outputTokens: 100,
+ cacheReadTokens: 1000,
+ cacheWriteTokens: 1000,
+ requests: 2,
+ last: { inputTokens: 1200, outputTokens: 60, cacheReadTokens: 1000, cacheWriteTokens: 100 },
+ });
+ // A tab with no usage rows surfaces null (not undefined/missing).
+ expect(tabNone.usageStats).toBeNull();
+
+ fakeOpenTabs.length = 0;
+ fakeUsageStats.clear();
+ });
+});
+
describe("GET /tabs/:id/chunks", () => {
it("returns the raw chunk window shape { chunks, total, oldestSeq }", async () => {
const res = await app.request("/tabs/tab-x/chunks?limit=50");