summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorAdam Malczewski <[email protected]>2026-06-02 13:34:33 +0900
committerAdam Malczewski <[email protected]>2026-06-02 13:34:33 +0900
commit48c120e5cd400b2e2b8afae0afcc7c8bc4d2ccb4 (patch)
tree2c434aeba0db7d6ec5b87e2f7fe2c81352f0888c
parentb734eb96bf0af267fdfbef85df51940ca0b4e8c7 (diff)
downloaddispatch-48c120e5cd400b2e2b8afae0afcc7c8bc4d2ccb4.tar.gz
dispatch-48c120e5cd400b2e2b8afae0afcc7c8bc4d2ccb4.zip
fix: reconcile live cacheStats to DB truth on turn-sealed
Addresses the live-accumulator overshoot a Gemini review surfaced: the frontend adds every streamed usage event to cacheStats, but a rate-limited fallback attempt's usage is discarded server-side (never persisted). Live numbers overshot until a reload re-seeded from the DB aggregate.

Fix: turn-sealed (emitted AFTER the atomic usage-row write) now carries the authoritative getUsageStatsForTab aggregate. The store REPLACES (not adds) cacheStats with it every turn — landing the just-sealed turn's usage AND self-healing any live drift, including the discarded-fallback overshoot. No extra round-trip (piggybacks turn-sealed); idempotent in the happy path.

- core: add UsageStats type; getUsageStatsForTab returns it; turn-sealed gains optional usageStats field.
- api: agent-manager reads getUsageStatsForTab post-flush and attaches it to the turn-sealed emit (try/catch: omit on DB error).
- frontend: turn-sealed handler replaces cacheStats (undefined ⇒ untouched back-compat; null ⇒ clear).

