summaryrefslogtreecommitdiffhomepage
path: root/packages/api
diff options
context:
space:
mode:
authorAdam Malczewski <[email protected]>2026-06-02 13:57:41 +0900
committerAdam Malczewski <[email protected]>2026-06-02 13:57:41 +0900
commitd27d97bb3aa0c13f4032bab54703ebb9e1c84c81 (patch)
treeb5bcfed65be5a4d27a7bbe6b46e338dcd489c2e0 /packages/api
parent3671b82cc624117476e30b95eaf7d2bc3b34ae28 (diff)
parentc1439ea8c677ddfd11c219de39c3e77c7e297a9b (diff)
downloaddispatch-d27d97bb3aa0c13f4032bab54703ebb9e1c84c81.tar.gz
dispatch-d27d97bb3aa0c13f4032bab54703ebb9e1c84c81.zip
Merge branch 'dev' into u3/agent-effort-level
# Conflicts: # packages/api/tests/agent-manager.test.ts
Diffstat (limited to 'packages/api')
-rw-r--r--packages/api/src/agent-manager.ts40
-rw-r--r--packages/api/src/routes/models.ts16
-rw-r--r--packages/api/src/routes/tabs.ts6
-rw-r--r--packages/api/tests/agent-manager.test.ts229
-rw-r--r--packages/api/tests/routes.test.ts114
5 files changed, 399 insertions, 6 deletions
diff --git a/packages/api/src/agent-manager.ts b/packages/api/src/agent-manager.ts
index d03e696..d339fbd 100644
--- a/packages/api/src/agent-manager.ts
+++ b/packages/api/src/agent-manager.ts
@@ -36,6 +36,7 @@ import {
getMessagesForTab,
getSetting,
getTab,
+ getUsageStatsForTab,
listOpenTabs,
loadAgent,
loadAgents,
@@ -56,6 +57,8 @@ import {
TaskList,
toAvailableSubagents,
toAvailableUserAgents,
+ type UsageData,
+ type UsageStats,
validateConfig,
} from "@dispatch/core";
import type { PermissionManager } from "./permission-manager.js";
@@ -1483,6 +1486,10 @@ export class AgentManager {
// turn (text / thinking / tool-batch / error / system), folded from
// the stream via the shared `appendEventToChunks` helper.
const chunks: Chunk[] = [];
+ // Per-attempt usage accumulator. Reset each fallback attempt so a
+ // superseded (rate-limited) attempt's usage is discarded alongside its
+ // `chunks`. One `usage` event → one UsageData row.
+ const usageRows: UsageData[] = [];
const assistantId = crypto.randomUUID();
let assistantPersisted = false;
tabAgent.currentChunks = chunks;
@@ -1493,8 +1500,17 @@ export class AgentManager {
// `tool-batch` into separate `tool_call` + `tool_result` rows and
// tags every row with `turn_id` + derived `step`.
const flushAssistant = (): void => {
- if (assistantPersisted || chunks.length === 0) return;
- appendChunks(tabId, explodeTurn(turnId, chunks));
+ if (assistantPersisted) return;
+ // Append usage as extra drafts in the SAME appendChunks call as the
+ // turn's content rows: one atomic write, one fsync, contiguous seqs.
+ // Usage rows are an invisible side channel (excluded from
+ // getChunksForTab); `step` is cosmetic for usage (never grouped).
+ const drafts = explodeTurn(turnId, chunks);
+ for (const u of usageRows) {
+ drafts.push({ turnId, step: 0, role: "assistant", type: "usage", data: u });
+ }
+ if (drafts.length === 0) return;
+ appendChunks(tabId, drafts);
assistantPersisted = true;
};
@@ -1548,6 +1564,15 @@ export class AgentManager {
allOutput += event.delta;
}
+ // Capture per-step usage as a side-channel row to persist with the
+ // turn (one row per `usage` event). The live `this.emit(event)`
+ // above still drives in-session accumulation; this is the reload-
+ // persistence path. `appendEventToChunks` intentionally ignores
+ // `usage`, so it never becomes message content.
+ if (event.type === "usage") {
+ usageRows.push({ ...event.usage });
+ }
+
// Route every content-bearing event through the shared helper.
// `appendEventToChunks` ignores lifecycle events (status / done
// / task-list-update / tab-created / message-* / etc), so it's
@@ -1622,7 +1647,16 @@ export class AgentManager {
// above). Signal the frontend that the turn's rows — with real seqs — are
// durable so it can fold its live representation into the sealed log.
// Emitted AFTER status:idle/error (which fire before the DB write).
- this.emit({ type: "turn-sealed", turnId }, tabId);
+ // Carry the authoritative usage aggregate (read AFTER the usage rows were
+ // persisted) so the frontend reconciles its live cacheStats to the DB truth
+ // — self-healing the live overshoot from a discarded rate-limited attempt.
+ let usageStats: UsageStats | null = null;
+ try {
+ usageStats = getUsageStatsForTab(tabId);
+ } catch {
+ // DB read failed — omit reconciliation rather than crash the turn.
+ }
+ this.emit({ type: "turn-sealed", turnId, usageStats }, tabId);
// Turn fully settled — clear the shared turn id.
tabAgent.currentTurnId = null;
diff --git a/packages/api/src/routes/models.ts b/packages/api/src/routes/models.ts
index 03c079a..6a0f5dc 100644
--- a/packages/api/src/routes/models.ts
+++ b/packages/api/src/routes/models.ts
@@ -17,6 +17,7 @@ import {
listStoredCredentials,
refreshAccountCredentialsAsync,
resolveApiKey,
+ resolveContextLimit,
setApiKey,
validateAccountCredentials,
} from "@dispatch/core";
@@ -161,6 +162,21 @@ modelsRoutes.get("/available", async (c) => {
return c.json({ models });
});
+// Resolve a model's MAXIMUM context window (in tokens) from the models.dev
+// catalog. Returns `{ contextLimit: number | null }`; `null` means the model's
+// limit is unknown (unsupported provider, unknown model, or catalog offline),
+// which the frontend renders without a denominator/percentage.
+modelsRoutes.get("/context-limit", async (c) => {
+ const provider = c.req.query("provider");
+ const modelId = c.req.query("modelId");
+ if (!provider || !modelId) {
+ return c.json({ error: "provider and modelId query parameters are required" }, 400);
+ }
+
+ const contextLimit = await resolveContextLimit(provider, modelId);
+ return c.json({ contextLimit });
+});
+
// List available Claude accounts with validated credentials
modelsRoutes.get("/claude-accounts", async (c) => {
const candidates = resolveClaudeAccounts();
diff --git a/packages/api/src/routes/tabs.ts b/packages/api/src/routes/tabs.ts
index b1e9659..f52ee99 100644
--- a/packages/api/src/routes/tabs.ts
+++ b/packages/api/src/routes/tabs.ts
@@ -6,6 +6,7 @@ import {
getSetting,
getTab,
getTotalChunkCount,
+ getUsageStatsForTab,
groupRowsToMessages,
listOpenTabs,
setSetting,
@@ -27,7 +28,10 @@ export function setTabsAgentManager(
}
tabsRoutes.get("/", (c) => {
- const tabs = listOpenTabs();
+ // Enrich each tab with its persisted usage aggregate so the frontend can
+ // seed `cacheStats` on reload without an extra round-trip. N small indexed
+ // queries — fine for tab counts.
+ const tabs = listOpenTabs().map((t) => ({ ...t, usageStats: getUsageStatsForTab(t.id) }));
return c.json({ tabs });
});
diff --git a/packages/api/tests/agent-manager.test.ts b/packages/api/tests/agent-manager.test.ts
index ffd87b5..9da6a70 100644
--- a/packages/api/tests/agent-manager.test.ts
+++ b/packages/api/tests/agent-manager.test.ts
@@ -98,6 +98,26 @@ function setFakeSetting(key: string, value: string): void {
fakeSettings.set(key, value);
}
+// Capture every appendChunks(tabId, drafts) call so tests can assert what got
+// persisted (e.g. usage side-channel rows). The real explodeTurn is mocked to
+// return [], so content drafts are empty here; usage rows are pushed directly
+// by processMessage's flushAssistant, making them the visible drafts.
+interface AppendChunksCall {
+ tabId: string;
+ drafts: Array<{ turnId: string; step: number; role: string; type: string; data: unknown }>;
+}
+const appendChunksCalls: AppendChunksCall[] = [];
+function resetAppendChunksCalls(): void {
+ appendChunksCalls.length = 0;
+}
+
+// Seedable return value for the mocked getUsageStatsForTab — what the backend
+// reads (post-write) to attach to the `turn-sealed` event.
+const fakeUsageStatsByTab = new Map<string, unknown>();
+function resetFakeUsageStats(): void {
+ fakeUsageStatsByTab.clear();
+}
+
// Allow tests to swap in a custom `run` generator (e.g. to simulate
// a fallback failure mid-stream). Returning to undefined restores
// the default.
@@ -358,7 +378,8 @@ vi.mock("@dispatch/core", () => ({
typeof value === "string" && ["none", "low", "medium", "high", "xhigh", "max"].includes(value)
);
},
- appendChunks() {
+ appendChunks(tabId: string, drafts: AppendChunksCall["drafts"]) {
+ appendChunksCalls.push({ tabId, drafts: [...drafts] });
return [];
},
explodeUserText() {
@@ -370,6 +391,9 @@ vi.mock("@dispatch/core", () => ({
getMessagesForTab(tabId: string) {
return fakeMessagesByTab.get(tabId) ?? [];
},
+ getUsageStatsForTab(tabId: string) {
+ return fakeUsageStatsByTab.get(tabId) ?? null;
+ },
appendEventToChunks: appendEventToChunksSpy,
applySystemEvent(_messages: unknown[], _event: unknown) {
return { messageId: "mock-system-msg" };
@@ -420,6 +444,8 @@ describe("AgentManager", () => {
resetFakeSettings();
setRunImpl(null);
appendEventToChunksSpy.mockClear();
+ resetAppendChunksCalls();
+ resetFakeUsageStats();
});
it("initial status is idle", () => {
@@ -1393,4 +1419,205 @@ describe("AgentManager", () => {
expect(tools).not.toContain("read_tab");
});
});
+
+ // ─── Usage side-channel persistence ──────────────────────────────
+ //
+ // `usage` AgentEvents (one per LLM round-trip) are persisted as invisible
+ // `type:"usage"` chunk rows so per-tab token/cache telemetry survives a
+ // reload. They ride the SAME atomic appendChunks call as the turn's content
+ // rows (one fsync, contiguous seqs). A superseded fallback attempt's usage is
+ // discarded with its `chunks` (per-attempt accumulator).
+ describe("usage persistence", () => {
+ it("writes one usage row per usage event emitted during a turn", async () => {
+ const manager = new AgentManager();
+ setRunImpl(async function* () {
+ yield { type: "status", status: "running" } as const;
+ yield {
+ type: "usage",
+ usage: { inputTokens: 1000, outputTokens: 40, cacheReadTokens: 0, cacheWriteTokens: 900 },
+ } as const;
+ yield { type: "text-delta", delta: "step two" } as const;
+ yield {
+ type: "usage",
+ usage: {
+ inputTokens: 1200,
+ outputTokens: 60,
+ cacheReadTokens: 1000,
+ cacheWriteTokens: 100,
+ },
+ } as const;
+ yield {
+ type: "done",
+ message: { role: "assistant", chunks: [{ type: "text", text: "step two" }] },
+ } as const;
+ yield { type: "status", status: "idle" } as const;
+ });
+
+ await manager.processMessage("tab-usage-rows", "go");
+
+ const usageDrafts = appendChunksCalls
+ .flatMap((c) => c.drafts)
+ .filter((d) => d.type === "usage");
+ expect(usageDrafts).toHaveLength(2);
+ // One row per event, role=assistant, step cosmetic (0).
+ expect(usageDrafts.every((d) => d.role === "assistant" && d.step === 0)).toBe(true);
+ expect(usageDrafts[0]?.data).toEqual({
+ inputTokens: 1000,
+ outputTokens: 40,
+ cacheReadTokens: 0,
+ cacheWriteTokens: 900,
+ });
+ expect(usageDrafts[1]?.data).toEqual({
+ inputTokens: 1200,
+ outputTokens: 60,
+ cacheReadTokens: 1000,
+ cacheWriteTokens: 100,
+ });
+ });
+
+ it("attaches the DB usage aggregate to the turn-sealed event for live reconciliation", async () => {
+ const manager = new AgentManager();
+ const aggregate = {
+ inputTokens: 222,
+ outputTokens: 22,
+ cacheReadTokens: 100,
+ cacheWriteTokens: 5,
+ requests: 1,
+ last: { inputTokens: 222, outputTokens: 22, cacheReadTokens: 100, cacheWriteTokens: 5 },
+ };
+ fakeUsageStatsByTab.set("tab-sealed-usage", aggregate);
+
+ const events: AgentEvent[] = [];
+ manager.onEvent((event) => {
+ events.push(event);
+ });
+
+ await manager.processMessage("tab-sealed-usage", "go");
+
+ const sealed = events.find((e) => e.type === "turn-sealed") as
+ | Extract<AgentEvent, { type: "turn-sealed" }>
+ | undefined;
+ expect(sealed).toBeDefined();
+ // The aggregate read AFTER the write is carried on the event so the
+ // frontend can REPLACE its live cacheStats with the DB truth.
+ expect(sealed?.usageStats).toEqual(aggregate);
+ });
+
+ it("emits usage rows in the SAME appendChunks call as the turn's content (one atomic write)", async () => {
+ const manager = new AgentManager();
+ setRunImpl(async function* () {
+ yield { type: "status", status: "running" } as const;
+ yield { type: "text-delta", delta: "hi" } as const;
+ yield {
+ type: "usage",
+ usage: { inputTokens: 5, outputTokens: 1, cacheReadTokens: 2, cacheWriteTokens: 3 },
+ } as const;
+ yield {
+ type: "done",
+ message: { role: "assistant", chunks: [{ type: "text", text: "hi" }] },
+ } as const;
+ yield { type: "status", status: "idle" } as const;
+ });
+
+ await manager.processMessage("tab-usage-atomic", "go");
+
+ // Exactly one appendChunks call carries the usage draft (the flush). The
+ // user-message append and any system-row appends carry no usage rows.
+ const callsWithUsage = appendChunksCalls.filter((c) =>
+ c.drafts.some((d) => d.type === "usage"),
+ );
+ expect(callsWithUsage).toHaveLength(1);
+ expect(callsWithUsage[0]?.tabId).toBe("tab-usage-atomic");
+ });
+
+ it("discards a superseded (rate-limited) attempt's usage on fallback", async () => {
+ const manager = new AgentManager();
+ // Inject a minimal model registry so the rate-limit fallback path is
+ // taken (real `processMessage` requires modelRegistry + a resolved
+ // keyId + a next fallback entry to retry).
+ const markKeyExhausted = vi.fn();
+ (
+ manager as unknown as {
+ modelRegistry: {
+ getKeys(): Array<{ definition: Record<string, unknown> }>;
+ markKeyExhausted(): void;
+ };
+ }
+ ).modelRegistry = {
+ getKeys: () => [
+ {
+ definition: {
+ id: "k1",
+ provider: "openai-compatible",
+ env: "ENV1",
+ base_url: "http://x",
+ },
+ },
+ {
+ definition: {
+ id: "k2",
+ provider: "openai-compatible",
+ env: "ENV2",
+ base_url: "http://y",
+ },
+ },
+ ],
+ markKeyExhausted,
+ };
+
+ let attempt = 0;
+ setRunImpl(async function* () {
+ attempt++;
+ yield { type: "status", status: "running" } as const;
+ if (attempt === 1) {
+ // Attempt 1 emits usage then rate-limits — its usage must be dropped.
+ yield {
+ type: "usage",
+ usage: { inputTokens: 999, outputTokens: 9, cacheReadTokens: 0, cacheWriteTokens: 0 },
+ } as const;
+ yield { type: "error", error: "rate limit exceeded (status=429)" } as const;
+ return;
+ }
+ // Attempt 2 succeeds — only its usage should persist.
+ yield {
+ type: "usage",
+ usage: { inputTokens: 222, outputTokens: 22, cacheReadTokens: 100, cacheWriteTokens: 5 },
+ } as const;
+ yield {
+ type: "done",
+ message: { role: "assistant", chunks: [{ type: "text", text: "recovered" }] },
+ } as const;
+ yield { type: "status", status: "idle" } as const;
+ });
+
+ const agentModels = [
+ { key_id: "k1", model_id: "m1" },
+ { key_id: "k2", model_id: "m2" },
+ ];
+ await manager.processMessage(
+ "tab-usage-fallback",
+ "go",
+ undefined,
+ undefined,
+ undefined,
+ undefined,
+ agentModels,
+ );
+
+ expect(attempt).toBe(2); // confirm the fallback retry actually happened
+ expect(markKeyExhausted).toHaveBeenCalled();
+
+ const usageDrafts = appendChunksCalls
+ .flatMap((c) => c.drafts)
+ .filter((d) => d.type === "usage");
+ // Only attempt 2's usage survives.
+ expect(usageDrafts).toHaveLength(1);
+ expect(usageDrafts[0]?.data).toEqual({
+ inputTokens: 222,
+ outputTokens: 22,
+ cacheReadTokens: 100,
+ cacheWriteTokens: 5,
+ });
+ });
+ });
});
diff --git a/packages/api/tests/routes.test.ts b/packages/api/tests/routes.test.ts
index 3bf446d..c1971b0 100644
--- a/packages/api/tests/routes.test.ts
+++ b/packages/api/tests/routes.test.ts
@@ -1,6 +1,24 @@
import type { ToolDefinition } from "@dispatch/core";
import { describe, expect, it, vi } from "vitest";
+// Seedable backing stores for the tabs route (GET /tabs enrichment). Declared
+// before vi.mock so the hoisted factory closure can reference them; populated
+// per-test.
+interface FakeOpenTab {
+ id: string;
+ title: string;
+ keyId: string | null;
+ modelId: string | null;
+ parentTabId: string | null;
+ status: string;
+ isOpen: boolean;
+ position: number;
+ createdAt: number;
+ updatedAt: number;
+}
+const fakeOpenTabs: FakeOpenTab[] = [];
+const fakeUsageStats = new Map<string, unknown>();
+
// Mock @dispatch/core's Agent to avoid real LLM calls
vi.mock("@dispatch/core", () => ({
Agent: class MockAgent {
@@ -175,7 +193,7 @@ vi.mock("@dispatch/core", () => ({
);
},
listOpenTabs() {
- return [];
+ return [...fakeOpenTabs];
},
resolveTabPrefix() {
return { status: "none" };
@@ -235,6 +253,9 @@ vi.mock("@dispatch/core", () => ({
getTotalChunkCount() {
return 0;
},
+ getUsageStatsForTab(tabId: string) {
+ return fakeUsageStats.get(tabId) ?? null;
+ },
appendEventToChunks(_chunks: unknown[], _event: unknown) {
// no-op stub
},
@@ -273,6 +294,13 @@ vi.mock("@dispatch/core", () => ({
execute: async () => "mock",
};
},
+ // ── models.dev context-limit stub ─────────────────────────────
+ resolveContextLimit(provider: string, modelId: string) {
+ if (provider === "anthropic" && modelId === "claude-sonnet-4-5") {
+ return Promise.resolve(200000);
+ }
+ return Promise.resolve(null);
+ },
// ── ntfy notifications stubs ──────────────────────────────────
NotificationDispatcher: class MockNotificationDispatcher {
attachToAgentManager() {
@@ -446,6 +474,65 @@ describe("POST /chat", () => {
});
});
+describe("GET /tabs", () => {
+ it("enriches each open tab with its persisted usageStats aggregate", async () => {
+ fakeOpenTabs.length = 0;
+ fakeUsageStats.clear();
+ fakeOpenTabs.push({
+ id: "tab-u",
+ title: "Has usage",
+ keyId: null,
+ modelId: null,
+ parentTabId: null,
+ status: "idle",
+ isOpen: true,
+ position: 0,
+ createdAt: 0,
+ updatedAt: 0,
+ });
+ fakeOpenTabs.push({
+ id: "tab-none",
+ title: "No usage",
+ keyId: null,
+ modelId: null,
+ parentTabId: null,
+ status: "idle",
+ isOpen: true,
+ position: 1,
+ createdAt: 0,
+ updatedAt: 0,
+ });
+ fakeUsageStats.set("tab-u", {
+ inputTokens: 2200,
+ outputTokens: 100,
+ cacheReadTokens: 1000,
+ cacheWriteTokens: 1000,
+ requests: 2,
+ last: { inputTokens: 1200, outputTokens: 60, cacheReadTokens: 1000, cacheWriteTokens: 100 },
+ });
+
+ const res = await app.request("/tabs");
+ expect(res.status).toBe(200);
+ const body = await res.json();
+ expect(Array.isArray(body.tabs)).toBe(true);
+ const tabU = body.tabs.find((t: { id: string }) => t.id === "tab-u");
+ const tabNone = body.tabs.find((t: { id: string }) => t.id === "tab-none");
+ expect(tabU.usageStats).toEqual({
+ inputTokens: 2200,
+ outputTokens: 100,
+ cacheReadTokens: 1000,
+ cacheWriteTokens: 1000,
+ requests: 2,
+ last: { inputTokens: 1200, outputTokens: 60, cacheReadTokens: 1000, cacheWriteTokens: 100 },
+ });
+ // A tab with no usage rows surfaces null (not undefined/missing).
+ expect(tabNone.usageStats).toBeNull();
+
+ fakeOpenTabs.length = 0;
+ fakeUsageStats.clear();
+ });
+});
+
describe("GET /tabs/:id/chunks", () => {
it("returns the raw chunk window shape { chunks, total, oldestSeq }", async () => {
const res = await app.request("/tabs/tab-x/chunks?limit=50");
@@ -784,3 +871,28 @@ describe("Wake schedule routes", () => {
expect(body.schedule["13"]).toBeUndefined();
});
});
+
+describe("GET /models/context-limit", () => {
+ it("returns the resolved context limit for a known model", async () => {
+ const res = await app.request(
+ "/models/context-limit?provider=anthropic&modelId=claude-sonnet-4-5",
+ );
+ expect(res.status).toBe(200);
+ const body = (await res.json()) as { contextLimit: number | null };
+ expect(body.contextLimit).toBe(200000);
+ });
+
+ it("returns null contextLimit for an unknown model", async () => {
+ const res = await app.request("/models/context-limit?provider=anthropic&modelId=mystery");
+ expect(res.status).toBe(200);
+ const body = (await res.json()) as { contextLimit: number | null };
+ expect(body.contextLimit).toBeNull();
+ });
+
+ it("400s when provider or modelId is missing", async () => {
+ const res1 = await app.request("/models/context-limit?provider=anthropic");
+ expect(res1.status).toBe(400);
+ const res2 = await app.request("/models/context-limit?modelId=claude-sonnet-4-5");
+ expect(res2.status).toBe(400);
+ });
+});