diff options
| author | Adam Malczewski <[email protected]> | 2026-05-30 20:06:31 +0900 |
|---|---|---|
| committer | Adam Malczewski <[email protected]> | 2026-05-30 20:06:31 +0900 |
| commit | 0f39b6f78957aacf206012ad2193d9b0c1940c1f (patch) | |
| tree | ff5f2da8b4f3cdf56cf50d44b8fec75a489ad6fe /packages/api/src | |
| parent | 8c58a973b0d021689cebad5c0cc6d56956bbc2f6 (diff) | |
| download | dispatch-0f39b6f78957aacf206012ad2193d9b0c1940c1f.tar.gz dispatch-0f39b6f78957aacf206012ad2193d9b0c1940c1f.zip | |
refactor(chunks): append-only chunk log with per-step cache-stable wire
Replace the message-as-container model with a flat, append-only chunk log.
- chunks table (id, tab_id, seq, turn_id, step, role, type, data_json): one
row per chunk; tool_call (assistant) and tool_result (tool) are SEPARATE
rows linked by callId. Message/turn are derived groupings, not stored.
- chunks/transform.ts: DB-free explode (Chunk[] -> rows) / group (rows ->
messages), shared by backend and the browser frontend.
- Cache fix: toModelMessages segments each turn at tool-batch boundaries into
stable [assistant, tool] pairs per step, so earlier steps serialize
byte-identically across requests (kills the prompt-cache churn).
- agent-manager persists a turn's chunks on seal (once), discarding a failed
fallback attempt's partial chunks; rebuilds agent history from the log.
- GET /messages windows the log by chunk seq then groups; loadMoreMessages
merges a turn split across the window boundary by turnId.
- One-shot migration drops the legacy messages table and clears tabs;
settings/credentials/keys/usage preserved.
Full suite green (317 tests); biome, tsc, and svelte-check clean.
Diffstat (limited to 'packages/api/src')
| -rw-r--r-- | packages/api/src/agent-manager.ts | 241 | ||||
| -rw-r--r-- | packages/api/src/routes/tabs.ts | 20 |
2 files changed, 119 insertions, 142 deletions
diff --git a/packages/api/src/agent-manager.ts b/packages/api/src/agent-manager.ts index c873388..f7975d1 100644 --- a/packages/api/src/agent-manager.ts +++ b/packages/api/src/agent-manager.ts @@ -3,9 +3,8 @@ import { type AgentEvent, type AgentSkillMapping, type AgentStatus, + appendChunks, appendEventToChunks, - appendMessage, - applySystemEvent, BackgroundShellStore, BackgroundTranscriptStore, type Chunk, @@ -26,6 +25,8 @@ import { createYoutubeTranscribeTool, type DispatchConfig, expandAgentToolNames, + explodeTurn, + explodeUserText, GLOBAL_AGENTS_DIR, getAgentDirPaths, getClaudeAccountsFromDB, @@ -45,7 +46,6 @@ import { type TabStatusSnapshot, TaskList, toAvailableAgents, - updateMessage, validateConfig, } from "@dispatch/core"; import type { PermissionManager } from "./permission-manager.js"; @@ -179,13 +179,23 @@ interface TabAgent { /** Store for transcript requests backgrounded due to user interrupt. */ transcriptStore: BackgroundTranscriptStore; /** - * In-flight assistant message chunks for the active turn. `null` when - * no turn is running. Out-of-band system events (config-reload, - * cancel, etc.) target this list when present. + * In-flight assistant chunks for the active turn. `null` when no turn is + * running. Out-of-band system events (config-reload, cancel, etc.) push + * onto this list when present; it is exploded into chunk rows when the + * turn flushes. */ currentChunks: Chunk[] | null; - /** DB id of the in-flight assistant message (if persisted yet). */ + /** + * Opaque id of the in-flight assistant turn, used as the `currentAssistantId` + * in the WS status snapshot so a reconnecting frontend can align its local + * streaming message. (No longer a DB row id — the turn is many chunk rows.) + */ currentAssistantId: string | null; + /** + * `turn_id` shared by the current turn's user message and assistant chunk + * rows. Set at the start of `processMessage`, cleared when the turn ends. + */ + currentTurnId: string | null; } export class AgentManager { @@ -331,6 +341,7 @@ export class AgentManager { transcriptStore: new BackgroundTranscriptStore(), currentChunks: null, currentAssistantId: null, + currentTurnId: null, }; this.tabAgents.set(tabId, tabAgent); } @@ -716,17 +727,14 @@ export class AgentManager { // 3. Config or skills reload (configWatcher / skillsWatcher // also null out `tabAgent.agent`). // - // Boundary semantics: `processMessage` calls `appendMessage` - // for the current turn's user message BEFORE calling this - // function, so the DB ends in `[..., u_current]`. In the - // fallback retry path (agent-mode automatic model fallback), - // the previous attempt may also have flushed a partial - // assistant response, so the DB ends in - // `[..., u_current, partial_a]`. Either way, we walk - // backwards to the most recent user-role row and load only - // strictly-prior rows: `agent.run()` will push the current - // user message itself at agent.ts:546, so including it here - // would duplicate it. + // Boundary semantics: `processMessage` appends the current turn's + // user message (as a chunk row) BEFORE calling this function, so the + // grouped history ends in `[..., u_current]`. In the fallback retry + // path the previous attempt may also have flushed a partial assistant + // turn, so it can end `[..., u_current, partial_a]`. Either way, we + // walk backwards to the most recent user-role message and load only + // strictly-prior messages: `agent.run()` pushes the current user + // message itself, so including it here would duplicate it. // // `toModelMessages` already filters out `role === "system"` // rows and strips `error` / `system` chunks, so it's safe to @@ -824,54 +832,31 @@ export class AgentManager { } /** - * Persist a system chunk to a tab's message history. + * Persist a system chunk (notice / model-changed / config-reload / + * cancelled) to a tab's history. * * If an assistant turn is in flight (`currentChunks` is non-null), the - * chunk is appended to the in-flight assistant message's chunk list — - * the final `appendMessage` / `updateMessage` call at end-of-turn picks - * it up automatically. + * chunk is folded into the in-flight chunk list; it is exploded into a + * `system` chunk row when the turn flushes. * - * Otherwise we load the tab's persisted messages, run them through - * `applySystemEvent` (which either appends to an existing trailing - * `role: "system"` message or creates a new one), then persist the - * delta via `appendMessage` / `updateMessage`. + * Otherwise we append a standalone `system` chunk row immediately. Adjacent + * system rows are coalesced back into one system message at group time + * (`groupRowsToMessages`). */ private routeSystemEventToTab(tabId: string, kind: SystemChunkKind, text: string): void { const tabAgent = this.tabAgents.get(tabId); - // Turn in flight → append directly to the in-flight chunk list. - // The chunk lands on the assistant message when it's persisted at - // turn-end (or the assistant message is updated mid-turn elsewhere). + // Turn in flight → fold into the in-flight chunk list; it is exploded + // into chunk rows (including this system chunk) when the turn flushes. if (tabAgent?.currentChunks) { tabAgent.currentChunks.push({ type: "system", kind, text }); - if (tabAgent.currentAssistantId) { - try { - updateMessage(tabAgent.currentAssistantId, JSON.stringify(tabAgent.currentChunks)); - } catch { - // Best-effort — the final persistence in processMessage will - // flush the same chunks again. - } - } return; } - // No turn in flight → route via applySystemEvent against the - // persisted message list. Either appends to a trailing system - // message or creates a fresh one. + // No turn in flight → persist a standalone system chunk row immediately. try { - const rows = getMessagesForTab(tabId); - const messages = rows.map((r) => ({ id: r.id, role: r.role, chunks: r.chunks })); - const before = messages[messages.length - 1]; - const { messageId } = applySystemEvent(messages, { kind, text }); - const target = messages.find((m) => m.id === messageId); - if (!target) return; - if (before && before.id === messageId) { - // Appended to existing trailing system message. - updateMessage(messageId, JSON.stringify(target.chunks)); - } else { - // Newly created system message. - appendMessage(tabId, messageId, "system", JSON.stringify(target.chunks)); - } + const turnId = tabAgent?.currentTurnId ?? crypto.randomUUID(); + appendChunks(tabId, explodeTurn(turnId, [{ type: "system", kind, text }])); } catch { // DB not available (e.g. tab not yet created) — drop silently. } @@ -880,22 +865,17 @@ export class AgentManager { stopTab(tabId: string): void { const tabAgent = this.tabAgents.get(tabId); if (tabAgent) { - // If a turn is in flight, drop a `cancelled` system chunk into - // the in-flight assistant message so the user sees an explicit - // "Generation cancelled by user" marker at the cancellation point. + // If a turn is in flight, drop a `cancelled` system chunk into the + // in-flight chunk list so the user sees an explicit "Generation + // cancelled by user" marker at the cancellation point. It is + // persisted (as a chunk row) when `processMessage` flushes the + // aborted turn. if (tabAgent.currentChunks) { tabAgent.currentChunks.push({ type: "system", kind: "cancelled", text: "Generation cancelled by user", }); - if (tabAgent.currentAssistantId) { - try { - updateMessage(tabAgent.currentAssistantId, JSON.stringify(tabAgent.currentChunks)); - } catch { - // best-effort - } - } } tabAgent.abortController?.abort(); tabAgent.status = "idle"; @@ -1134,13 +1114,12 @@ export class AgentManager { tabAgent.status = "running"; this.messageCount += 1; - // Persist user message to DB (once, before any fallback retry) - appendMessage( - tabId, - crypto.randomUUID(), - "user", - JSON.stringify([{ type: "text", text: message }]), - ); + // Persist the user message as a chunk row (once, before any fallback + // retry). The whole turn — this user message plus the assistant's + // chunk rows — shares one `turn_id`. + const turnId = crypto.randomUUID(); + tabAgent.currentTurnId = turnId; + appendChunks(tabId, explodeUserText(turnId, message)); // Store agent models on the tab if provided (defines fallback order) if (agentModels) { @@ -1166,25 +1145,23 @@ export class AgentManager { currentModelId = entry.model_id || undefined; allOutput = ""; - // Single ordered chunk list for the assistant turn — replaces the - // previous (text + toolCalls + thinking) tri-accumulator pattern. - // Persisted progressively (insert on first chunk, update thereafter) - // so out-of-band routes (config-reload, cancel) see real DB rows. + // Single ordered chunk list accumulating this attempt's assistant + // turn (text / thinking / tool-batch / error / system), folded from + // the stream via the shared `appendEventToChunks` helper. const chunks: Chunk[] = []; const assistantId = crypto.randomUUID(); let assistantPersisted = false; tabAgent.currentChunks = chunks; tabAgent.currentAssistantId = assistantId; + // Write-on-seal: explode the accumulated turn into flat chunk rows + // ONCE, when the turn settles. `explodeTurn` splits each step's + // `tool-batch` into separate `tool_call` + `tool_result` rows and + // tags every row with `turn_id` + derived `step`. const flushAssistant = (): void => { - if (chunks.length === 0) return; - const json = JSON.stringify(chunks); - if (!assistantPersisted) { - appendMessage(tabId, assistantId, "assistant", json); - assistantPersisted = true; - } else { - updateMessage(assistantId, json); - } + if (assistantPersisted || chunks.length === 0) return; + appendChunks(tabId, explodeTurn(turnId, chunks)); + assistantPersisted = true; }; let attemptError: string | null = null; @@ -1205,7 +1182,7 @@ export class AgentManager { }); } } catch { - // Best-effort — if this fails, appendMessage will throw and we'll catch it below + // Best-effort — if this fails, chunk persistence will throw and we'll catch it below } for await (const event of agent.run(message, { @@ -1237,23 +1214,12 @@ export class AgentManager { allOutput += event.delta; } - if (event.type === "done") { - // End of turn — flush the accumulated chunks. Reset the - // in-flight pointers so out-of-band system events route - // through `applySystemEvent` against the persisted list - // instead of mutating a stale array. - flushAssistant(); - chunks.length = 0; - assistantPersisted = true; // suppress post-loop flush - tabAgent.currentChunks = null; - tabAgent.currentAssistantId = null; - continue; - } - // Route every content-bearing event through the shared helper. // `appendEventToChunks` ignores lifecycle events (status / done // / task-list-update / tab-created / message-* / etc), so it's - // safe to call unconditionally. + // safe to call unconditionally. Persistence happens once, after + // the loop, so we never write a partial turn that a fallback + // retry would then duplicate. appendEventToChunks(chunks, event); } } catch (err) { @@ -1261,9 +1227,24 @@ export class AgentManager { attemptError = err instanceof Error ? err.message : String(err); } - // Flush any accumulated assistant content from this attempt (covers - // the abort/error/exception paths where we never saw a `done`). - flushAssistant(); + // Decide whether a fallback retry will supersede this attempt. + const isRetryable = + attemptError !== null && + (attemptError.includes("status=429") || + attemptError.toLowerCase().includes("rate limit") || + attemptError.toLowerCase().includes("rate_limit") || + attemptError.toLowerCase().includes("usage limit") || + attemptError.toLowerCase().includes("exhausted")); + const nextEntry = fallbackSequence[fallbackIdx + 1]; + const willRetry = Boolean(isRetryable && this.modelRegistry && tabAgent.keyId && nextEntry); + + // Persist this attempt's turn — unless a retry will replace it, in + // which case the partial (and its error chunk) is discarded so the + // next attempt's chunks don't merge with a failed one. On success, + // abort, or a final error, the turn is flushed exactly once. + if (!willRetry) { + flushAssistant(); + } tabAgent.currentChunks = null; tabAgent.currentAssistantId = null; @@ -1273,43 +1254,27 @@ export class AgentManager { break; } - // Check if error is retryable (rate limit / exhausted key) - const isRetryable = - attemptError.includes("status=429") || - attemptError.toLowerCase().includes("rate limit") || - attemptError.toLowerCase().includes("rate_limit") || - attemptError.toLowerCase().includes("usage limit") || - attemptError.toLowerCase().includes("exhausted"); - - if (isRetryable && this.modelRegistry && tabAgent.keyId) { - this.modelRegistry.markKeyExhausted(tabAgent.keyId, attemptError); - - // Try the next entry in the agent's fallback sequence - const nextIdx = fallbackIdx + 1; - const nextEntry = fallbackSequence[nextIdx]; - if (nextIdx < maxFallbackAttempts && nextEntry) { - const fallbackMsg = - `Key "${tabAgent.keyId}" rate limited. ` + - `Falling back to "${nextEntry.key_id}" (model: ${nextEntry.model_id})...`; - console.warn(`[dispatch] ${fallbackMsg}`); - // Persist the notice + model-change as system chunks. We're - // between turns here (just flushed the previous assistant - // message), so the helper routes them into a `role: "system"` - // message via `applySystemEvent`. - this.emit({ type: "notice", message: fallbackMsg }, tabId); - this.routeSystemEventToTab(tabId, "notice", fallbackMsg); - this.emit( - { type: "model-changed", keyId: nextEntry.key_id, modelId: nextEntry.model_id }, - tabId, - ); - this.routeSystemEventToTab( - tabId, - "model-changed", - `Switched to ${nextEntry.model_id} (${nextEntry.key_id})`, - ); - tabAgent.agent = null; - continue; - } + if (willRetry && nextEntry && tabAgent.keyId) { + this.modelRegistry?.markKeyExhausted(tabAgent.keyId, attemptError); + const fallbackMsg = + `Key "${tabAgent.keyId}" rate limited. ` + + `Falling back to "${nextEntry.key_id}" (model: ${nextEntry.model_id})...`; + console.warn(`[dispatch] ${fallbackMsg}`); + // Persist the notice + model-change as standalone system chunk + // rows (no turn in flight now — currentChunks was just cleared). + this.emit({ type: "notice", message: fallbackMsg }, tabId); + this.routeSystemEventToTab(tabId, "notice", fallbackMsg); + this.emit( + { type: "model-changed", keyId: nextEntry.key_id, modelId: nextEntry.model_id }, + tabId, + ); + this.routeSystemEventToTab( + tabId, + "model-changed", + `Switched to ${nextEntry.model_id} (${nextEntry.key_id})`, + ); + tabAgent.agent = null; + continue; } // All fallbacks exhausted or non-retryable error @@ -1319,6 +1284,8 @@ export class AgentManager { this.emit({ type: "status", status: "error" }, tabId); break; } + // Turn fully settled — clear the shared turn id. + tabAgent.currentTurnId = null; // Resolve completion promise for child agents if (processError === null) { diff --git a/packages/api/src/routes/tabs.ts b/packages/api/src/routes/tabs.ts index afa5735..e9265ec 100644 --- a/packages/api/src/routes/tabs.ts +++ b/packages/api/src/routes/tabs.ts @@ -2,10 +2,11 @@ import { archiveTab, createTab, deleteSetting, - getMessagesForTab, + getChunksForTab, getSetting, getTab, - getTotalMessageCount, + getTotalChunkCount, + groupRowsToMessages, listOpenTabs, setSetting, updateTabModel, @@ -65,6 +66,11 @@ tabsRoutes.get("/:id", (c) => { return c.json(tab); }); +// Conversation history for a tab, paginated at CHUNK granularity. The flat +// chunk log is windowed by `limit`/`before` (both chunk-`seq` cursors) so a +// single huge turn never dumps in full, then grouped into render messages. +// `before` is the oldest chunk seq the client already holds. This is what +// powers per-chunk frontend pagination / memory control. tabsRoutes.get("/:id/messages", (c) => { const id = c.req.param("id"); const limitRaw = c.req.query("limit"); @@ -78,9 +84,13 @@ tabsRoutes.get("/:id/messages", (c) => { ...(before !== undefined && Number.isFinite(before) ? { before } : {}), } : undefined; - const messages = getMessagesForTab(id, options); - const total = getTotalMessageCount(id); - return c.json({ messages, total }); + const chunks = getChunksForTab(id, options); + const messages = groupRowsToMessages(chunks); + // `oldestSeq` is the chunk-seq cursor the client pages backward from; null + // when the window is empty. + const oldestSeq = chunks.length > 0 ? (chunks[0]?.seq ?? null) : null; + const total = getTotalChunkCount(id); + return c.json({ messages, total, oldestSeq }); }); tabsRoutes.patch("/:id", async (c) => { |