Tests: frontend reconcile/self-heal/back-compat/null-clear; api turn-sealed carries aggregate. 509 -> 514 passing; typecheck + biome green.
-rw-r--r--packages/api/src/agent-manager.ts13
-rw-r--r--packages/api/tests/agent-manager.test.ts39
-rw-r--r--packages/core/src/db/chunks.ts23
-rw-r--r--packages/core/src/types/index.ts33
-rw-r--r--packages/frontend/src/lib/tabs.svelte.ts18
-rw-r--r--packages/frontend/src/lib/types.ts7
-rw-r--r--packages/frontend/tests/chat-store.test.ts81
7 files changed, 193 insertions, 21 deletions
diff --git a/packages/api/src/agent-manager.ts b/packages/api/src/agent-manager.ts
index 1db9a04..9d7300a 100644
--- a/packages/api/src/agent-manager.ts
+++ b/packages/api/src/agent-manager.ts
@@ -35,6 +35,7 @@ import {
getMessagesForTab,
getSetting,
getTab,
+ getUsageStatsForTab,
listOpenTabs,
loadAgent,
loadAgents,
@@ -55,6 +56,7 @@ import {
toAvailableSubagents,
toAvailableUserAgents,
type UsageData,
+ type UsageStats,
validateConfig,
} from "@dispatch/core";
import type { PermissionManager } from "./permission-manager.js";
@@ -1639,7 +1641,16 @@ export class AgentManager {
// above). Signal the frontend that the turn's rows — with real seqs — are
// durable so it can fold its live representation into the sealed log.
// Emitted AFTER status:idle/error (which fire before the DB write).
- this.emit({ type: "turn-sealed", turnId }, tabId);
+ // Carry the authoritative usage aggregate (read AFTER the usage rows were
+ // persisted) so the frontend reconciles its live cacheStats to the DB truth
+ // — self-healing the live overshoot from a discarded rate-limited attempt.
+ let usageStats: UsageStats | null | undefined;
+ try {
+ usageStats = getUsageStatsForTab(tabId);
+ } catch {
+ // DB read failed — stay undefined so cacheStats is left untouched (null would CLEAR it).
+ }
+ this.emit({ type: "turn-sealed", turnId, usageStats }, tabId);
// Turn fully settled — clear the shared turn id.
tabAgent.currentTurnId = null;
diff --git a/packages/api/tests/agent-manager.test.ts b/packages/api/tests/agent-manager.test.ts
index 95fb558..6d7d66f 100644
--- a/packages/api/tests/agent-manager.test.ts
+++ b/packages/api/tests/agent-manager.test.ts
@@ -104,6 +104,13 @@ function resetAppendChunksCalls(): void {
appendChunksCalls.length = 0;
}
+// Seedable return value for the mocked getUsageStatsForTab — what the backend
+// reads (post-write) to attach to the `turn-sealed` event.
+const fakeUsageStatsByTab = new Map<string, unknown>();
+function resetFakeUsageStats(): void {
+ fakeUsageStatsByTab.clear();
+}
+
// Allow tests to swap in a custom `run` generator (e.g. to simulate
// a fallback failure mid-stream). Returning to undefined restores
// the default.
@@ -371,6 +378,9 @@ vi.mock("@dispatch/core", () => ({
getMessagesForTab(tabId: string) {
return fakeMessagesByTab.get(tabId) ?? [];
},
+ getUsageStatsForTab(tabId: string) {
+ return fakeUsageStatsByTab.get(tabId) ?? null;
+ },
appendEventToChunks: appendEventToChunksSpy,
applySystemEvent(_messages: unknown[], _event: unknown) {
return { messageId: "mock-system-msg" };
@@ -421,6 +431,7 @@ describe("AgentManager", () => {
setRunImpl(null);
appendEventToChunksSpy.mockClear();
resetAppendChunksCalls();
+ resetFakeUsageStats();
});
it("initial status is idle", () => {
@@ -1402,6 +1413,34 @@ describe("AgentManager", () => {
});
});
+ it("attaches the DB usage aggregate to the turn-sealed event for live reconciliation", async () => {
+ const manager = new AgentManager();
+ const aggregate = {
+ inputTokens: 222,
+ outputTokens: 22,
+ cacheReadTokens: 100,
+ cacheWriteTokens: 5,
+ requests: 1,
+ last: { inputTokens: 222, outputTokens: 22, cacheReadTokens: 100, cacheWriteTokens: 5 },
+ };
+ fakeUsageStatsByTab.set("tab-sealed-usage", aggregate);
+
+ const events: AgentEvent[] = [];
+ manager.onEvent((event) => {
+ events.push(event);
+ });
+
+ await manager.processMessage("tab-sealed-usage", "go");
+
+ const sealed = events.find((e) => e.type === "turn-sealed") as
+ | Extract<AgentEvent, { type: "turn-sealed" }>
+ | undefined;
+ expect(sealed).toBeDefined();
+ // The aggregate read AFTER the write is carried on the event so the
+ // frontend can REPLACE its live cacheStats with the DB truth.
+ expect(sealed?.usageStats).toEqual(aggregate);
+ });
+
it("emits usage rows in the SAME appendChunks call as the turn's content (one atomic write)", async () => {
const manager = new AgentManager();
setRunImpl(async function* () {
diff --git a/packages/core/src/db/chunks.ts b/packages/core/src/db/chunks.ts
index 509cbbd..e0aadf3 100644
--- a/packages/core/src/db/chunks.ts
+++ b/packages/core/src/db/chunks.ts
@@ -5,7 +5,14 @@ import {
groupRowsToMessages,
type MessageRow,
} from "../chunks/transform.js";
-import type { ChunkData, ChunkRow, ChunkRowDraft, TextData, UsageData } from "../types/index.js";
+import type {
+ ChunkData,
+ ChunkRow,
+ ChunkRowDraft,
+ TextData,
+ UsageData,
+ UsageStats,
+} from "../types/index.js";
import { getDatabase } from "./index.js";
// Re-export the DB-free transforms so existing barrel consumers
@@ -173,19 +180,7 @@ export function getTotalChunkCount(tabId: string): number {
* Sums in JS after selecting the rows (mirroring `mapRow`) to avoid relying on
* `json_extract` over the freeform `data_json`.
*/
-export function getUsageStatsForTab(tabId: string): {
- inputTokens: number;
- outputTokens: number;
- cacheReadTokens: number;
- cacheWriteTokens: number;
- requests: number;
- last: {
- inputTokens: number;
- outputTokens: number;
- cacheReadTokens: number;
- cacheWriteTokens: number;
- } | null;
-} | null {
+export function getUsageStatsForTab(tabId: string): UsageStats | null {
const db = getDatabase();
const rows = db
.query("SELECT data_json FROM chunks WHERE tab_id = $tabId AND type = 'usage' ORDER BY seq ASC")
diff --git a/packages/core/src/types/index.ts b/packages/core/src/types/index.ts
index abade27..a1fd2a8 100644
--- a/packages/core/src/types/index.ts
+++ b/packages/core/src/types/index.ts
@@ -143,6 +143,29 @@ export interface UsageData {
cacheWriteTokens: number;
}
+/**
+ * Aggregate per-tab usage telemetry: the cumulative sum across ALL persisted
+ * `usage` rows, the request count, and the most recent request's split. This is
+ * the server-side source of truth (complete regardless of frontend
+ * eviction/pagination) returned by `getUsageStatsForTab`. Structurally
+ * identical to the frontend `CacheStats` so it can seed it directly. `null` when
+ * the tab has no usage rows.
+ */
+export interface UsageStats {
+ inputTokens: number;
+ outputTokens: number;
+ cacheReadTokens: number;
+ cacheWriteTokens: number;
+ /** Number of LLM requests (usage rows) counted. */
+ requests: number;
+ last: {
+ inputTokens: number;
+ outputTokens: number;
+ cacheReadTokens: number;
+ cacheWriteTokens: number;
+ } | null;
+}
+
export type ChunkData =
| TextData
| ThinkingData
@@ -249,8 +272,16 @@ export type AgentEvent =
* fold its transient live representation into the sealed chunk log. Emitted
* after `status: idle`/`error` (which fire before the DB write). Display/sync
* only — not conversation content.
+ *
+ * Carries `usageStats`: the tab's authoritative usage aggregate read from the
+ * DB AFTER the turn's usage rows were written. The frontend REPLACES (not adds)
+ * its live `cacheStats` with this, reconciling the live accumulator to the
+ * persisted truth every turn. This self-heals the live overshoot that occurs
+ * when a rate-limited fallback attempt's usage is streamed live but then
+ * discarded server-side (never persisted). `null` ⇒ tab has no usage rows;
+ * absent ⇒ leave `cacheStats` untouched (back-compat).
*/
- | { type: "turn-sealed"; turnId: string }
+ | { type: "turn-sealed"; turnId: string; usageStats?: UsageStats | null }
| { type: "text-delta"; delta: string }
| { type: "reasoning-delta"; delta: string }
/**
diff --git a/packages/frontend/src/lib/tabs.svelte.ts b/packages/frontend/src/lib/tabs.svelte.ts
index 54f17d9..65e35a8 100644
--- a/packages/frontend/src/lib/tabs.svelte.ts
+++ b/packages/frontend/src/lib/tabs.svelte.ts
@@ -756,10 +756,11 @@ export function createTabStore() {
modelId?: string | null;
parentTabId?: string | null;
// Backend usage aggregate (GET /tabs). Structurally identical to
- // CacheStats, so it seeds `cacheStats` directly on reload. Seeding
- // happens ONLY here (hydrate runs when tabs.length === 0, i.e. a true
- // reload) — never on `statuses` reconnect or `turn-sealed` — so the
- // persisted aggregate and in-session live `usage` events never overlap.
+ // CacheStats, so it seeds `cacheStats` directly on reload. This is the
+ // initial seed (hydrate runs only when tabs.length === 0, i.e. a true
+ // reload); thereafter `turn-sealed` REPLACES cacheStats with the same
+ // aggregate each turn, keeping the live accumulator reconciled to the DB
+ // truth. Neither path ADDS to live events, so there is no double-count.
usageStats?: CacheStats | null;
}> = [];
try {
@@ -934,6 +935,15 @@ export function createTabStore() {
// tail into the sealed chunk log (refetch real seqs), preserving any
// newer in-flight turn. Deferred while scrolled up.
reconcileSealedTurn(tabId, event.turnId);
+ // Reconcile cacheStats to the DB source-of-truth carried on the event.
+ // REPLACE (not add): the aggregate already includes every persisted
+ // usage row for this tab, so this both lands the just-sealed turn's
+ // usage AND self-heals any live overshoot (e.g. a rate-limited
+ // fallback attempt streamed usage live but was discarded server-side).
+ // `usageStats === undefined` (older backend) leaves cacheStats as-is.
+ if (event.usageStats !== undefined) {
+ updateTab(tabId, { cacheStats: event.usageStats ?? undefined });
+ }
break;
}
case "statuses": {
diff --git a/packages/frontend/src/lib/types.ts b/packages/frontend/src/lib/types.ts
index 285b4d2..173f68c 100644
--- a/packages/frontend/src/lib/types.ts
+++ b/packages/frontend/src/lib/types.ts
@@ -140,7 +140,12 @@ export type AgentEvent =
| { type: "turn-start"; turnId: string }
// Fires after the turn settled AND its chunks were persisted (after the DB
// write, post status:idle). Triggers the frontend's reconcile-from-DB.
- | { type: "turn-sealed"; turnId: string }
+ // `usageStats` carries the tab's authoritative usage aggregate (read after the
+ // usage rows were persisted); the store REPLACES `cacheStats` with it,
+ // reconciling the live accumulator to the DB truth (self-heals the live
+ // overshoot from a discarded rate-limited fallback attempt). null ⇒ no usage
+ // rows; absent ⇒ leave cacheStats untouched.
+ | { type: "turn-sealed"; turnId: string; usageStats?: CacheStats | null }
// Sent on every WS (re)connect: a snapshot of every tab the backend is
// currently tracking and its live status. The frontend uses this to
// detect desync after a reconnect (e.g. bun --watch restart killed the
diff --git a/packages/frontend/tests/chat-store.test.ts b/packages/frontend/tests/chat-store.test.ts
index 4e9691f..dc2783d 100644
--- a/packages/frontend/tests/chat-store.test.ts
+++ b/packages/frontend/tests/chat-store.test.ts
@@ -1306,6 +1306,87 @@ describe("tabStore — cache rate (usage events)", () => {
});
expect(store.tabs[0]?.cacheStats).toBeUndefined();
});
+
+ it("turn-sealed REPLACES cacheStats with the carried DB aggregate (reconcile to truth)", async () => {
+ const { store, tabId } = await setupStoreWithTab();
+ // Live events accumulate during the turn.
+ store.handleEvent({
+ type: "usage",
+ tabId,
+ usage: { inputTokens: 1000, outputTokens: 40, cacheReadTokens: 0, cacheWriteTokens: 900 },
+ });
+ expect(store.tabs.find((t) => t.id === tabId)?.cacheStats?.inputTokens).toBe(1000);
+
+ // turn-sealed carries the authoritative aggregate → cacheStats is REPLACED.
+ const aggregate = {
+ inputTokens: 1000,
+ outputTokens: 40,
+ cacheReadTokens: 0,
+ cacheWriteTokens: 900,
+ requests: 1,
+ last: { inputTokens: 1000, outputTokens: 40, cacheReadTokens: 0, cacheWriteTokens: 900 },
+ };
+ store.handleEvent({ type: "turn-sealed", turnId: "t1", tabId, usageStats: aggregate });
+ expect(store.tabs.find((t) => t.id === tabId)?.cacheStats).toEqual(aggregate);
+ });
+
+ it("turn-sealed self-heals a live overshoot from a discarded fallback attempt", async () => {
+ const { store, tabId } = await setupStoreWithTab();
+ // Attempt 1 streamed usage live (overshoot), then rate-limited & discarded
+ // server-side; attempt 2's usage also streamed live. Live = sum of BOTH.
+ store.handleEvent({
+ type: "usage",
+ tabId,
+ usage: { inputTokens: 999, outputTokens: 9, cacheReadTokens: 0, cacheWriteTokens: 0 },
+ });
+ store.handleEvent({
+ type: "usage",
+ tabId,
+ usage: { inputTokens: 222, outputTokens: 22, cacheReadTokens: 100, cacheWriteTokens: 5 },
+ });
+ const overshoot = store.tabs.find((t) => t.id === tabId)?.cacheStats;
+ expect(overshoot?.requests).toBe(2);
+ expect(overshoot?.inputTokens).toBe(1221); // inflated: includes discarded attempt
+
+ // The DB only persisted attempt 2 (the survivor). turn-sealed reconciles.
+ const persisted = {
+ inputTokens: 222,
+ outputTokens: 22,
+ cacheReadTokens: 100,
+ cacheWriteTokens: 5,
+ requests: 1,
+ last: { inputTokens: 222, outputTokens: 22, cacheReadTokens: 100, cacheWriteTokens: 5 },
+ };
+ store.handleEvent({ type: "turn-sealed", turnId: "t1", tabId, usageStats: persisted });
+ // Overshoot healed: cacheStats now matches the DB truth exactly.
+ expect(store.tabs.find((t) => t.id === tabId)?.cacheStats).toEqual(persisted);
+ });
+
+ it("turn-sealed without usageStats leaves cacheStats untouched (back-compat)", async () => {
+ const { store, tabId } = await setupStoreWithTab();
+ store.handleEvent({
+ type: "usage",
+ tabId,
+ usage: { inputTokens: 500, outputTokens: 5, cacheReadTokens: 0, cacheWriteTokens: 0 },
+ });
+ const before = store.tabs.find((t) => t.id === tabId)?.cacheStats;
+ // Older backend: turn-sealed carries no usageStats field.
+ store.handleEvent({ type: "turn-sealed", turnId: "t1", tabId });
+ expect(store.tabs.find((t) => t.id === tabId)?.cacheStats).toEqual(before);
+ });
+
+ it("turn-sealed with usageStats: null clears cacheStats", async () => {
+ const { store, tabId } = await setupStoreWithTab();
+ store.handleEvent({
+ type: "usage",
+ tabId,
+ usage: { inputTokens: 500, outputTokens: 5, cacheReadTokens: 0, cacheWriteTokens: 0 },
+ });
+ expect(store.tabs.find((t) => t.id === tabId)?.cacheStats).toBeDefined();
+ // A null aggregate (no persisted usage rows) explicitly clears live stats.
+ store.handleEvent({ type: "turn-sealed", turnId: "t1", tabId, usageStats: null });
+ expect(store.tabs.find((t) => t.id === tabId)?.cacheStats).toBeUndefined();
+ });
});
// ─── chunk-native store: eviction, pagination, reconcile ────────