diff options
| author | Adam Malczewski <[email protected]> | 2026-05-30 20:06:31 +0900 |
|---|---|---|
| committer | Adam Malczewski <[email protected]> | 2026-05-30 20:06:31 +0900 |
| commit | 0f39b6f78957aacf206012ad2193d9b0c1940c1f (patch) | |
| tree | ff5f2da8b4f3cdf56cf50d44b8fec75a489ad6fe | |
| parent | 8c58a973b0d021689cebad5c0cc6d56956bbc2f6 (diff) | |
| download | dispatch-0f39b6f78957aacf206012ad2193d9b0c1940c1f.tar.gz dispatch-0f39b6f78957aacf206012ad2193d9b0c1940c1f.zip | |
refactor(chunks): append-only chunk log with per-step cache-stable wire format
Replace the message-as-container model with a flat, append-only chunk log.
- chunks table (id, tab_id, seq, turn_id, step, role, type, data_json,
created_at): one row per chunk; tool_call (role=assistant) and tool_result
(role=tool) are SEPARATE rows linked by callId. Messages and turns are
derived groupings, not stored containers.
- chunks/transform.ts: DB-free explode (Chunk[] -> rows) / group (rows ->
messages), shared by backend and the browser frontend.
- Cache fix: toModelMessages segments each turn at tool-batch boundaries into
stable [assistant, tool] pairs per step, so earlier steps serialize
byte-identically across requests (kills the prompt-cache churn).
- agent-manager persists a turn's chunks on seal (once), discarding a failed
fallback attempt's partial chunks; rebuilds agent history from the log.
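- GET /messages windows the log by chunk seq, then groups the windowed rows
into messages; the response adds an `oldestSeq` cursor and `total` is now a
chunk count. loadMoreMessages merges a turn split across the window boundary
by turnId.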
- One-shot migration drops the legacy messages table and clears tabs, so
existing chat history is destroyed; settings, credentials, API keys, usage
cache, and wake schedule are preserved.
Full suite green (317 tests); biome, tsc, and svelte-check clean.
| -rw-r--r-- | packages/api/src/agent-manager.ts | 241 | ||||
| -rw-r--r-- | packages/api/src/routes/tabs.ts | 20 | ||||
| -rw-r--r-- | packages/api/tests/agent-manager.test.ts | 11 | ||||
| -rw-r--r-- | packages/api/tests/routes.test.ts | 20 | ||||
| -rw-r--r-- | packages/core/src/agent/agent.ts | bin | 54066 -> 54519 bytes | |||
| -rw-r--r-- | packages/core/src/chunks/transform.ts | 266 | ||||
| -rw-r--r-- | packages/core/src/db/chunks.ts | 150 | ||||
| -rw-r--r-- | packages/core/src/db/index.ts | 46 | ||||
| -rw-r--r-- | packages/core/src/db/messages.ts | 154 | ||||
| -rw-r--r-- | packages/core/src/index.ts | 17 | ||||
| -rw-r--r-- | packages/core/src/types/index.ts | 79 | ||||
| -rw-r--r-- | packages/core/tests/agent/agent.test.ts | 141 | ||||
| -rw-r--r-- | packages/core/tests/db/chunks.test.ts | 179 | ||||
| -rw-r--r-- | packages/frontend/src/lib/tabs.svelte.ts | 50 | ||||
| -rw-r--r-- | packages/frontend/src/lib/types.ts | 7 |
15 files changed, 1024 insertions, 357 deletions
diff --git a/packages/api/src/agent-manager.ts b/packages/api/src/agent-manager.ts index c873388..f7975d1 100644 --- a/packages/api/src/agent-manager.ts +++ b/packages/api/src/agent-manager.ts @@ -3,9 +3,8 @@ import { type AgentEvent, type AgentSkillMapping, type AgentStatus, + appendChunks, appendEventToChunks, - appendMessage, - applySystemEvent, BackgroundShellStore, BackgroundTranscriptStore, type Chunk, @@ -26,6 +25,8 @@ import { createYoutubeTranscribeTool, type DispatchConfig, expandAgentToolNames, + explodeTurn, + explodeUserText, GLOBAL_AGENTS_DIR, getAgentDirPaths, getClaudeAccountsFromDB, @@ -45,7 +46,6 @@ import { type TabStatusSnapshot, TaskList, toAvailableAgents, - updateMessage, validateConfig, } from "@dispatch/core"; import type { PermissionManager } from "./permission-manager.js"; @@ -179,13 +179,23 @@ interface TabAgent { /** Store for transcript requests backgrounded due to user interrupt. */ transcriptStore: BackgroundTranscriptStore; /** - * In-flight assistant message chunks for the active turn. `null` when - * no turn is running. Out-of-band system events (config-reload, - * cancel, etc.) target this list when present. + * In-flight assistant chunks for the active turn. `null` when no turn is + * running. Out-of-band system events (config-reload, cancel, etc.) push + * onto this list when present; it is exploded into chunk rows when the + * turn flushes. */ currentChunks: Chunk[] | null; - /** DB id of the in-flight assistant message (if persisted yet). */ + /** + * Opaque id of the in-flight assistant turn, used as the `currentAssistantId` + * in the WS status snapshot so a reconnecting frontend can align its local + * streaming message. (No longer a DB row id — the turn is many chunk rows.) + */ currentAssistantId: string | null; + /** + * `turn_id` shared by the current turn's user message and assistant chunk + * rows. Set at the start of `processMessage`, cleared when the turn ends. 
+ */ + currentTurnId: string | null; } export class AgentManager { @@ -331,6 +341,7 @@ export class AgentManager { transcriptStore: new BackgroundTranscriptStore(), currentChunks: null, currentAssistantId: null, + currentTurnId: null, }; this.tabAgents.set(tabId, tabAgent); } @@ -716,17 +727,14 @@ export class AgentManager { // 3. Config or skills reload (configWatcher / skillsWatcher // also null out `tabAgent.agent`). // - // Boundary semantics: `processMessage` calls `appendMessage` - // for the current turn's user message BEFORE calling this - // function, so the DB ends in `[..., u_current]`. In the - // fallback retry path (agent-mode automatic model fallback), - // the previous attempt may also have flushed a partial - // assistant response, so the DB ends in - // `[..., u_current, partial_a]`. Either way, we walk - // backwards to the most recent user-role row and load only - // strictly-prior rows: `agent.run()` will push the current - // user message itself at agent.ts:546, so including it here - // would duplicate it. + // Boundary semantics: `processMessage` appends the current turn's + // user message (as a chunk row) BEFORE calling this function, so the + // grouped history ends in `[..., u_current]`. In the fallback retry + // path the previous attempt may also have flushed a partial assistant + // turn, so it can end `[..., u_current, partial_a]`. Either way, we + // walk backwards to the most recent user-role message and load only + // strictly-prior messages: `agent.run()` pushes the current user + // message itself, so including it here would duplicate it. // // `toModelMessages` already filters out `role === "system"` // rows and strips `error` / `system` chunks, so it's safe to @@ -824,54 +832,31 @@ export class AgentManager { } /** - * Persist a system chunk to a tab's message history. + * Persist a system chunk (notice / model-changed / config-reload / + * cancelled) to a tab's history. 
* * If an assistant turn is in flight (`currentChunks` is non-null), the - * chunk is appended to the in-flight assistant message's chunk list — - * the final `appendMessage` / `updateMessage` call at end-of-turn picks - * it up automatically. + * chunk is folded into the in-flight chunk list; it is exploded into a + * `system` chunk row when the turn flushes. * - * Otherwise we load the tab's persisted messages, run them through - * `applySystemEvent` (which either appends to an existing trailing - * `role: "system"` message or creates a new one), then persist the - * delta via `appendMessage` / `updateMessage`. + * Otherwise we append a standalone `system` chunk row immediately. Adjacent + * system rows are coalesced back into one system message at group time + * (`groupRowsToMessages`). */ private routeSystemEventToTab(tabId: string, kind: SystemChunkKind, text: string): void { const tabAgent = this.tabAgents.get(tabId); - // Turn in flight → append directly to the in-flight chunk list. - // The chunk lands on the assistant message when it's persisted at - // turn-end (or the assistant message is updated mid-turn elsewhere). + // Turn in flight → fold into the in-flight chunk list; it is exploded + // into chunk rows (including this system chunk) when the turn flushes. if (tabAgent?.currentChunks) { tabAgent.currentChunks.push({ type: "system", kind, text }); - if (tabAgent.currentAssistantId) { - try { - updateMessage(tabAgent.currentAssistantId, JSON.stringify(tabAgent.currentChunks)); - } catch { - // Best-effort — the final persistence in processMessage will - // flush the same chunks again. - } - } return; } - // No turn in flight → route via applySystemEvent against the - // persisted message list. Either appends to a trailing system - // message or creates a fresh one. + // No turn in flight → persist a standalone system chunk row immediately. 
try { - const rows = getMessagesForTab(tabId); - const messages = rows.map((r) => ({ id: r.id, role: r.role, chunks: r.chunks })); - const before = messages[messages.length - 1]; - const { messageId } = applySystemEvent(messages, { kind, text }); - const target = messages.find((m) => m.id === messageId); - if (!target) return; - if (before && before.id === messageId) { - // Appended to existing trailing system message. - updateMessage(messageId, JSON.stringify(target.chunks)); - } else { - // Newly created system message. - appendMessage(tabId, messageId, "system", JSON.stringify(target.chunks)); - } + const turnId = tabAgent?.currentTurnId ?? crypto.randomUUID(); + appendChunks(tabId, explodeTurn(turnId, [{ type: "system", kind, text }])); } catch { // DB not available (e.g. tab not yet created) — drop silently. } @@ -880,22 +865,17 @@ export class AgentManager { stopTab(tabId: string): void { const tabAgent = this.tabAgents.get(tabId); if (tabAgent) { - // If a turn is in flight, drop a `cancelled` system chunk into - // the in-flight assistant message so the user sees an explicit - // "Generation cancelled by user" marker at the cancellation point. + // If a turn is in flight, drop a `cancelled` system chunk into the + // in-flight chunk list so the user sees an explicit "Generation + // cancelled by user" marker at the cancellation point. It is + // persisted (as a chunk row) when `processMessage` flushes the + // aborted turn. 
if (tabAgent.currentChunks) { tabAgent.currentChunks.push({ type: "system", kind: "cancelled", text: "Generation cancelled by user", }); - if (tabAgent.currentAssistantId) { - try { - updateMessage(tabAgent.currentAssistantId, JSON.stringify(tabAgent.currentChunks)); - } catch { - // best-effort - } - } } tabAgent.abortController?.abort(); tabAgent.status = "idle"; @@ -1134,13 +1114,12 @@ export class AgentManager { tabAgent.status = "running"; this.messageCount += 1; - // Persist user message to DB (once, before any fallback retry) - appendMessage( - tabId, - crypto.randomUUID(), - "user", - JSON.stringify([{ type: "text", text: message }]), - ); + // Persist the user message as a chunk row (once, before any fallback + // retry). The whole turn — this user message plus the assistant's + // chunk rows — shares one `turn_id`. + const turnId = crypto.randomUUID(); + tabAgent.currentTurnId = turnId; + appendChunks(tabId, explodeUserText(turnId, message)); // Store agent models on the tab if provided (defines fallback order) if (agentModels) { @@ -1166,25 +1145,23 @@ export class AgentManager { currentModelId = entry.model_id || undefined; allOutput = ""; - // Single ordered chunk list for the assistant turn — replaces the - // previous (text + toolCalls + thinking) tri-accumulator pattern. - // Persisted progressively (insert on first chunk, update thereafter) - // so out-of-band routes (config-reload, cancel) see real DB rows. + // Single ordered chunk list accumulating this attempt's assistant + // turn (text / thinking / tool-batch / error / system), folded from + // the stream via the shared `appendEventToChunks` helper. const chunks: Chunk[] = []; const assistantId = crypto.randomUUID(); let assistantPersisted = false; tabAgent.currentChunks = chunks; tabAgent.currentAssistantId = assistantId; + // Write-on-seal: explode the accumulated turn into flat chunk rows + // ONCE, when the turn settles. 
`explodeTurn` splits each step's + // `tool-batch` into separate `tool_call` + `tool_result` rows and + // tags every row with `turn_id` + derived `step`. const flushAssistant = (): void => { - if (chunks.length === 0) return; - const json = JSON.stringify(chunks); - if (!assistantPersisted) { - appendMessage(tabId, assistantId, "assistant", json); - assistantPersisted = true; - } else { - updateMessage(assistantId, json); - } + if (assistantPersisted || chunks.length === 0) return; + appendChunks(tabId, explodeTurn(turnId, chunks)); + assistantPersisted = true; }; let attemptError: string | null = null; @@ -1205,7 +1182,7 @@ export class AgentManager { }); } } catch { - // Best-effort — if this fails, appendMessage will throw and we'll catch it below + // Best-effort — if this fails, chunk persistence will throw and we'll catch it below } for await (const event of agent.run(message, { @@ -1237,23 +1214,12 @@ export class AgentManager { allOutput += event.delta; } - if (event.type === "done") { - // End of turn — flush the accumulated chunks. Reset the - // in-flight pointers so out-of-band system events route - // through `applySystemEvent` against the persisted list - // instead of mutating a stale array. - flushAssistant(); - chunks.length = 0; - assistantPersisted = true; // suppress post-loop flush - tabAgent.currentChunks = null; - tabAgent.currentAssistantId = null; - continue; - } - // Route every content-bearing event through the shared helper. // `appendEventToChunks` ignores lifecycle events (status / done // / task-list-update / tab-created / message-* / etc), so it's - // safe to call unconditionally. + // safe to call unconditionally. Persistence happens once, after + // the loop, so we never write a partial turn that a fallback + // retry would then duplicate. appendEventToChunks(chunks, event); } } catch (err) { @@ -1261,9 +1227,24 @@ export class AgentManager { attemptError = err instanceof Error ? 
err.message : String(err); } - // Flush any accumulated assistant content from this attempt (covers - // the abort/error/exception paths where we never saw a `done`). - flushAssistant(); + // Decide whether a fallback retry will supersede this attempt. + const isRetryable = + attemptError !== null && + (attemptError.includes("status=429") || + attemptError.toLowerCase().includes("rate limit") || + attemptError.toLowerCase().includes("rate_limit") || + attemptError.toLowerCase().includes("usage limit") || + attemptError.toLowerCase().includes("exhausted")); + const nextEntry = fallbackSequence[fallbackIdx + 1]; + const willRetry = Boolean(isRetryable && this.modelRegistry && tabAgent.keyId && nextEntry); + + // Persist this attempt's turn — unless a retry will replace it, in + // which case the partial (and its error chunk) is discarded so the + // next attempt's chunks don't merge with a failed one. On success, + // abort, or a final error, the turn is flushed exactly once. + if (!willRetry) { + flushAssistant(); + } tabAgent.currentChunks = null; tabAgent.currentAssistantId = null; @@ -1273,43 +1254,27 @@ export class AgentManager { break; } - // Check if error is retryable (rate limit / exhausted key) - const isRetryable = - attemptError.includes("status=429") || - attemptError.toLowerCase().includes("rate limit") || - attemptError.toLowerCase().includes("rate_limit") || - attemptError.toLowerCase().includes("usage limit") || - attemptError.toLowerCase().includes("exhausted"); - - if (isRetryable && this.modelRegistry && tabAgent.keyId) { - this.modelRegistry.markKeyExhausted(tabAgent.keyId, attemptError); - - // Try the next entry in the agent's fallback sequence - const nextIdx = fallbackIdx + 1; - const nextEntry = fallbackSequence[nextIdx]; - if (nextIdx < maxFallbackAttempts && nextEntry) { - const fallbackMsg = - `Key "${tabAgent.keyId}" rate limited. 
` + - `Falling back to "${nextEntry.key_id}" (model: ${nextEntry.model_id})...`; - console.warn(`[dispatch] ${fallbackMsg}`); - // Persist the notice + model-change as system chunks. We're - // between turns here (just flushed the previous assistant - // message), so the helper routes them into a `role: "system"` - // message via `applySystemEvent`. - this.emit({ type: "notice", message: fallbackMsg }, tabId); - this.routeSystemEventToTab(tabId, "notice", fallbackMsg); - this.emit( - { type: "model-changed", keyId: nextEntry.key_id, modelId: nextEntry.model_id }, - tabId, - ); - this.routeSystemEventToTab( - tabId, - "model-changed", - `Switched to ${nextEntry.model_id} (${nextEntry.key_id})`, - ); - tabAgent.agent = null; - continue; - } + if (willRetry && nextEntry && tabAgent.keyId) { + this.modelRegistry?.markKeyExhausted(tabAgent.keyId, attemptError); + const fallbackMsg = + `Key "${tabAgent.keyId}" rate limited. ` + + `Falling back to "${nextEntry.key_id}" (model: ${nextEntry.model_id})...`; + console.warn(`[dispatch] ${fallbackMsg}`); + // Persist the notice + model-change as standalone system chunk + // rows (no turn in flight now — currentChunks was just cleared). + this.emit({ type: "notice", message: fallbackMsg }, tabId); + this.routeSystemEventToTab(tabId, "notice", fallbackMsg); + this.emit( + { type: "model-changed", keyId: nextEntry.key_id, modelId: nextEntry.model_id }, + tabId, + ); + this.routeSystemEventToTab( + tabId, + "model-changed", + `Switched to ${nextEntry.model_id} (${nextEntry.key_id})`, + ); + tabAgent.agent = null; + continue; } // All fallbacks exhausted or non-retryable error @@ -1319,6 +1284,8 @@ export class AgentManager { this.emit({ type: "status", status: "error" }, tabId); break; } + // Turn fully settled — clear the shared turn id. 
+ tabAgent.currentTurnId = null; // Resolve completion promise for child agents if (processError === null) { diff --git a/packages/api/src/routes/tabs.ts b/packages/api/src/routes/tabs.ts index afa5735..e9265ec 100644 --- a/packages/api/src/routes/tabs.ts +++ b/packages/api/src/routes/tabs.ts @@ -2,10 +2,11 @@ import { archiveTab, createTab, deleteSetting, - getMessagesForTab, + getChunksForTab, getSetting, getTab, - getTotalMessageCount, + getTotalChunkCount, + groupRowsToMessages, listOpenTabs, setSetting, updateTabModel, @@ -65,6 +66,11 @@ tabsRoutes.get("/:id", (c) => { return c.json(tab); }); +// Conversation history for a tab, paginated at CHUNK granularity. The flat +// chunk log is windowed by `limit` (a row count) and `before` (a chunk-`seq` +// cursor) so a single huge turn never dumps in full, then grouped into render +// messages. `before` is the oldest chunk seq the client already holds. This is +// what powers per-chunk frontend pagination / memory control. tabsRoutes.get("/:id/messages", (c) => { const id = c.req.param("id"); const limitRaw = c.req.query("limit"); @@ -78,9 +84,13 @@ tabsRoutes.get("/:id/messages", (c) => { ...(before !== undefined && Number.isFinite(before) ? { before } : {}), } : undefined; - const messages = getMessagesForTab(id, options); - const total = getTotalMessageCount(id); - return c.json({ messages, total }); + const chunks = getChunksForTab(id, options); + const messages = groupRowsToMessages(chunks); + // `oldestSeq` is the chunk-seq cursor the client pages backward from; null + // when the window is empty. + const oldestSeq = chunks.length > 0 ? (chunks[0]?.seq ??
null) : null; + const total = getTotalChunkCount(id); + return c.json({ messages, total, oldestSeq }); }); tabsRoutes.patch("/:id", async (c) => { diff --git a/packages/api/tests/agent-manager.test.ts b/packages/api/tests/agent-manager.test.ts index ba14cad..6b016db 100644 --- a/packages/api/tests/agent-manager.test.ts +++ b/packages/api/tests/agent-manager.test.ts @@ -259,8 +259,15 @@ vi.mock("@dispatch/core", () => ({ getSetting(_key: string) { return null; }, - appendMessage() {}, - updateMessage() {}, + appendChunks() { + return []; + }, + explodeUserText() { + return []; + }, + explodeTurn() { + return []; + }, getMessagesForTab(tabId: string) { return fakeMessagesByTab.get(tabId) ?? []; }, diff --git a/packages/api/tests/routes.test.ts b/packages/api/tests/routes.test.ts index eba2226..f4de845 100644 --- a/packages/api/tests/routes.test.ts +++ b/packages/api/tests/routes.test.ts @@ -181,11 +181,27 @@ vi.mock("@dispatch/core", () => ({ getSetting(_key: string) { return null; }, - appendMessage() {}, - updateMessage() {}, + appendChunks() { + return []; + }, + explodeUserText() { + return []; + }, + explodeTurn() { + return []; + }, getMessagesForTab() { return []; }, + getChunksForTab() { + return []; + }, + groupRowsToMessages() { + return []; + }, + getTotalChunkCount() { + return 0; + }, appendEventToChunks(_chunks: unknown[], _event: unknown) { // no-op stub }, diff --git a/packages/core/src/agent/agent.ts b/packages/core/src/agent/agent.ts Binary files differindex 439b436..4638301 100644 --- a/packages/core/src/agent/agent.ts +++ b/packages/core/src/agent/agent.ts diff --git a/packages/core/src/chunks/transform.ts b/packages/core/src/chunks/transform.ts new file mode 100644 index 0000000..a4c6fc8 --- /dev/null +++ b/packages/core/src/chunks/transform.ts @@ -0,0 +1,266 @@ +// Pure, dependency-free transforms between the render-shaped `Chunk[]` / +// `ChatMessage` model and the flat append-only `ChunkRow` log. 
Kept free of any +// DB (`bun:sqlite`) import so BOTH the backend persistence layer +// (`db/chunks.ts`) and the browser frontend store can share the exact same +// explode/group logic. + +import type { + Chunk, + ChunkRow, + ChunkRowDraft, + ErrorData, + MessageRole, + SystemData, + TextData, + ThinkingData, + ToolBatchChunk, + ToolCallData, + ToolResultData, +} from "../types/index.js"; + +/** + * A DERIVED message — a grouping of contiguous chunk rows reconstructed for the + * agent's in-memory history and for the frontend's render bubbles. NOT a stored + * shape: the source of truth is the flat chunk log. + */ +export interface MessageRow { + id: string; + tabId: string; + seq: number; + /** turn_id of the chunk rows this message was grouped from. */ + turnId: string; + role: MessageRole; + chunks: Chunk[]; + createdAt: number; +} + +// ─── Explode: in-memory turn → flat chunk-row drafts ───────────── +// +// A turn's render-shaped `Chunk[]` is flattened into append-only rows. +// `tool-batch` chunks are split into SEPARATE `tool_call` (role=assistant) and +// `tool_result` (role=tool) rows linked by `callId`, mapping 1:1 to the +// Anthropic wire format. `step` increments after each tool-batch: every LLM +// round-trip emits text/thinking then (optionally) one tool-batch, so a +// tool-batch boundary is exactly a step boundary. + +/** Explode a single user message's text into one row draft. */ +export function explodeUserText(turnId: string, text: string): ChunkRowDraft[] { + return [{ turnId, step: 0, role: "user", type: "text", data: { text } }]; +} + +/** Explode an assistant turn's accumulated chunks into ordered row drafts. 
*/ +export function explodeTurn(turnId: string, chunks: Chunk[]): ChunkRowDraft[] { + const drafts: ChunkRowDraft[] = []; + let step = 0; + for (const chunk of chunks) { + switch (chunk.type) { + case "text": + drafts.push({ turnId, step, role: "assistant", type: "text", data: { text: chunk.text } }); + break; + case "thinking": + drafts.push({ + turnId, + step, + role: "assistant", + type: "thinking", + data: { + text: chunk.text, + ...(chunk.metadata !== undefined ? { metadata: chunk.metadata } : {}), + }, + }); + break; + case "tool-batch": { + for (const call of chunk.calls) { + drafts.push({ + turnId, + step, + role: "assistant", + type: "tool_call", + data: { callId: call.id, name: call.name, arguments: call.arguments }, + }); + } + for (const call of chunk.calls) { + if (call.result === undefined) continue; + drafts.push({ + turnId, + step, + role: "tool", + type: "tool_result", + data: { + callId: call.id, + name: call.name, + result: call.result, + isError: call.isError ?? false, + ...(call.shellOutput !== undefined ? { shellOutput: call.shellOutput } : {}), + }, + }); + } + // A tool-batch ends the current LLM step; subsequent chunks belong + // to the next round-trip. + step++; + break; + } + case "error": + drafts.push({ + turnId, + step, + role: "assistant", + type: "error", + data: { + message: chunk.message, + ...(chunk.statusCode !== undefined ? { statusCode: chunk.statusCode } : {}), + }, + }); + break; + case "system": + drafts.push({ + turnId, + step, + role: "system", + type: "system", + data: { kind: chunk.kind, text: chunk.text }, + }); + break; + } + } + return drafts; +} + +// ─── Group: flat chunk rows → derived render messages ──────────── +// +// The inverse of explode (best-effort over an arbitrary window, so it tolerates +// orphan tool-results whose tool-call was paged out). 
`tool_result` rows +// (role=tool) merge back into the preceding assistant message's per-step +// `tool-batch` chunk by `callId` rather than forming their own message. + +export function groupRowsToMessages(rows: ChunkRow[]): MessageRow[] { + const messages: MessageRow[] = []; + + let current: { msg: MessageRow; batches: Map<number, ToolBatchChunk> } | null = null; + const flush = () => { + if (current) { + messages.push(current.msg); + current = null; + } + }; + const ensureAssistant = (row: ChunkRow) => { + if (!current) { + current = { + msg: { + id: row.id, + tabId: row.tabId, + seq: row.seq, + turnId: row.turnId, + role: "assistant", + chunks: [], + createdAt: row.createdAt, + }, + batches: new Map(), + }; + } + return current; + }; + const ensureBatch = (step: number): ToolBatchChunk => { + const c = current; + if (!c) throw new Error("ensureBatch called without an assistant message"); + let batch = c.batches.get(step); + if (!batch) { + batch = { type: "tool-batch", calls: [] }; + c.batches.set(step, batch); + c.msg.chunks.push(batch); + } + return batch; + }; + + for (const row of rows) { + if (row.role === "user") { + flush(); + const d = row.data as TextData; + messages.push({ + id: row.id, + tabId: row.tabId, + seq: row.seq, + turnId: row.turnId, + role: "user", + chunks: [{ type: "text", text: d.text }], + createdAt: row.createdAt, + }); + continue; + } + if (row.role === "system") { + // Coalesce consecutive system rows into one system message (multiple + // system chunks), matching the old applySystemEvent behaviour. 
+ const prev = messages[messages.length - 1]; + const d = row.data as SystemData; + if (current === null && prev && prev.role === "system") { + prev.chunks.push({ type: "system", kind: d.kind, text: d.text }); + continue; + } + flush(); + messages.push({ + id: row.id, + tabId: row.tabId, + seq: row.seq, + turnId: row.turnId, + role: "system", + chunks: [{ type: "system", kind: d.kind, text: d.text }], + createdAt: row.createdAt, + }); + continue; + } + + // assistant / tool rows → part of the current assistant message + const c = ensureAssistant(row); + switch (row.type) { + case "text": + c.msg.chunks.push({ type: "text", text: (row.data as TextData).text }); + break; + case "thinking": { + const d = row.data as ThinkingData; + c.msg.chunks.push({ + type: "thinking", + text: d.text, + ...(d.metadata !== undefined ? { metadata: d.metadata } : {}), + }); + break; + } + case "error": { + const d = row.data as ErrorData; + c.msg.chunks.push({ + type: "error", + message: d.message, + ...(d.statusCode !== undefined ? { statusCode: d.statusCode } : {}), + }); + break; + } + case "tool_call": { + const d = row.data as ToolCallData; + ensureBatch(row.step).calls.push({ id: d.callId, name: d.name, arguments: d.arguments }); + break; + } + case "tool_result": { + const d = row.data as ToolResultData; + const batch = ensureBatch(row.step); + const entry = batch.calls.find((e) => e.id === d.callId); + if (entry) { + entry.result = d.result; + entry.isError = d.isError; + if (d.shellOutput !== undefined) entry.shellOutput = d.shellOutput; + } else { + // Orphan result (its tool_call was paged out of this window). + batch.calls.push({ + id: d.callId, + name: d.name, + arguments: {}, + result: d.result, + isError: d.isError, + ...(d.shellOutput !== undefined ? 
{ shellOutput: d.shellOutput } : {}), + }); + } + break; + } + } + } + flush(); + return messages; +} diff --git a/packages/core/src/db/chunks.ts b/packages/core/src/db/chunks.ts new file mode 100644 index 0000000..6841eb5 --- /dev/null +++ b/packages/core/src/db/chunks.ts @@ -0,0 +1,150 @@ +import { randomUUID } from "node:crypto"; +import { + explodeTurn, + explodeUserText, + groupRowsToMessages, + type MessageRow, +} from "../chunks/transform.js"; +import type { ChunkData, ChunkRow, ChunkRowDraft, TextData } from "../types/index.js"; +import { getDatabase } from "./index.js"; + +// Re-export the DB-free transforms so existing barrel consumers +// (`@dispatch/core`) keep importing them from here. The browser frontend deep- +// imports them directly from `chunks/transform.js` to avoid the DB dependency. +export { explodeTurn, explodeUserText, groupRowsToMessages, type MessageRow }; + +// ─── Persistence ───────────────────────────────────────────────── + +function mapRow(row: Record<string, unknown>): ChunkRow { + let data: ChunkData; + try { + data = JSON.parse(row.data_json as string) as ChunkData; + } catch { + data = { text: "" } as TextData; + } + return { + id: row.id as string, + tabId: row.tab_id as string, + seq: row.seq as number, + turnId: row.turn_id as string, + step: row.step as number, + role: row.role as ChunkRow["role"], + type: row.type as ChunkRow["type"], + data, + createdAt: row.created_at as number, + }; +} + +/** + * Append one or more chunk-row drafts to a tab, assigning a monotonic per-tab + * `seq` and a fresh id/timestamp to each. Returns the inserted rows in order. + */ +export function appendChunks(tabId: string, drafts: ChunkRowDraft[]): ChunkRow[] { + if (drafts.length === 0) return []; + const db = getDatabase(); + const maxSeq = db + .query("SELECT COALESCE(MAX(seq), -1) as max_seq FROM chunks WHERE tab_id = $tabId") + .get({ $tabId: tabId }) as { max_seq: number }; + let seq = (maxSeq?.max_seq ?? 
-1) + 1; + const now = Date.now(); + const insert = db.query( + `INSERT INTO chunks (id, tab_id, seq, turn_id, step, role, type, data_json, created_at) + VALUES ($id, $tabId, $seq, $turnId, $step, $role, $type, $dataJson, $now)`, + ); + const out: ChunkRow[] = []; + for (const draft of drafts) { + const id = randomUUID(); + insert.run({ + $id: id, + $tabId: tabId, + $seq: seq, + $turnId: draft.turnId, + $step: draft.step, + $role: draft.role, + $type: draft.type, + $dataJson: JSON.stringify(draft.data), + $now: now, + }); + out.push({ + id, + tabId, + seq, + turnId: draft.turnId, + step: draft.step, + role: draft.role, + type: draft.type, + data: draft.data, + createdAt: now, + }); + seq++; + } + return out; +} + +/** + * Read chunk rows for a tab in `seq` order (ASC). Pagination mirrors the old + * message pagination but at chunk granularity: + * - no options → all rows; + * - `before` → rows with `seq < before`, most-recent-first then reversed; + * - `limit` → most recent `limit` rows, reversed to ASC. 
+ */ +export function getChunksForTab( + tabId: string, + options?: { limit?: number; before?: number }, +): ChunkRow[] { + const db = getDatabase(); + if (!options) { + const rows = db + .query("SELECT * FROM chunks WHERE tab_id = $tabId ORDER BY seq ASC") + .all({ $tabId: tabId }) as Array<Record<string, unknown>>; + return rows.map(mapRow); + } + const { limit, before } = options; + if (before !== undefined) { + if (limit !== undefined) { + const rows = db + .query( + "SELECT * FROM chunks WHERE tab_id = $tabId AND seq < $before ORDER BY seq DESC LIMIT $limit", + ) + .all({ $tabId: tabId, $before: before, $limit: limit }) as Array<Record<string, unknown>>; + return rows.map(mapRow).reverse(); + } + const rows = db + .query("SELECT * FROM chunks WHERE tab_id = $tabId AND seq < $before ORDER BY seq DESC") + .all({ $tabId: tabId, $before: before }) as Array<Record<string, unknown>>; + return rows.map(mapRow).reverse(); + } + if (limit !== undefined) { + const rows = db + .query("SELECT * FROM chunks WHERE tab_id = $tabId ORDER BY seq DESC LIMIT $limit") + .all({ $tabId: tabId, $limit: limit }) as Array<Record<string, unknown>>; + return rows.map(mapRow).reverse(); + } + const rows = db + .query("SELECT * FROM chunks WHERE tab_id = $tabId ORDER BY seq ASC") + .all({ $tabId: tabId }) as Array<Record<string, unknown>>; + return rows.map(mapRow); +} + +/** + * Derived, grouped view of a tab's full history as messages. Used to + * pre-populate the agent's in-memory `ChatMessage[]` history when an Agent is + * (re)constructed. Always reads the full log (grouping a partial window would + * be lossy for the rebuild path). 
+ */ +export function getMessagesForTab(tabId: string): MessageRow[] { + return groupRowsToMessages(getChunksForTab(tabId)); +} + +export function getTotalChunkCount(tabId: string): number { + const db = getDatabase(); + const row = db + .query("SELECT COUNT(*) as count FROM chunks WHERE tab_id = $tabId") + .get({ $tabId: tabId }) as { count: number } | null; + return row?.count ?? 0; +} + +export function clearChunksForTab(tabId: string): void { + const db = getDatabase(); + db.query("DELETE FROM chunks WHERE tab_id = $tabId").run({ $tabId: tabId }); +} diff --git a/packages/core/src/db/index.ts b/packages/core/src/db/index.ts index e63b266..18dd1b5 100644 --- a/packages/core/src/db/index.ts +++ b/packages/core/src/db/index.ts @@ -93,16 +93,46 @@ export function getDatabase(): Database { // Column already exists — ignore } - _db.run(`CREATE TABLE IF NOT EXISTS messages ( - id TEXT PRIMARY KEY, - tab_id TEXT NOT NULL REFERENCES tabs(id), - seq INTEGER NOT NULL, - role TEXT NOT NULL, - content_json TEXT NOT NULL, - created_at INTEGER NOT NULL + // ─── Append-only chunk log (replaces the old `messages` blob table) ── + // + // A conversation is stored as a flat, append-only stream of chunk rows + // keyed by a per-tab monotonic `seq`. "Message" and "turn" are DERIVED + // groupings (see db/chunks.ts), never stored containers. This is what + // powers per-chunk frontend pagination AND the stable per-step wire + // format that fixes Anthropic prompt-cache churn (see plan-chunk-log.md). 
+ // + // role : 'user' | 'assistant' | 'tool' | 'system' + // type : 'text' | 'thinking' | 'tool_call' | 'tool_result' | 'error' | 'system' + // step : LLM round-trip index within a turn (user/system rows = 0) + // data_json: the type-specific payload (see ChunkData in types) + _db.run(`CREATE TABLE IF NOT EXISTS chunks ( + id TEXT PRIMARY KEY, + tab_id TEXT NOT NULL, + seq INTEGER NOT NULL, + turn_id TEXT NOT NULL, + step INTEGER NOT NULL DEFAULT 0, + role TEXT NOT NULL, + type TEXT NOT NULL, + data_json TEXT NOT NULL, + created_at INTEGER NOT NULL )`); - _db.run(`CREATE INDEX IF NOT EXISTS idx_messages_tab ON messages(tab_id, seq)`); + _db.run(`CREATE INDEX IF NOT EXISTS idx_chunks_tab_seq ON chunks(tab_id, seq)`); + + // One-shot migration off the legacy `messages` blob model. Beta software, + // no backward compatibility: the old chat history is destroyed (tabs + + // messages), while settings / credentials / api_keys / usage_cache / + // wake_schedule are preserved. Detect the old schema by the presence of + // the `messages` table; once dropped, this branch never runs again. + const hasLegacyMessages = _db + .query("SELECT name FROM sqlite_master WHERE type='table' AND name='messages'") + .get() as { name: string } | null; + if (hasLegacyMessages) { + _db.run("DROP TABLE IF EXISTS messages"); + // Clear conversation containers too (fresh slate for the new model). + _db.run("DELETE FROM tabs"); + _db.run("DELETE FROM chunks"); + } _db.run(`CREATE TABLE IF NOT EXISTS settings ( key TEXT PRIMARY KEY, diff --git a/packages/core/src/db/messages.ts b/packages/core/src/db/messages.ts deleted file mode 100644 index 7fc6ccf..0000000 --- a/packages/core/src/db/messages.ts +++ /dev/null @@ -1,154 +0,0 @@ -import type { Chunk, MessageRole } from "../types/index.js"; -import { getDatabase } from "./index.js"; - -/** - * A persisted message row, with `content_json` already parsed into a `Chunk[]`. 
- * Mirrors the new schema (no `thinking` column — that lived under the old - * `content + toolCalls + toolResults + thinking` model). - */ -export interface MessageRow { - id: string; - tabId: string; - seq: number; - role: MessageRole; - chunks: Chunk[]; - createdAt: number; -} - -/** - * Append a new message to the tab. Caller passes the already-serialized - * chunk list as `contentJson` (i.e. `JSON.stringify(chunks)`). - */ -export function appendMessage( - tabId: string, - id: string, - role: MessageRole, - contentJson: string, -): void { - const db = getDatabase(); - const maxSeq = db - .query("SELECT COALESCE(MAX(seq), -1) as max_seq FROM messages WHERE tab_id = $tabId") - .get({ $tabId: tabId }) as { max_seq: number }; - const seq = (maxSeq?.max_seq ?? -1) + 1; - db.query( - `INSERT INTO messages (id, tab_id, seq, role, content_json, created_at) - VALUES ($id, $tabId, $seq, $role, $contentJson, $now)`, - ).run({ - $id: id, - $tabId: tabId, - $seq: seq, - $role: role, - $contentJson: contentJson, - $now: Date.now(), - }); -} - -/** - * Replace the persisted chunks for an existing message. `contentJson` is - * the already-serialized chunk list. - */ -export function updateMessage(id: string, contentJson: string): void { - const db = getDatabase(); - db.query("UPDATE messages SET content_json = $contentJson WHERE id = $id").run({ - $id: id, - $contentJson: contentJson, - }); -} - -/** - * Read messages for a tab in seq order (ASC). `content_json` is parsed into - * `Chunk[]` here so callers don't have to. If a row's JSON is malformed, - * the message is returned with an empty chunk list rather than throwing. - * - * When `options` is omitted, returns ALL messages (backward compatible). - * - * When `options.before` is provided, returns messages with `seq < before`, - * taking the most recent ones first (DESC) up to `options.limit`, then - * reversing back to ASC before returning. 
- * - * When only `options.limit` is provided, returns the most recent `limit` - * messages, reversed back to ASC. - */ -export function getMessagesForTab( - tabId: string, - options?: { limit?: number; before?: number }, -): MessageRow[] { - const db = getDatabase(); - - const mapRow = (row: Record<string, unknown>): MessageRow => { - const rawJson = row.content_json as string; - let chunks: Chunk[]; - try { - const parsed = JSON.parse(rawJson); - chunks = Array.isArray(parsed) ? (parsed as Chunk[]) : []; - } catch { - chunks = []; - } - return { - id: row.id as string, - tabId: row.tab_id as string, - seq: row.seq as number, - role: row.role as MessageRole, - chunks, - createdAt: row.created_at as number, - }; - }; - - // Backward-compatible path: no options → ALL messages, seq ASC. - if (!options) { - const rows = db - .query("SELECT * FROM messages WHERE tab_id = $tabId ORDER BY seq ASC") - .all({ $tabId: tabId }) as Array<Record<string, unknown>>; - return rows.map(mapRow); - } - - const { limit, before } = options; - - // Paginated path: fetch DESC, then reverse to ASC before returning. - if (before !== undefined) { - // `seq < before`, DESC, optionally limited. - if (limit !== undefined) { - const rows = db - .query( - "SELECT * FROM messages WHERE tab_id = $tabId AND seq < $before ORDER BY seq DESC LIMIT $limit", - ) - .all({ $tabId: tabId, $before: before, $limit: limit }) as Array<Record<string, unknown>>; - return rows.map(mapRow).reverse(); - } - const rows = db - .query("SELECT * FROM messages WHERE tab_id = $tabId AND seq < $before ORDER BY seq DESC") - .all({ $tabId: tabId, $before: before }) as Array<Record<string, unknown>>; - return rows.map(mapRow).reverse(); - } - - // Only `limit` provided: most recent `limit`, reversed to ASC. 
- if (limit !== undefined) { - const rows = db - .query("SELECT * FROM messages WHERE tab_id = $tabId ORDER BY seq DESC LIMIT $limit") - .all({ $tabId: tabId, $limit: limit }) as Array<Record<string, unknown>>; - return rows.map(mapRow).reverse(); - } - - // `options` was provided but empty → same as no options. - const rows = db - .query("SELECT * FROM messages WHERE tab_id = $tabId ORDER BY seq ASC") - .all({ $tabId: tabId }) as Array<Record<string, unknown>>; - return rows.map(mapRow); -} - -/** - * Return the total number of persisted messages for a tab. - * Used by the API to advertise total history size alongside a paginated window. - */ -export function getTotalMessageCount(tabId: string): number { - const db = getDatabase(); - const row = db - .query("SELECT COUNT(*) as count FROM messages WHERE tab_id = $tabId") - .get({ $tabId: tabId }) as { count: number } | null; - return row?.count ?? 0; -} - -export function clearMessagesForTab(tabId: string): void { - const db = getDatabase(); - db.query("DELETE FROM messages WHERE tab_id = $tabId").run({ $tabId: tabId }); -} diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts index e33ad2f..9fe7550 100644 --- a/packages/core/src/index.ts +++ b/packages/core/src/index.ts @@ -29,16 +29,19 @@ export { } from "./config/index.js"; // Credentials export * from "./credentials/index.js"; -// Database -export { closeDatabase, getDatabase, getDatabasePath } from "./db/index.js"; export { - appendMessage, - clearMessagesForTab, + appendChunks, + clearChunksForTab, + explodeTurn, + explodeUserText, + getChunksForTab, getMessagesForTab, - getTotalMessageCount, + getTotalChunkCount, + groupRowsToMessages, type MessageRow, - updateMessage, -} from "./db/messages.js"; +} from "./db/chunks.js"; +// Database +export { closeDatabase, getDatabase, getDatabasePath } from "./db/index.js"; export { deleteSetting, getSetting, setSetting } from "./db/settings.js"; // Tabs & Messages export { diff --git 
a/packages/core/src/types/index.ts b/packages/core/src/types/index.ts index 7e7460b..3fcdb40 100644 --- a/packages/core/src/types/index.ts +++ b/packages/core/src/types/index.ts @@ -78,6 +78,85 @@ export interface ChatMessage { chunks: Chunk[]; } +// ─── Append-only chunk log (persisted model) ───────────────────── +// +// The DB stores a conversation as a flat stream of `ChunkRow`s (see +// db/chunks.ts). The render-facing `Chunk`/`ChatMessage` shapes above are +// DERIVED from these rows by grouping (turn_id + step + role). Tool calls +// and their results are SEPARATE rows linked by `callId`, mapping 1:1 to the +// Anthropic wire format. + +/** Role of a persisted chunk row. `tool` rows hold tool results. */ +export type ChunkRole = "user" | "assistant" | "tool" | "system"; + +/** Discriminator for a persisted chunk row's payload. */ +export type ChunkType = "text" | "thinking" | "tool_call" | "tool_result" | "error" | "system"; + +export interface TextData { + text: string; +} +export interface ThinkingData { + text: string; + metadata?: Record<string, unknown>; +} +export interface ToolCallData { + callId: string; + name: string; + arguments: Record<string, unknown>; +} +export interface ToolResultData { + callId: string; + name: string; + result: string; + isError: boolean; + shellOutput?: { stdout: string; stderr: string }; +} +export interface ErrorData { + message: string; + statusCode?: number; +} +export interface SystemData { + kind: SystemChunkKind; + text: string; +} + +export type ChunkData = + | TextData + | ThinkingData + | ToolCallData + | ToolResultData + | ErrorData + | SystemData; + +/** + * A persisted chunk row — the append-only unit of conversation storage and + * the unit of frontend pagination. `seq` is per-tab monotonic and is both the + * ordering key and the pagination cursor. 
+ */ +export interface ChunkRow { + id: string; + tabId: string; + seq: number; + turnId: string; + step: number; + role: ChunkRole; + type: ChunkType; + data: ChunkData; + createdAt: number; +} + +/** + * A chunk-row draft (no `seq`/`tabId`/`createdAt`/`id` yet) used when + * exploding an in-memory turn into rows for persistence. + */ +export interface ChunkRowDraft { + turnId: string; + step: number; + role: ChunkRole; + type: ChunkType; + data: ChunkData; +} + export interface ToolCall { id: string; name: string; diff --git a/packages/core/tests/agent/agent.test.ts b/packages/core/tests/agent/agent.test.ts index d6daac6..6c2b452 100644 --- a/packages/core/tests/agent/agent.test.ts +++ b/packages/core/tests/agent/agent.test.ts @@ -450,10 +450,11 @@ describe("Agent", () => { expect(toolContent[0]).not.toHaveProperty("result"); }); - it("Anthropic [tool-call, text] split: mixed-order assistant message gets split into [text]+[tool-call]", async () => { - // Pre-seed an assistant message with chunks in [tool-batch, text] order — - // which produces [tool-call, text] in the ModelMessage content, a shape - // Anthropic rejects. Only applies for anthropic / opencode-anthropic provider. + it("per-step segmentation: a [tool-batch, text] turn becomes [assistant(tool-call), tool(result), assistant(text)]", async () => { + // `toModelMessages` segments a turn at each tool-batch boundary, so the + // tool-batch (step 0) and the trailing text (step 1) land in SEPARATE + // assistant messages — never a single invalid [tool_use, text] block. + // This is the cache-stability fix and is applied for every provider. 
const agent = new Agent(makeConfig({ provider: "opencode-anthropic" })); agent.messages.push({ role: "user", @@ -490,34 +491,31 @@ describe("Agent", () => { const callArgs = vi.mocked(streamText).mock.calls.at(-1)?.[0]; const messages = callArgs?.messages as Array<{ role: string; content: unknown }>; - // After Anthropic structural normalisation, we should have TWO assistant messages: - // 1st: text-only content - // 2nd: tool-call-only content + // No assistant message may mix tool-call and non-tool-call parts (the + // invalid shape Anthropic rejects); segmentation guarantees this. const assistantMsgs = messages.filter((m) => m.role === "assistant"); - expect(assistantMsgs.length).toBeGreaterThanOrEqual(2); - - // Find the text-only assistant message and the tool-call-only assistant message - const textOnlyMsg = assistantMsgs.find((m) => { + for (const m of assistantMsgs) { const c = m.content as Array<Record<string, unknown>>; - return Array.isArray(c) && c.every((p) => p.type !== "tool-call"); - }); - const toolOnlyMsg = assistantMsgs.find((m) => { + if (!Array.isArray(c)) continue; + const hasToolCall = c.some((p) => p.type === "tool-call"); + const hasNonToolCall = c.some((p) => p.type !== "tool-call"); + expect(hasToolCall && hasNonToolCall).toBe(false); + } + + // The seeded turn yields a tool-call assistant message immediately + // followed by its tool-result message (valid tool_use → tool_result). + const toolOnlyIdx = messages.findIndex((m) => { const c = m.content as Array<Record<string, unknown>>; - return Array.isArray(c) && c.every((p) => p.type === "tool-call"); + return m.role === "assistant" && Array.isArray(c) && c.some((p) => p.type === "tool-call"); }); - - // Narrow the optionals — toBeDefined() already verified non-null, - // but TypeScript needs the explicit assertion via local consts so - // we can pass them to indexOf without `!`. 
- if (!textOnlyMsg || !toolOnlyMsg) throw new Error("type guard"); - - // Text message comes first (before tool-call message) — Anthropic requires this ordering - expect(messages.indexOf(textOnlyMsg)).toBeLessThan(messages.indexOf(toolOnlyMsg)); + expect(toolOnlyIdx).toBeGreaterThanOrEqual(0); + expect(messages[toolOnlyIdx + 1]?.role).toBe("tool"); }); - it("Anthropic [tool-call, text] split: openai-compatible provider preserves original order (no split)", async () => { - // For non-Anthropic providers, the [tool-call, text] split should NOT be applied. - // (No provider set → defaults to openai-compatible) + it("per-step segmentation also applies to the openai-compatible provider", async () => { + // Segmentation is provider-agnostic: a [tool-batch, text] turn is split + // into separate assistant messages for openai-compatible too, with the + // tool result in its own tool message (the standard OpenAI shape). const agent = new Agent(makeConfig()); agent.messages.push({ role: "user", @@ -552,13 +550,24 @@ describe("Agent", () => { const callArgs = vi.mocked(streamText).mock.calls.at(-1)?.[0]; const messages = callArgs?.messages as Array<{ role: string; content: unknown }>; - // For openai-compatible provider, only ONE assistant message with mixed content + // The seeded [tool-batch, text] turn is segmented: a tool-call-only + // assistant message, its tool message, and a separate text assistant + // message (the new turn's "ok" reply adds one more). No assistant + // message mixes tool-call and non-tool-call parts. 
const assistantMsgs = messages.filter((m) => m.role === "assistant"); - expect(assistantMsgs).toHaveLength(1); - const content = assistantMsgs[0]?.content as Array<Record<string, unknown>>; - // Both tool-call and text parts should be in the same message - expect(content.some((p) => p.type === "tool-call")).toBe(true); - expect(content.some((p) => p.type === "text")).toBe(true); + for (const m of assistantMsgs) { + const c = m.content as Array<Record<string, unknown>>; + if (!Array.isArray(c)) continue; + expect(c.some((p) => p.type === "tool-call") && c.some((p) => p.type !== "tool-call")).toBe( + false, + ); + } + expect(messages.some((m) => m.role === "tool")).toBe(true); + const toolCallMsg = assistantMsgs.find((m) => { + const c = m.content as Array<Record<string, unknown>>; + return Array.isArray(c) && c.some((p) => p.type === "tool-call"); + }); + expect(toolCallMsg).toBeDefined(); }); it("empty-text-part filter (Anthropic): empty text chunk is not sent", async () => { @@ -1250,6 +1259,74 @@ describe("Agent", () => { expect(execCount).toBe(2); }); + // ─── Cache stability: per-step wire prefix is immutable ───────────────────── + + it("keeps earlier steps' wire messages byte-identical across requests (cache prefix is stable)", async () => { + // A 3-step tool turn. The messages for steps 0 and 1 must serialize + // identically in the step-2 request and the step-3 request — that + // byte-stability is what lets Anthropic's rolling prompt cache extend + // instead of re-writing the whole prefix every step (cache-miss-report.md). + // Uses the openai-compatible provider so no cacheControl markers (which + // intentionally move each step) obscure the content comparison. + let n = 0; + // mock.calls accumulates across tests in this file — reset so our + // `calls.length` assertions count only this run's requests. 
+ vi.mocked(streamText).mockClear(); + const toolDef = { + name: "read_file", + description: "reads a file", + parameters: z.object({ path: z.string() }), + execute: async (args: Record<string, unknown>) => `contents of ${String(args.path)}`, + }; + const toolStep = (id: string, path: string) => + makeMockStreamResult([ + { type: "reasoning-delta", id: `r${id}`, text: `thinking ${id}` }, + { type: "text-delta", id: `t${id}`, text: `step ${id}` }, + { type: "tool-call", toolCallId: id, toolName: "read_file", input: { path } }, + finishToolCalls, + ]); + vi.mocked(streamText).mockImplementation(() => { + n++; + if (n === 1) return toolStep("s0", "a.txt"); + if (n === 2) return toolStep("s1", "b.txt"); + if (n === 3) return toolStep("s2", "c.txt"); + return makeMockStreamResult([{ type: "text-delta", id: "tf", text: "done" }, finishStop]); + }); + + const agent = new Agent(makeConfig({ tools: [toolDef] })); + for await (const _ of agent.run("go")) { + /* consume */ + } + + // 4 streamText calls (steps 0..3). Compare the step-2 request (call idx 2) + // and step-3 request (call idx 3). + const calls = vi.mocked(streamText).mock.calls; + expect(calls.length).toBe(4); + const req2 = calls[2]?.[0]?.messages as unknown[]; + const req3 = calls[3]?.[0]?.messages as unknown[]; + + // Step-2 request = [system, user, a(s0), tool(s0), a(s1), tool(s1)] (6). + // Step-3 request appends a(s2), tool(s2). The shared 6-message prefix + // must be byte-identical. + expect(req2).toHaveLength(6); + expect(req3).toHaveLength(8); + expect(JSON.stringify(req3.slice(0, 6))).toBe(JSON.stringify(req2)); + + // And each step really is its own [assistant, tool] pair (not one merged + // assistant message with all tool calls bunched together). 
+ const roles = (req3 as Array<{ role: string }>).map((m) => m.role); + expect(roles).toEqual([ + "system", + "user", + "assistant", + "tool", + "assistant", + "tool", + "assistant", + "tool", + ]); + }); + // ─── Usage / cache-rate telemetry ────────────────────────────────────────── it("emits a usage event from the finish-step part with the cache read/write split", async () => { diff --git a/packages/core/tests/db/chunks.test.ts b/packages/core/tests/db/chunks.test.ts new file mode 100644 index 0000000..fe54628 --- /dev/null +++ b/packages/core/tests/db/chunks.test.ts @@ -0,0 +1,179 @@ +import { describe, expect, it } from "vitest"; +import { explodeTurn, explodeUserText, groupRowsToMessages } from "../../src/chunks/transform.js"; +import type { Chunk, ChunkRow, ChunkRowDraft } from "../../src/types/index.js"; + +// These tests cover the pure explode/group transforms — the heart of the flat +// chunk-log storage model. No DB is required. + +/** Promote drafts to rows with synthetic seq/id/createdAt (as appendChunks would). 
*/ +function toRows(drafts: ChunkRowDraft[], tabId = "tab-1", startSeq = 0): ChunkRow[] { + return drafts.map((d, i) => ({ + id: `c${i}`, + tabId, + seq: startSeq + i, + turnId: d.turnId, + step: d.step, + role: d.role, + type: d.type, + data: d.data, + createdAt: 1000 + i, + })); +} + +describe("explodeTurn", () => { + it("splits a tool-batch into separate tool_call (assistant) and tool_result (tool) rows", () => { + const chunks: Chunk[] = [ + { type: "thinking", text: "hmm", metadata: { anthropic: { signature: "S" } } }, + { type: "text", text: "let me read" }, + { + type: "tool-batch", + calls: [ + { id: "a1", name: "read_file", arguments: { path: "x" }, result: "X", isError: false }, + { id: "a2", name: "read_file", arguments: { path: "y" }, result: "Y", isError: false }, + ], + }, + ]; + const drafts = explodeTurn("turn-1", chunks); + + // thinking, text, tool_call×2 (assistant), tool_result×2 (tool) + expect(drafts.map((d) => `${d.role}/${d.type}`)).toEqual([ + "assistant/thinking", + "assistant/text", + "assistant/tool_call", + "assistant/tool_call", + "tool/tool_result", + "tool/tool_result", + ]); + // All in the same step (one round-trip). 
+ expect(drafts.every((d) => d.step === 0)).toBe(true); + expect(drafts.every((d) => d.turnId === "turn-1")).toBe(true); + }); + + it("increments step after each tool-batch (multi-step turn)", () => { + const chunks: Chunk[] = [ + { type: "text", text: "s0" }, + { type: "tool-batch", calls: [{ id: "a", name: "t", arguments: {}, result: "r" }] }, + { type: "text", text: "s1" }, + { type: "tool-batch", calls: [{ id: "b", name: "t", arguments: {}, result: "r" }] }, + { type: "text", text: "final" }, + ]; + const drafts = explodeTurn("turn-1", chunks); + const byStep = (s: number) => drafts.filter((d) => d.step === s).map((d) => d.type); + expect(byStep(0)).toEqual(["text", "tool_call", "tool_result"]); + expect(byStep(1)).toEqual(["text", "tool_call", "tool_result"]); + expect(byStep(2)).toEqual(["text"]); // trailing final-step text, no tool-batch + }); + + it("omits tool_result rows for calls without a result", () => { + const chunks: Chunk[] = [ + { type: "tool-batch", calls: [{ id: "a", name: "t", arguments: {} }] }, + ]; + const drafts = explodeTurn("turn-1", chunks); + expect(drafts.map((d) => d.type)).toEqual(["tool_call"]); + }); +}); + +describe("groupRowsToMessages (round-trip)", () => { + it("reconstructs a user message then an assistant message with a per-step tool-batch", () => { + const rows = [ + ...toRows(explodeUserText("turn-1", "hello"), "tab-1", 0), + ...toRows( + explodeTurn("turn-1", [ + { type: "text", text: "reading" }, + { + type: "tool-batch", + calls: [ + { + id: "a1", + name: "read_file", + arguments: { path: "x" }, + result: "X", + isError: false, + }, + ], + }, + { type: "text", text: "done" }, + ]), + "tab-1", + 1, + ), + ]; + + const msgs = groupRowsToMessages(rows); + expect(msgs.map((m) => m.role)).toEqual(["user", "assistant"]); + expect(msgs[0]?.chunks).toEqual([{ type: "text", text: "hello" }]); + + const a = msgs[1]; + if (!a) throw new Error("no assistant message"); + // reconstructed: text, tool-batch(step0), text(step1) + 
expect(a.chunks.map((c) => c.type)).toEqual(["text", "tool-batch", "text"]); + const batch = a.chunks.find((c) => c.type === "tool-batch"); + if (batch?.type !== "tool-batch") throw new Error("no batch"); + expect(batch.calls[0]).toMatchObject({ + id: "a1", + name: "read_file", + arguments: { path: "x" }, + result: "X", + isError: false, + }); + }); + + it("keeps each step's tool calls in its own tool-batch chunk", () => { + const rows = toRows( + explodeTurn("turn-1", [ + { type: "tool-batch", calls: [{ id: "a", name: "t", arguments: {}, result: "ra" }] }, + { type: "tool-batch", calls: [{ id: "b", name: "t", arguments: {}, result: "rb" }] }, + ]), + ); + const msgs = groupRowsToMessages(rows); + expect(msgs).toHaveLength(1); + const batches = msgs[0]?.chunks.filter((c) => c.type === "tool-batch") ?? []; + expect(batches).toHaveLength(2); + }); + + it("round-trips a multi-step assistant turn back to its original chunk shape", () => { + const original: Chunk[] = [ + { type: "thinking", text: "plan", metadata: { anthropic: { signature: "S" } } }, + { type: "text", text: "step0" }, + { + type: "tool-batch", + calls: [ + { id: "a", name: "read_file", arguments: { path: "p" }, result: "R", isError: false }, + ], + }, + { type: "text", text: "final" }, + ]; + const rows = toRows(explodeTurn("turn-1", original)); + const msgs = groupRowsToMessages(rows); + expect(msgs).toHaveLength(1); + expect(msgs[0]?.chunks).toEqual(original); + }); + + it("tolerates an orphan tool_result whose tool_call was paged out", () => { + const rows = toRows([ + { + turnId: "turn-1", + step: 0, + role: "tool", + type: "tool_result", + data: { callId: "z", name: "t", result: "R", isError: false }, + }, + ]); + const msgs = groupRowsToMessages(rows); + expect(msgs).toHaveLength(1); + const batch = msgs[0]?.chunks[0]; + if (batch?.type !== "tool-batch") throw new Error("no batch"); + expect(batch.calls[0]).toMatchObject({ id: "z", result: "R" }); + }); + + it("breaks the assistant grouping on a 
user or system row", () => { + const rows = [ + ...toRows(explodeUserText("t1", "q1"), "tab", 0), + ...toRows(explodeTurn("t1", [{ type: "text", text: "a1" }]), "tab", 1), + ...toRows(explodeUserText("t2", "q2"), "tab", 2), + ...toRows(explodeTurn("t2", [{ type: "system", kind: "notice", text: "n" }]), "tab", 3), + ]; + const msgs = groupRowsToMessages(rows); + expect(msgs.map((m) => m.role)).toEqual(["user", "assistant", "user", "system"]); + }); +}); diff --git a/packages/frontend/src/lib/tabs.svelte.ts b/packages/frontend/src/lib/tabs.svelte.ts index 6e2d157..c57f800 100644 --- a/packages/frontend/src/lib/tabs.svelte.ts +++ b/packages/frontend/src/lib/tabs.svelte.ts @@ -230,12 +230,10 @@ export function createTabStore() { }; const messagesRes = await fetch(`${config.apiBase}/tabs/${agentId}/messages?limit=100`); - // The backend's `getMessagesForTab` (packages/core/src/db/messages.ts) - // already parses `content_json` into a `Chunk[]` and serves it as - // `chunks` over the wire — NOT the raw `contentJson` string. Earlier - // versions of this client expected `contentJson` and silently dropped - // every message when JSON.parse(undefined) threw, leaving the UI - // with empty conversations after a refresh. + // `GET /messages` windows the flat chunk log (last N chunks) and + // groups the rows into render messages (`groupRowsToMessages` in + // packages/core/src/chunks/transform.ts), serving them as `chunks` + // per message over the wire — NOT a raw JSON string. const messagesData = messagesRes.ok ? 
((await messagesRes.json()) as { messages: Array<{ @@ -408,8 +406,15 @@ export function createTabStore() { const res = await fetch(`${config.apiBase}/tabs/${tabId}/messages?limit=50${beforeParam}`); if (!res.ok) return; const data = (await res.json()) as { - messages?: Array<{ id?: string; role: string; chunks?: Chunk[]; seq?: number }>; + messages?: Array<{ + id?: string; + role: string; + chunks?: Chunk[]; + seq?: number; + turnId?: string; + }>; total?: number; + oldestSeq?: number | null; }; const rawMessages = data.messages ?? []; if (rawMessages.length === 0) { @@ -426,18 +431,43 @@ export function createTabStore() { chunks: Array.isArray(m.chunks) ? m.chunks : [], isStreaming: false, seq: m.seq, + ...(m.turnId !== undefined ? { turnId: m.turnId } : {}), })); const current = getTabById(tabId); if (!current) return; + // Chunk-granular pagination can split ONE turn across the window + // boundary: the oldest message already loaded and the newest message + // in this older page may share a turn_id. Merge them (older chunks + // first) so the turn renders as one bubble instead of duplicating. + const merged = [...current.messages]; + const lastOlder = older[older.length - 1]; + const firstCurrent = merged[0]; + if ( + lastOlder && + firstCurrent && + lastOlder.turnId !== undefined && + lastOlder.turnId === firstCurrent.turnId && + lastOlder.role === firstCurrent.role + ) { + older.pop(); + merged[0] = { + ...firstCurrent, + id: lastOlder.id, + seq: lastOlder.seq, + turnId: lastOlder.turnId, + chunks: [...lastOlder.chunks, ...firstCurrent.chunks], + }; + } + // Avoid duplicating messages we already have loaded. - const existingIds = new Set(current.messages.map((m) => m.id)); + const existingIds = new Set(merged.map((m) => m.id)); const toPrepend = older.filter((m) => !existingIds.has(m.id)); - const newOldestSeq = oldestSeqOf(rawMessages); + const newOldestSeq = data.oldestSeq ?? 
oldestSeqOf(rawMessages); updateTab(tabId, { - messages: [...toPrepend, ...current.messages], + messages: [...toPrepend, ...merged], oldestLoadedSeq: newOldestSeq ?? current.oldestLoadedSeq, totalMessages: data.total ?? current.totalMessages, }); diff --git a/packages/frontend/src/lib/types.ts b/packages/frontend/src/lib/types.ts index 8c34d69..6e87aec 100644 --- a/packages/frontend/src/lib/types.ts +++ b/packages/frontend/src/lib/types.ts @@ -96,6 +96,13 @@ export interface ChatMessage { isStreaming?: boolean; debugInfo?: DebugInfo; seq?: number; + /** + * turn_id of the chunk rows this message was grouped from (history loaded + * from the backend). Used by `loadMoreMessages` to merge a turn that was + * split across the chunk-pagination window boundary. Absent for live + * (streaming) messages built client-side. + */ + turnId?: string; } export type ConnectionStatus = "connecting" | "connected" | "disconnected"; |
