diff options
| author | Adam Malczewski <[email protected]> | 2026-06-02 13:34:33 +0900 |
|---|---|---|
| committer | Adam Malczewski <[email protected]> | 2026-06-02 13:34:33 +0900 |
| commit | 48c120e5cd400b2e2b8afae0afcc7c8bc4d2ccb4 (patch) | |
| tree | 2c434aeba0db7d6ec5b87e2f7fe2c81352f0888c | |
| parent | b734eb96bf0af267fdfbef85df51940ca0b4e8c7 (diff) | |
| download | dispatch-48c120e5cd400b2e2b8afae0afcc7c8bc4d2ccb4.tar.gz dispatch-48c120e5cd400b2e2b8afae0afcc7c8bc4d2ccb4.zip | |
fix: reconcile live cacheStats to DB truth on turn-sealed
Addresses the live-accumulator overshoot a Gemini review surfaced: the
frontend adds every streamed usage event to cacheStats, but a rate-limited
fallback attempt's usage is discarded server-side (never persisted). Live
numbers overshot until a reload re-seeded from the DB aggregate.
Fix: turn-sealed (emitted AFTER the atomic usage-row write) now carries the
authoritative getUsageStatsForTab aggregate. The store REPLACES (not adds)
cacheStats with it every turn — landing the just-sealed turn's usage AND
self-healing any live drift, including the discarded-fallback overshoot. No
extra round-trip (piggybacks turn-sealed); idempotent in the happy path.
- core: add UsageStats type; getUsageStatsForTab returns it; turn-sealed gains
optional usageStats field.
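- api: agent-manager reads getUsageStatsForTab post-flush and attaches it to
the turn-sealed emit, inside a try/catch so a DB read error cannot crash the
turn. NOTE(review): the diff initializes usageStats to null and always emits
it, so a DB error sends null (which the frontend treats as "clear") rather
than omitting the field — confirm this is intended.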
- frontend: turn-sealed handler replaces cacheStats (undefined ⇒ left
untouched, for back-compat; null ⇒ cleared).
Tests: frontend reconcile/self-heal/back-compat/null-clear; api turn-sealed
carries aggregate. 509 -> 514 passing; typecheck + biome green.
| -rw-r--r-- | packages/api/src/agent-manager.ts | 13 | ||||
| -rw-r--r-- | packages/api/tests/agent-manager.test.ts | 39 | ||||
| -rw-r--r-- | packages/core/src/db/chunks.ts | 23 | ||||
| -rw-r--r-- | packages/core/src/types/index.ts | 33 | ||||
| -rw-r--r-- | packages/frontend/src/lib/tabs.svelte.ts | 18 | ||||
| -rw-r--r-- | packages/frontend/src/lib/types.ts | 7 | ||||
| -rw-r--r-- | packages/frontend/tests/chat-store.test.ts | 81 |
7 files changed, 193 insertions, 21 deletions
diff --git a/packages/api/src/agent-manager.ts b/packages/api/src/agent-manager.ts index 1db9a04..9d7300a 100644 --- a/packages/api/src/agent-manager.ts +++ b/packages/api/src/agent-manager.ts @@ -35,6 +35,7 @@ import { getMessagesForTab, getSetting, getTab, + getUsageStatsForTab, listOpenTabs, loadAgent, loadAgents, @@ -55,6 +56,7 @@ import { toAvailableSubagents, toAvailableUserAgents, type UsageData, + type UsageStats, validateConfig, } from "@dispatch/core"; import type { PermissionManager } from "./permission-manager.js"; @@ -1639,7 +1641,16 @@ export class AgentManager { // above). Signal the frontend that the turn's rows — with real seqs — are // durable so it can fold its live representation into the sealed log. // Emitted AFTER status:idle/error (which fire before the DB write). - this.emit({ type: "turn-sealed", turnId }, tabId); + // Carry the authoritative usage aggregate (read AFTER the usage rows were + // persisted) so the frontend reconciles its live cacheStats to the DB truth + // — self-healing the live overshoot from a discarded rate-limited attempt. + let usageStats: UsageStats | null = null; + try { + usageStats = getUsageStatsForTab(tabId); + } catch { + // DB read failed — omit reconciliation rather than crash the turn. + } + this.emit({ type: "turn-sealed", turnId, usageStats }, tabId); // Turn fully settled — clear the shared turn id. tabAgent.currentTurnId = null; diff --git a/packages/api/tests/agent-manager.test.ts b/packages/api/tests/agent-manager.test.ts index 95fb558..6d7d66f 100644 --- a/packages/api/tests/agent-manager.test.ts +++ b/packages/api/tests/agent-manager.test.ts @@ -104,6 +104,13 @@ function resetAppendChunksCalls(): void { appendChunksCalls.length = 0; } +// Seedable return value for the mocked getUsageStatsForTab — what the backend +// reads (post-write) to attach to the `turn-sealed` event. 
+const fakeUsageStatsByTab = new Map<string, unknown>(); +function resetFakeUsageStats(): void { + fakeUsageStatsByTab.clear(); +} + // Allow tests to swap in a custom `run` generator (e.g. to simulate // a fallback failure mid-stream). Returning to undefined restores // the default. @@ -371,6 +378,9 @@ vi.mock("@dispatch/core", () => ({ getMessagesForTab(tabId: string) { return fakeMessagesByTab.get(tabId) ?? []; }, + getUsageStatsForTab(tabId: string) { + return fakeUsageStatsByTab.get(tabId) ?? null; + }, appendEventToChunks: appendEventToChunksSpy, applySystemEvent(_messages: unknown[], _event: unknown) { return { messageId: "mock-system-msg" }; @@ -421,6 +431,7 @@ describe("AgentManager", () => { setRunImpl(null); appendEventToChunksSpy.mockClear(); resetAppendChunksCalls(); + resetFakeUsageStats(); }); it("initial status is idle", () => { @@ -1402,6 +1413,34 @@ describe("AgentManager", () => { }); }); + it("attaches the DB usage aggregate to the turn-sealed event for live reconciliation", async () => { + const manager = new AgentManager(); + const aggregate = { + inputTokens: 222, + outputTokens: 22, + cacheReadTokens: 100, + cacheWriteTokens: 5, + requests: 1, + last: { inputTokens: 222, outputTokens: 22, cacheReadTokens: 100, cacheWriteTokens: 5 }, + }; + fakeUsageStatsByTab.set("tab-sealed-usage", aggregate); + + const events: AgentEvent[] = []; + manager.onEvent((event) => { + events.push(event); + }); + + await manager.processMessage("tab-sealed-usage", "go"); + + const sealed = events.find((e) => e.type === "turn-sealed") as + | Extract<AgentEvent, { type: "turn-sealed" }> + | undefined; + expect(sealed).toBeDefined(); + // The aggregate read AFTER the write is carried on the event so the + // frontend can REPLACE its live cacheStats with the DB truth. 
+ expect(sealed?.usageStats).toEqual(aggregate); + }); + it("emits usage rows in the SAME appendChunks call as the turn's content (one atomic write)", async () => { const manager = new AgentManager(); setRunImpl(async function* () { diff --git a/packages/core/src/db/chunks.ts b/packages/core/src/db/chunks.ts index 509cbbd..e0aadf3 100644 --- a/packages/core/src/db/chunks.ts +++ b/packages/core/src/db/chunks.ts @@ -5,7 +5,14 @@ import { groupRowsToMessages, type MessageRow, } from "../chunks/transform.js"; -import type { ChunkData, ChunkRow, ChunkRowDraft, TextData, UsageData } from "../types/index.js"; +import type { + ChunkData, + ChunkRow, + ChunkRowDraft, + TextData, + UsageData, + UsageStats, +} from "../types/index.js"; import { getDatabase } from "./index.js"; // Re-export the DB-free transforms so existing barrel consumers @@ -173,19 +180,7 @@ export function getTotalChunkCount(tabId: string): number { * Sums in JS after selecting the rows (mirroring `mapRow`) to avoid relying on * `json_extract` over the freeform `data_json`. 
*/ -export function getUsageStatsForTab(tabId: string): { - inputTokens: number; - outputTokens: number; - cacheReadTokens: number; - cacheWriteTokens: number; - requests: number; - last: { - inputTokens: number; - outputTokens: number; - cacheReadTokens: number; - cacheWriteTokens: number; - } | null; -} | null { +export function getUsageStatsForTab(tabId: string): UsageStats | null { const db = getDatabase(); const rows = db .query("SELECT data_json FROM chunks WHERE tab_id = $tabId AND type = 'usage' ORDER BY seq ASC") diff --git a/packages/core/src/types/index.ts b/packages/core/src/types/index.ts index abade27..a1fd2a8 100644 --- a/packages/core/src/types/index.ts +++ b/packages/core/src/types/index.ts @@ -143,6 +143,29 @@ export interface UsageData { cacheWriteTokens: number; } +/** + * Aggregate per-tab usage telemetry: the cumulative sum across ALL persisted + * `usage` rows, the request count, and the most recent request's split. This is + * the server-side source of truth (complete regardless of frontend + * eviction/pagination) returned by `getUsageStatsForTab`. Structurally + * identical to the frontend `CacheStats` so it can seed it directly. `null` when + * the tab has no usage rows. + */ +export interface UsageStats { + inputTokens: number; + outputTokens: number; + cacheReadTokens: number; + cacheWriteTokens: number; + /** Number of LLM requests (usage rows) counted. */ + requests: number; + last: { + inputTokens: number; + outputTokens: number; + cacheReadTokens: number; + cacheWriteTokens: number; + } | null; +} + export type ChunkData = | TextData | ThinkingData @@ -249,8 +272,16 @@ export type AgentEvent = * fold its transient live representation into the sealed chunk log. Emitted * after `status: idle`/`error` (which fire before the DB write). Display/sync * only — not conversation content. + * + * Carries `usageStats`: the tab's authoritative usage aggregate read from the + * DB AFTER the turn's usage rows were written. 
The frontend REPLACES (not adds) + * its live `cacheStats` with this, reconciling the live accumulator to the + * persisted truth every turn. This self-heals the live overshoot that occurs + * when a rate-limited fallback attempt's usage is streamed live but then + * discarded server-side (never persisted). `null` ⇒ tab has no usage rows; + * absent ⇒ leave `cacheStats` untouched (back-compat). */ - | { type: "turn-sealed"; turnId: string } + | { type: "turn-sealed"; turnId: string; usageStats?: UsageStats | null } | { type: "text-delta"; delta: string } | { type: "reasoning-delta"; delta: string } /** diff --git a/packages/frontend/src/lib/tabs.svelte.ts b/packages/frontend/src/lib/tabs.svelte.ts index 54f17d9..65e35a8 100644 --- a/packages/frontend/src/lib/tabs.svelte.ts +++ b/packages/frontend/src/lib/tabs.svelte.ts @@ -756,10 +756,11 @@ export function createTabStore() { modelId?: string | null; parentTabId?: string | null; // Backend usage aggregate (GET /tabs). Structurally identical to - // CacheStats, so it seeds `cacheStats` directly on reload. Seeding - // happens ONLY here (hydrate runs when tabs.length === 0, i.e. a true - // reload) — never on `statuses` reconnect or `turn-sealed` — so the - // persisted aggregate and in-session live `usage` events never overlap. + // CacheStats, so it seeds `cacheStats` directly on reload. This is the + // initial seed (hydrate runs only when tabs.length === 0, i.e. a true + // reload); thereafter `turn-sealed` REPLACES cacheStats with the same + // aggregate each turn, keeping the live accumulator reconciled to the DB + // truth. Neither path ADDS to live events, so there is no double-count. usageStats?: CacheStats | null; }> = []; try { @@ -934,6 +935,15 @@ export function createTabStore() { // tail into the sealed chunk log (refetch real seqs), preserving any // newer in-flight turn. Deferred while scrolled up. 
reconcileSealedTurn(tabId, event.turnId); + // Reconcile cacheStats to the DB source-of-truth carried on the event. + // REPLACE (not add): the aggregate already includes every persisted + // usage row for this tab, so this both lands the just-sealed turn's + // usage AND self-heals any live overshoot (e.g. a rate-limited + // fallback attempt streamed usage live but was discarded server-side). + // `usageStats === undefined` (older backend) leaves cacheStats as-is. + if (event.usageStats !== undefined) { + updateTab(tabId, { cacheStats: event.usageStats ?? undefined }); + } break; } case "statuses": { diff --git a/packages/frontend/src/lib/types.ts b/packages/frontend/src/lib/types.ts index 285b4d2..173f68c 100644 --- a/packages/frontend/src/lib/types.ts +++ b/packages/frontend/src/lib/types.ts @@ -140,7 +140,12 @@ export type AgentEvent = | { type: "turn-start"; turnId: string } // Fires after the turn settled AND its chunks were persisted (after the DB // write, post status:idle). Triggers the frontend's reconcile-from-DB. - | { type: "turn-sealed"; turnId: string } + // `usageStats` carries the tab's authoritative usage aggregate (read after the + // usage rows were persisted); the store REPLACES `cacheStats` with it, + // reconciling the live accumulator to the DB truth (self-heals the live + // overshoot from a discarded rate-limited fallback attempt). null ⇒ no usage + // rows; absent ⇒ leave cacheStats untouched. + | { type: "turn-sealed"; turnId: string; usageStats?: CacheStats | null } // Sent on every WS (re)connect: a snapshot of every tab the backend is // currently tracking and its live status. The frontend uses this to // detect desync after a reconnect (e.g. 
bun --watch restart killed the diff --git a/packages/frontend/tests/chat-store.test.ts b/packages/frontend/tests/chat-store.test.ts index 4e9691f..dc2783d 100644 --- a/packages/frontend/tests/chat-store.test.ts +++ b/packages/frontend/tests/chat-store.test.ts @@ -1306,6 +1306,87 @@ describe("tabStore — cache rate (usage events)", () => { }); expect(store.tabs[0]?.cacheStats).toBeUndefined(); }); + + it("turn-sealed REPLACES cacheStats with the carried DB aggregate (reconcile to truth)", async () => { + const { store, tabId } = await setupStoreWithTab(); + // Live events accumulate during the turn. + store.handleEvent({ + type: "usage", + tabId, + usage: { inputTokens: 1000, outputTokens: 40, cacheReadTokens: 0, cacheWriteTokens: 900 }, + }); + expect(store.tabs.find((t) => t.id === tabId)?.cacheStats?.inputTokens).toBe(1000); + + // turn-sealed carries the authoritative aggregate → cacheStats is REPLACED. + const aggregate = { + inputTokens: 1000, + outputTokens: 40, + cacheReadTokens: 0, + cacheWriteTokens: 900, + requests: 1, + last: { inputTokens: 1000, outputTokens: 40, cacheReadTokens: 0, cacheWriteTokens: 900 }, + }; + store.handleEvent({ type: "turn-sealed", turnId: "t1", tabId, usageStats: aggregate }); + expect(store.tabs.find((t) => t.id === tabId)?.cacheStats).toEqual(aggregate); + }); + + it("turn-sealed self-heals a live overshoot from a discarded fallback attempt", async () => { + const { store, tabId } = await setupStoreWithTab(); + // Attempt 1 streamed usage live (overshoot), then rate-limited & discarded + // server-side; attempt 2's usage also streamed live. Live = sum of BOTH. 
+ store.handleEvent({ + type: "usage", + tabId, + usage: { inputTokens: 999, outputTokens: 9, cacheReadTokens: 0, cacheWriteTokens: 0 }, + }); + store.handleEvent({ + type: "usage", + tabId, + usage: { inputTokens: 222, outputTokens: 22, cacheReadTokens: 100, cacheWriteTokens: 5 }, + }); + const overshoot = store.tabs.find((t) => t.id === tabId)?.cacheStats; + expect(overshoot?.requests).toBe(2); + expect(overshoot?.inputTokens).toBe(1221); // inflated: includes discarded attempt + + // The DB only persisted attempt 2 (the survivor). turn-sealed reconciles. + const persisted = { + inputTokens: 222, + outputTokens: 22, + cacheReadTokens: 100, + cacheWriteTokens: 5, + requests: 1, + last: { inputTokens: 222, outputTokens: 22, cacheReadTokens: 100, cacheWriteTokens: 5 }, + }; + store.handleEvent({ type: "turn-sealed", turnId: "t1", tabId, usageStats: persisted }); + // Overshoot healed: cacheStats now matches the DB truth exactly. + expect(store.tabs.find((t) => t.id === tabId)?.cacheStats).toEqual(persisted); + }); + + it("turn-sealed without usageStats leaves cacheStats untouched (back-compat)", async () => { + const { store, tabId } = await setupStoreWithTab(); + store.handleEvent({ + type: "usage", + tabId, + usage: { inputTokens: 500, outputTokens: 5, cacheReadTokens: 0, cacheWriteTokens: 0 }, + }); + const before = store.tabs.find((t) => t.id === tabId)?.cacheStats; + // Older backend: turn-sealed carries no usageStats field. 
+ store.handleEvent({ type: "turn-sealed", turnId: "t1", tabId }); + expect(store.tabs.find((t) => t.id === tabId)?.cacheStats).toEqual(before); + }); + + it("turn-sealed with usageStats: null clears cacheStats", async () => { + const { store, tabId } = await setupStoreWithTab(); + store.handleEvent({ + type: "usage", + tabId, + usage: { inputTokens: 500, outputTokens: 5, cacheReadTokens: 0, cacheWriteTokens: 0 }, + }); + expect(store.tabs.find((t) => t.id === tabId)?.cacheStats).toBeDefined(); + // A null aggregate (no persisted usage rows) explicitly clears live stats. + store.handleEvent({ type: "turn-sealed", turnId: "t1", tabId, usageStats: null }); + expect(store.tabs.find((t) => t.id === tabId)?.cacheStats).toBeUndefined(); + }); }); // ─── chunk-native store: eviction, pagination, reconcile ──────── |
