summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorAdam Malczewski <[email protected]>2026-05-30 20:06:31 +0900
committerAdam Malczewski <[email protected]>2026-05-30 20:06:31 +0900
commit0f39b6f78957aacf206012ad2193d9b0c1940c1f (patch)
treeff5f2da8b4f3cdf56cf50d44b8fec75a489ad6fe
parent8c58a973b0d021689cebad5c0cc6d56956bbc2f6 (diff)
downloaddispatch-0f39b6f78957aacf206012ad2193d9b0c1940c1f.tar.gz
dispatch-0f39b6f78957aacf206012ad2193d9b0c1940c1f.zip
refactor(chunks): append-only chunk log with per-step cache-stable wire
Replace the message-as-container model with a flat, append-only chunk log.

- chunks table (id, tab_id, seq, turn_id, step, role, type, data_json): one row per chunk; tool_call (assistant) and tool_result (tool) are SEPARATE rows linked by callId. Message/turn are derived groupings, not stored.
- chunks/transform.ts: DB-free explode (Chunk[] -> rows) / group (rows -> messages), shared by backend and the browser frontend.
- Cache fix: toModelMessages segments each turn at tool-batch boundaries into stable [assistant, tool] pairs per step, so earlier steps serialize byte-identically across requests (kills the prompt-cache churn).
- agent-manager persists a turn's chunks on seal (once), discarding a failed fallback attempt's partial chunks; rebuilds agent history from the log.
- GET /messages windows the log by chunk seq then groups; loadMoreMessages merges a turn split across the window boundary by turnId.
- One-shot migration drops the legacy messages table and clears tabs; settings/credentials/keys/usage preserved.

Full suite green (317 tests); biome, tsc, and svelte-check clean.
-rw-r--r--packages/api/src/agent-manager.ts241
-rw-r--r--packages/api/src/routes/tabs.ts20
-rw-r--r--packages/api/tests/agent-manager.test.ts11
-rw-r--r--packages/api/tests/routes.test.ts20
-rw-r--r--packages/core/src/agent/agent.tsbin54066 -> 54519 bytes
-rw-r--r--packages/core/src/chunks/transform.ts266
-rw-r--r--packages/core/src/db/chunks.ts150
-rw-r--r--packages/core/src/db/index.ts46
-rw-r--r--packages/core/src/db/messages.ts154
-rw-r--r--packages/core/src/index.ts17
-rw-r--r--packages/core/src/types/index.ts79
-rw-r--r--packages/core/tests/agent/agent.test.ts141
-rw-r--r--packages/core/tests/db/chunks.test.ts179
-rw-r--r--packages/frontend/src/lib/tabs.svelte.ts50
-rw-r--r--packages/frontend/src/lib/types.ts7
15 files changed, 1024 insertions, 357 deletions
diff --git a/packages/api/src/agent-manager.ts b/packages/api/src/agent-manager.ts
index c873388..f7975d1 100644
--- a/packages/api/src/agent-manager.ts
+++ b/packages/api/src/agent-manager.ts
@@ -3,9 +3,8 @@ import {
type AgentEvent,
type AgentSkillMapping,
type AgentStatus,
+ appendChunks,
appendEventToChunks,
- appendMessage,
- applySystemEvent,
BackgroundShellStore,
BackgroundTranscriptStore,
type Chunk,
@@ -26,6 +25,8 @@ import {
createYoutubeTranscribeTool,
type DispatchConfig,
expandAgentToolNames,
+ explodeTurn,
+ explodeUserText,
GLOBAL_AGENTS_DIR,
getAgentDirPaths,
getClaudeAccountsFromDB,
@@ -45,7 +46,6 @@ import {
type TabStatusSnapshot,
TaskList,
toAvailableAgents,
- updateMessage,
validateConfig,
} from "@dispatch/core";
import type { PermissionManager } from "./permission-manager.js";
@@ -179,13 +179,23 @@ interface TabAgent {
/** Store for transcript requests backgrounded due to user interrupt. */
transcriptStore: BackgroundTranscriptStore;
/**
- * In-flight assistant message chunks for the active turn. `null` when
- * no turn is running. Out-of-band system events (config-reload,
- * cancel, etc.) target this list when present.
+ * In-flight assistant chunks for the active turn. `null` when no turn is
+ * running. Out-of-band system events (config-reload, cancel, etc.) push
+ * onto this list when present; it is exploded into chunk rows when the
+ * turn flushes.
*/
currentChunks: Chunk[] | null;
- /** DB id of the in-flight assistant message (if persisted yet). */
+ /**
+ * Opaque id of the in-flight assistant turn, used as the `currentAssistantId`
+ * in the WS status snapshot so a reconnecting frontend can align its local
+ * streaming message. (No longer a DB row id — the turn is many chunk rows.)
+ */
currentAssistantId: string | null;
+ /**
+ * `turn_id` shared by the current turn's user message and assistant chunk
+ * rows. Set at the start of `processMessage`, cleared when the turn ends.
+ */
+ currentTurnId: string | null;
}
export class AgentManager {
@@ -331,6 +341,7 @@ export class AgentManager {
transcriptStore: new BackgroundTranscriptStore(),
currentChunks: null,
currentAssistantId: null,
+ currentTurnId: null,
};
this.tabAgents.set(tabId, tabAgent);
}
@@ -716,17 +727,14 @@ export class AgentManager {
// 3. Config or skills reload (configWatcher / skillsWatcher
// also null out `tabAgent.agent`).
//
- // Boundary semantics: `processMessage` calls `appendMessage`
- // for the current turn's user message BEFORE calling this
- // function, so the DB ends in `[..., u_current]`. In the
- // fallback retry path (agent-mode automatic model fallback),
- // the previous attempt may also have flushed a partial
- // assistant response, so the DB ends in
- // `[..., u_current, partial_a]`. Either way, we walk
- // backwards to the most recent user-role row and load only
- // strictly-prior rows: `agent.run()` will push the current
- // user message itself at agent.ts:546, so including it here
- // would duplicate it.
+ // Boundary semantics: `processMessage` appends the current turn's
+ // user message (as a chunk row) BEFORE calling this function, so the
+ // grouped history ends in `[..., u_current]`. In the fallback retry
+ // path the failed attempt's chunks are discarded but the fallback
+ // notices persist, so it can end `[..., u_current, system]`. Either way, we
+ // walk backwards to the most recent user-role message and load only
+ // strictly-prior messages: `agent.run()` pushes the current user
+ // message itself, so including it here would duplicate it.
//
// `toModelMessages` already filters out `role === "system"`
// rows and strips `error` / `system` chunks, so it's safe to
@@ -824,54 +832,31 @@ export class AgentManager {
}
/**
- * Persist a system chunk to a tab's message history.
+ * Persist a system chunk (notice / model-changed / config-reload /
+ * cancelled) to a tab's history.
*
* If an assistant turn is in flight (`currentChunks` is non-null), the
- * chunk is appended to the in-flight assistant message's chunk list —
- * the final `appendMessage` / `updateMessage` call at end-of-turn picks
- * it up automatically.
+ * chunk is folded into the in-flight chunk list; it is exploded into a
+ * `system` chunk row when the turn flushes.
*
- * Otherwise we load the tab's persisted messages, run them through
- * `applySystemEvent` (which either appends to an existing trailing
- * `role: "system"` message or creates a new one), then persist the
- * delta via `appendMessage` / `updateMessage`.
+ * Otherwise we append a standalone `system` chunk row immediately. Adjacent
+ * system rows are coalesced back into one system message at group time
+ * (`groupRowsToMessages`).
*/
private routeSystemEventToTab(tabId: string, kind: SystemChunkKind, text: string): void {
const tabAgent = this.tabAgents.get(tabId);
- // Turn in flight → append directly to the in-flight chunk list.
- // The chunk lands on the assistant message when it's persisted at
- // turn-end (or the assistant message is updated mid-turn elsewhere).
+ // Turn in flight → fold into the in-flight chunk list; it is exploded
+ // into chunk rows (including this system chunk) when the turn flushes.
if (tabAgent?.currentChunks) {
tabAgent.currentChunks.push({ type: "system", kind, text });
- if (tabAgent.currentAssistantId) {
- try {
- updateMessage(tabAgent.currentAssistantId, JSON.stringify(tabAgent.currentChunks));
- } catch {
- // Best-effort — the final persistence in processMessage will
- // flush the same chunks again.
- }
- }
return;
}
- // No turn in flight → route via applySystemEvent against the
- // persisted message list. Either appends to a trailing system
- // message or creates a fresh one.
+ // No turn in flight → persist a standalone system chunk row immediately.
try {
- const rows = getMessagesForTab(tabId);
- const messages = rows.map((r) => ({ id: r.id, role: r.role, chunks: r.chunks }));
- const before = messages[messages.length - 1];
- const { messageId } = applySystemEvent(messages, { kind, text });
- const target = messages.find((m) => m.id === messageId);
- if (!target) return;
- if (before && before.id === messageId) {
- // Appended to existing trailing system message.
- updateMessage(messageId, JSON.stringify(target.chunks));
- } else {
- // Newly created system message.
- appendMessage(tabId, messageId, "system", JSON.stringify(target.chunks));
- }
+ const turnId = tabAgent?.currentTurnId ?? crypto.randomUUID();
+ appendChunks(tabId, explodeTurn(turnId, [{ type: "system", kind, text }]));
} catch {
// DB not available (e.g. tab not yet created) — drop silently.
}
@@ -880,22 +865,17 @@ export class AgentManager {
stopTab(tabId: string): void {
const tabAgent = this.tabAgents.get(tabId);
if (tabAgent) {
- // If a turn is in flight, drop a `cancelled` system chunk into
- // the in-flight assistant message so the user sees an explicit
- // "Generation cancelled by user" marker at the cancellation point.
+ // If a turn is in flight, drop a `cancelled` system chunk into the
+ // in-flight chunk list so the user sees an explicit "Generation
+ // cancelled by user" marker at the cancellation point. It is
+ // persisted (as a chunk row) when `processMessage` flushes the
+ // aborted turn.
if (tabAgent.currentChunks) {
tabAgent.currentChunks.push({
type: "system",
kind: "cancelled",
text: "Generation cancelled by user",
});
- if (tabAgent.currentAssistantId) {
- try {
- updateMessage(tabAgent.currentAssistantId, JSON.stringify(tabAgent.currentChunks));
- } catch {
- // best-effort
- }
- }
}
tabAgent.abortController?.abort();
tabAgent.status = "idle";
@@ -1134,13 +1114,12 @@ export class AgentManager {
tabAgent.status = "running";
this.messageCount += 1;
- // Persist user message to DB (once, before any fallback retry)
- appendMessage(
- tabId,
- crypto.randomUUID(),
- "user",
- JSON.stringify([{ type: "text", text: message }]),
- );
+ // Persist the user message as a chunk row (once, before any fallback
+ // retry). The whole turn — this user message plus the assistant's
+ // chunk rows — shares one `turn_id`.
+ const turnId = crypto.randomUUID();
+ tabAgent.currentTurnId = turnId;
+ appendChunks(tabId, explodeUserText(turnId, message));
// Store agent models on the tab if provided (defines fallback order)
if (agentModels) {
@@ -1166,25 +1145,23 @@ export class AgentManager {
currentModelId = entry.model_id || undefined;
allOutput = "";
- // Single ordered chunk list for the assistant turn — replaces the
- // previous (text + toolCalls + thinking) tri-accumulator pattern.
- // Persisted progressively (insert on first chunk, update thereafter)
- // so out-of-band routes (config-reload, cancel) see real DB rows.
+ // Single ordered chunk list accumulating this attempt's assistant
+ // turn (text / thinking / tool-batch / error / system), folded from
+ // the stream via the shared `appendEventToChunks` helper.
const chunks: Chunk[] = [];
const assistantId = crypto.randomUUID();
let assistantPersisted = false;
tabAgent.currentChunks = chunks;
tabAgent.currentAssistantId = assistantId;
+ // Write-on-seal: explode the accumulated turn into flat chunk rows
+ // ONCE, when the turn settles. `explodeTurn` splits each step's
+ // `tool-batch` into separate `tool_call` + `tool_result` rows and
+ // tags every row with `turn_id` + derived `step`.
const flushAssistant = (): void => {
- if (chunks.length === 0) return;
- const json = JSON.stringify(chunks);
- if (!assistantPersisted) {
- appendMessage(tabId, assistantId, "assistant", json);
- assistantPersisted = true;
- } else {
- updateMessage(assistantId, json);
- }
+ if (assistantPersisted || chunks.length === 0) return;
+ appendChunks(tabId, explodeTurn(turnId, chunks));
+ assistantPersisted = true;
};
let attemptError: string | null = null;
@@ -1205,7 +1182,7 @@ export class AgentManager {
});
}
} catch {
- // Best-effort — if this fails, appendMessage will throw and we'll catch it below
+ // Best-effort — if this fails, chunk persistence will throw and we'll catch it below
}
for await (const event of agent.run(message, {
@@ -1237,23 +1214,12 @@ export class AgentManager {
allOutput += event.delta;
}
- if (event.type === "done") {
- // End of turn — flush the accumulated chunks. Reset the
- // in-flight pointers so out-of-band system events route
- // through `applySystemEvent` against the persisted list
- // instead of mutating a stale array.
- flushAssistant();
- chunks.length = 0;
- assistantPersisted = true; // suppress post-loop flush
- tabAgent.currentChunks = null;
- tabAgent.currentAssistantId = null;
- continue;
- }
-
// Route every content-bearing event through the shared helper.
// `appendEventToChunks` ignores lifecycle events (status / done
// / task-list-update / tab-created / message-* / etc), so it's
- // safe to call unconditionally.
+ // safe to call unconditionally. Persistence happens once, after
+ // the loop, so we never write a partial turn that a fallback
+ // retry would then duplicate.
appendEventToChunks(chunks, event);
}
} catch (err) {
@@ -1261,9 +1227,24 @@ export class AgentManager {
attemptError = err instanceof Error ? err.message : String(err);
}
- // Flush any accumulated assistant content from this attempt (covers
- // the abort/error/exception paths where we never saw a `done`).
- flushAssistant();
+ // Decide whether a fallback retry will supersede this attempt.
+ const isRetryable =
+ attemptError !== null &&
+ (attemptError.includes("status=429") ||
+ attemptError.toLowerCase().includes("rate limit") ||
+ attemptError.toLowerCase().includes("rate_limit") ||
+ attemptError.toLowerCase().includes("usage limit") ||
+ attemptError.toLowerCase().includes("exhausted"));
+ const nextEntry = fallbackSequence[fallbackIdx + 1];
+ const willRetry = Boolean(isRetryable && this.modelRegistry && tabAgent.keyId && nextEntry);
+
+ // Persist this attempt's turn — unless a retry will replace it, in
+ // which case the partial (and its error chunk) is discarded so the
+ // next attempt's chunks don't merge with a failed one. On success,
+ // abort, or a final error, the turn is flushed exactly once.
+ if (!willRetry) {
+ flushAssistant();
+ }
tabAgent.currentChunks = null;
tabAgent.currentAssistantId = null;
@@ -1273,43 +1254,27 @@ export class AgentManager {
break;
}
- // Check if error is retryable (rate limit / exhausted key)
- const isRetryable =
- attemptError.includes("status=429") ||
- attemptError.toLowerCase().includes("rate limit") ||
- attemptError.toLowerCase().includes("rate_limit") ||
- attemptError.toLowerCase().includes("usage limit") ||
- attemptError.toLowerCase().includes("exhausted");
-
- if (isRetryable && this.modelRegistry && tabAgent.keyId) {
- this.modelRegistry.markKeyExhausted(tabAgent.keyId, attemptError);
-
- // Try the next entry in the agent's fallback sequence
- const nextIdx = fallbackIdx + 1;
- const nextEntry = fallbackSequence[nextIdx];
- if (nextIdx < maxFallbackAttempts && nextEntry) {
- const fallbackMsg =
- `Key "${tabAgent.keyId}" rate limited. ` +
- `Falling back to "${nextEntry.key_id}" (model: ${nextEntry.model_id})...`;
- console.warn(`[dispatch] ${fallbackMsg}`);
- // Persist the notice + model-change as system chunks. We're
- // between turns here (just flushed the previous assistant
- // message), so the helper routes them into a `role: "system"`
- // message via `applySystemEvent`.
- this.emit({ type: "notice", message: fallbackMsg }, tabId);
- this.routeSystemEventToTab(tabId, "notice", fallbackMsg);
- this.emit(
- { type: "model-changed", keyId: nextEntry.key_id, modelId: nextEntry.model_id },
- tabId,
- );
- this.routeSystemEventToTab(
- tabId,
- "model-changed",
- `Switched to ${nextEntry.model_id} (${nextEntry.key_id})`,
- );
- tabAgent.agent = null;
- continue;
- }
+ if (willRetry && nextEntry && tabAgent.keyId) {
+ this.modelRegistry?.markKeyExhausted(tabAgent.keyId, attemptError);
+ const fallbackMsg =
+ `Key "${tabAgent.keyId}" rate limited. ` +
+ `Falling back to "${nextEntry.key_id}" (model: ${nextEntry.model_id})...`;
+ console.warn(`[dispatch] ${fallbackMsg}`);
+ // Persist the notice + model-change as standalone system chunk
+ // rows (no turn in flight now — currentChunks was just cleared).
+ this.emit({ type: "notice", message: fallbackMsg }, tabId);
+ this.routeSystemEventToTab(tabId, "notice", fallbackMsg);
+ this.emit(
+ { type: "model-changed", keyId: nextEntry.key_id, modelId: nextEntry.model_id },
+ tabId,
+ );
+ this.routeSystemEventToTab(
+ tabId,
+ "model-changed",
+ `Switched to ${nextEntry.model_id} (${nextEntry.key_id})`,
+ );
+ tabAgent.agent = null;
+ continue;
}
// All fallbacks exhausted or non-retryable error
@@ -1319,6 +1284,8 @@ export class AgentManager {
this.emit({ type: "status", status: "error" }, tabId);
break;
}
+ // Turn fully settled — clear the shared turn id.
+ tabAgent.currentTurnId = null;
// Resolve completion promise for child agents
if (processError === null) {
diff --git a/packages/api/src/routes/tabs.ts b/packages/api/src/routes/tabs.ts
index afa5735..e9265ec 100644
--- a/packages/api/src/routes/tabs.ts
+++ b/packages/api/src/routes/tabs.ts
@@ -2,10 +2,11 @@ import {
archiveTab,
createTab,
deleteSetting,
- getMessagesForTab,
+ getChunksForTab,
getSetting,
getTab,
- getTotalMessageCount,
+ getTotalChunkCount,
+ groupRowsToMessages,
listOpenTabs,
setSetting,
updateTabModel,
@@ -65,6 +66,11 @@ tabsRoutes.get("/:id", (c) => {
return c.json(tab);
});
+// Conversation history for a tab, paginated at CHUNK granularity. The flat
+// chunk log is windowed by `limit` (row count) / `before` (chunk-`seq` cursor)
+// so a single huge turn never dumps in full, then grouped into render messages.
+// `before` is the oldest chunk seq the client already holds. This is what
+// powers per-chunk frontend pagination / memory control.
tabsRoutes.get("/:id/messages", (c) => {
const id = c.req.param("id");
const limitRaw = c.req.query("limit");
@@ -78,9 +84,13 @@ tabsRoutes.get("/:id/messages", (c) => {
...(before !== undefined && Number.isFinite(before) ? { before } : {}),
}
: undefined;
- const messages = getMessagesForTab(id, options);
- const total = getTotalMessageCount(id);
- return c.json({ messages, total });
+ const chunks = getChunksForTab(id, options);
+ const messages = groupRowsToMessages(chunks);
+ // `oldestSeq` is the chunk-seq cursor the client pages backward from; null
+ // when the window is empty.
+ const oldestSeq = chunks.length > 0 ? (chunks[0]?.seq ?? null) : null;
+ const total = getTotalChunkCount(id);
+ return c.json({ messages, total, oldestSeq });
});
tabsRoutes.patch("/:id", async (c) => {
diff --git a/packages/api/tests/agent-manager.test.ts b/packages/api/tests/agent-manager.test.ts
index ba14cad..6b016db 100644
--- a/packages/api/tests/agent-manager.test.ts
+++ b/packages/api/tests/agent-manager.test.ts
@@ -259,8 +259,15 @@ vi.mock("@dispatch/core", () => ({
getSetting(_key: string) {
return null;
},
- appendMessage() {},
- updateMessage() {},
+ appendChunks() {
+ return [];
+ },
+ explodeUserText() {
+ return [];
+ },
+ explodeTurn() {
+ return [];
+ },
getMessagesForTab(tabId: string) {
return fakeMessagesByTab.get(tabId) ?? [];
},
diff --git a/packages/api/tests/routes.test.ts b/packages/api/tests/routes.test.ts
index eba2226..f4de845 100644
--- a/packages/api/tests/routes.test.ts
+++ b/packages/api/tests/routes.test.ts
@@ -181,11 +181,27 @@ vi.mock("@dispatch/core", () => ({
getSetting(_key: string) {
return null;
},
- appendMessage() {},
- updateMessage() {},
+ appendChunks() {
+ return [];
+ },
+ explodeUserText() {
+ return [];
+ },
+ explodeTurn() {
+ return [];
+ },
getMessagesForTab() {
return [];
},
+ getChunksForTab() {
+ return [];
+ },
+ groupRowsToMessages() {
+ return [];
+ },
+ getTotalChunkCount() {
+ return 0;
+ },
appendEventToChunks(_chunks: unknown[], _event: unknown) {
// no-op stub
},
diff --git a/packages/core/src/agent/agent.ts b/packages/core/src/agent/agent.ts
index 439b436..4638301 100644
--- a/packages/core/src/agent/agent.ts
+++ b/packages/core/src/agent/agent.ts
Binary files differ
diff --git a/packages/core/src/chunks/transform.ts b/packages/core/src/chunks/transform.ts
new file mode 100644
index 0000000..a4c6fc8
--- /dev/null
+++ b/packages/core/src/chunks/transform.ts
@@ -0,0 +1,266 @@
+// Pure, dependency-free transforms between the render-shaped `Chunk[]` /
+// `ChatMessage` model and the flat append-only `ChunkRow` log. Kept free of any
+// DB (`bun:sqlite`) import so BOTH the backend persistence layer
+// (`db/chunks.ts`) and the browser frontend store can share the exact same
+// explode/group logic.
+
+import type {
+ Chunk,
+ ChunkRow,
+ ChunkRowDraft,
+ ErrorData,
+ MessageRole,
+ SystemData,
+ TextData,
+ ThinkingData,
+ ToolBatchChunk,
+ ToolCallData,
+ ToolResultData,
+} from "../types/index.js";
+
+/**
+ * A DERIVED message — a grouping of contiguous chunk rows reconstructed for the
+ * agent's in-memory history and for the frontend's render bubbles. NOT a stored
+ * shape: the source of truth is the flat chunk log.
+ */
+export interface MessageRow {
+ id: string;
+ tabId: string;
+ seq: number;
+ /** turn_id of the chunk rows this message was grouped from. */
+ turnId: string;
+ role: MessageRole;
+ chunks: Chunk[];
+ createdAt: number;
+}
+
+// ─── Explode: in-memory turn → flat chunk-row drafts ─────────────
+//
+// A turn's render-shaped `Chunk[]` is flattened into append-only rows.
+// `tool-batch` chunks are split into SEPARATE `tool_call` (role=assistant) and
+// `tool_result` (role=tool) rows linked by `callId`, mapping 1:1 to the
+// Anthropic wire format. `step` increments after each tool-batch: every LLM
+// round-trip emits text/thinking then (optionally) one tool-batch, so a
+// tool-batch boundary is exactly a step boundary.
+
+/** Explode a single user message's text into one row draft. */
+export function explodeUserText(turnId: string, text: string): ChunkRowDraft[] {
+ return [{ turnId, step: 0, role: "user", type: "text", data: { text } }];
+}
+
+/** Explode an assistant turn's accumulated chunks into ordered row drafts. */
+export function explodeTurn(turnId: string, chunks: Chunk[]): ChunkRowDraft[] {
+ const drafts: ChunkRowDraft[] = [];
+ let step = 0;
+ for (const chunk of chunks) {
+ switch (chunk.type) {
+ case "text":
+ drafts.push({ turnId, step, role: "assistant", type: "text", data: { text: chunk.text } });
+ break;
+ case "thinking":
+ drafts.push({
+ turnId,
+ step,
+ role: "assistant",
+ type: "thinking",
+ data: {
+ text: chunk.text,
+ ...(chunk.metadata !== undefined ? { metadata: chunk.metadata } : {}),
+ },
+ });
+ break;
+ case "tool-batch": {
+ for (const call of chunk.calls) {
+ drafts.push({
+ turnId,
+ step,
+ role: "assistant",
+ type: "tool_call",
+ data: { callId: call.id, name: call.name, arguments: call.arguments },
+ });
+ }
+ for (const call of chunk.calls) {
+ if (call.result === undefined) continue;
+ drafts.push({
+ turnId,
+ step,
+ role: "tool",
+ type: "tool_result",
+ data: {
+ callId: call.id,
+ name: call.name,
+ result: call.result,
+ isError: call.isError ?? false,
+ ...(call.shellOutput !== undefined ? { shellOutput: call.shellOutput } : {}),
+ },
+ });
+ }
+ // A tool-batch ends the current LLM step; subsequent chunks belong
+ // to the next round-trip.
+ step++;
+ break;
+ }
+ case "error":
+ drafts.push({
+ turnId,
+ step,
+ role: "assistant",
+ type: "error",
+ data: {
+ message: chunk.message,
+ ...(chunk.statusCode !== undefined ? { statusCode: chunk.statusCode } : {}),
+ },
+ });
+ break;
+ case "system":
+ drafts.push({
+ turnId,
+ step,
+ role: "system",
+ type: "system",
+ data: { kind: chunk.kind, text: chunk.text },
+ });
+ break;
+ }
+ }
+ return drafts;
+}
+
+// ─── Group: flat chunk rows → derived render messages ────────────
+//
+// The inverse of explode (best-effort over an arbitrary window, so it tolerates
+// orphan tool-results whose tool-call was paged out). `tool_result` rows
+// (role=tool) merge back into the preceding assistant message's per-step
+// `tool-batch` chunk by `callId` rather than forming their own message.
+
+export function groupRowsToMessages(rows: ChunkRow[]): MessageRow[] {
+ const messages: MessageRow[] = [];
+
+ let current: { msg: MessageRow; batches: Map<number, ToolBatchChunk> } | null = null;
+ const flush = () => {
+ if (current) {
+ messages.push(current.msg);
+ current = null;
+ }
+ };
+ const ensureAssistant = (row: ChunkRow) => {
+ if (!current) {
+ current = {
+ msg: {
+ id: row.id,
+ tabId: row.tabId,
+ seq: row.seq,
+ turnId: row.turnId,
+ role: "assistant",
+ chunks: [],
+ createdAt: row.createdAt,
+ },
+ batches: new Map(),
+ };
+ }
+ return current;
+ };
+ const ensureBatch = (step: number): ToolBatchChunk => {
+ const c = current;
+ if (!c) throw new Error("ensureBatch called without an assistant message");
+ let batch = c.batches.get(step);
+ if (!batch) {
+ batch = { type: "tool-batch", calls: [] };
+ c.batches.set(step, batch);
+ c.msg.chunks.push(batch);
+ }
+ return batch;
+ };
+
+ for (const row of rows) {
+ if (row.role === "user") {
+ flush();
+ const d = row.data as TextData;
+ messages.push({
+ id: row.id,
+ tabId: row.tabId,
+ seq: row.seq,
+ turnId: row.turnId,
+ role: "user",
+ chunks: [{ type: "text", text: d.text }],
+ createdAt: row.createdAt,
+ });
+ continue;
+ }
+ if (row.role === "system") {
+ // Coalesce consecutive system rows into one system message (multiple
+ // system chunks), matching the old applySystemEvent behaviour.
+ const prev = messages[messages.length - 1];
+ const d = row.data as SystemData;
+ if (current === null && prev && prev.role === "system") {
+ prev.chunks.push({ type: "system", kind: d.kind, text: d.text });
+ continue;
+ }
+ flush();
+ messages.push({
+ id: row.id,
+ tabId: row.tabId,
+ seq: row.seq,
+ turnId: row.turnId,
+ role: "system",
+ chunks: [{ type: "system", kind: d.kind, text: d.text }],
+ createdAt: row.createdAt,
+ });
+ continue;
+ }
+
+ // assistant / tool rows → part of the current assistant message
+ const c = ensureAssistant(row);
+ switch (row.type) {
+ case "text":
+ c.msg.chunks.push({ type: "text", text: (row.data as TextData).text });
+ break;
+ case "thinking": {
+ const d = row.data as ThinkingData;
+ c.msg.chunks.push({
+ type: "thinking",
+ text: d.text,
+ ...(d.metadata !== undefined ? { metadata: d.metadata } : {}),
+ });
+ break;
+ }
+ case "error": {
+ const d = row.data as ErrorData;
+ c.msg.chunks.push({
+ type: "error",
+ message: d.message,
+ ...(d.statusCode !== undefined ? { statusCode: d.statusCode } : {}),
+ });
+ break;
+ }
+ case "tool_call": {
+ const d = row.data as ToolCallData;
+ ensureBatch(row.step).calls.push({ id: d.callId, name: d.name, arguments: d.arguments });
+ break;
+ }
+ case "tool_result": {
+ const d = row.data as ToolResultData;
+ const batch = ensureBatch(row.step);
+ const entry = batch.calls.find((e) => e.id === d.callId);
+ if (entry) {
+ entry.result = d.result;
+ entry.isError = d.isError;
+ if (d.shellOutput !== undefined) entry.shellOutput = d.shellOutput;
+ } else {
+ // Orphan result (its tool_call was paged out of this window).
+ batch.calls.push({
+ id: d.callId,
+ name: d.name,
+ arguments: {},
+ result: d.result,
+ isError: d.isError,
+ ...(d.shellOutput !== undefined ? { shellOutput: d.shellOutput } : {}),
+ });
+ }
+ break;
+ }
+ }
+ }
+ flush();
+ return messages;
+}
diff --git a/packages/core/src/db/chunks.ts b/packages/core/src/db/chunks.ts
new file mode 100644
index 0000000..6841eb5
--- /dev/null
+++ b/packages/core/src/db/chunks.ts
@@ -0,0 +1,150 @@
+import { randomUUID } from "node:crypto";
+import {
+ explodeTurn,
+ explodeUserText,
+ groupRowsToMessages,
+ type MessageRow,
+} from "../chunks/transform.js";
+import type { ChunkData, ChunkRow, ChunkRowDraft, TextData } from "../types/index.js";
+import { getDatabase } from "./index.js";
+
+// Re-export the DB-free transforms so existing barrel consumers
+// (`@dispatch/core`) keep importing them from here. The browser frontend deep-
+// imports them directly from `chunks/transform.js` to avoid the DB dependency.
+export { explodeTurn, explodeUserText, groupRowsToMessages, type MessageRow };
+
+// ─── Persistence ─────────────────────────────────────────────────
+
+function mapRow(row: Record<string, unknown>): ChunkRow {
+ let data: ChunkData;
+ try {
+ data = JSON.parse(row.data_json as string) as ChunkData;
+ } catch {
+ data = { text: "" } as TextData;
+ }
+ return {
+ id: row.id as string,
+ tabId: row.tab_id as string,
+ seq: row.seq as number,
+ turnId: row.turn_id as string,
+ step: row.step as number,
+ role: row.role as ChunkRow["role"],
+ type: row.type as ChunkRow["type"],
+ data,
+ createdAt: row.created_at as number,
+ };
+}
+
+/**
+ * Append one or more chunk-row drafts to a tab, assigning a monotonic per-tab
+ * `seq` and a fresh id/timestamp to each. Returns the inserted rows in order.
+ */
+export function appendChunks(tabId: string, drafts: ChunkRowDraft[]): ChunkRow[] {
+ if (drafts.length === 0) return [];
+ const db = getDatabase();
+ const maxSeq = db
+ .query("SELECT COALESCE(MAX(seq), -1) as max_seq FROM chunks WHERE tab_id = $tabId")
+ .get({ $tabId: tabId }) as { max_seq: number };
+ let seq = (maxSeq?.max_seq ?? -1) + 1;
+ const now = Date.now();
+ const insert = db.query(
+ `INSERT INTO chunks (id, tab_id, seq, turn_id, step, role, type, data_json, created_at)
+ VALUES ($id, $tabId, $seq, $turnId, $step, $role, $type, $dataJson, $now)`,
+ );
+ const out: ChunkRow[] = [];
+ for (const draft of drafts) {
+ const id = randomUUID();
+ insert.run({
+ $id: id,
+ $tabId: tabId,
+ $seq: seq,
+ $turnId: draft.turnId,
+ $step: draft.step,
+ $role: draft.role,
+ $type: draft.type,
+ $dataJson: JSON.stringify(draft.data),
+ $now: now,
+ });
+ out.push({
+ id,
+ tabId,
+ seq,
+ turnId: draft.turnId,
+ step: draft.step,
+ role: draft.role,
+ type: draft.type,
+ data: draft.data,
+ createdAt: now,
+ });
+ seq++;
+ }
+ return out;
+}
+
+/**
+ * Read chunk rows for a tab in `seq` order (ASC). Pagination mirrors the old
+ * message pagination but at chunk granularity:
+ * - no options → all rows;
+ * - `before` → rows with `seq < before`, most-recent-first then reversed;
+ * - `limit` → most recent `limit` rows, reversed to ASC.
+ */
+export function getChunksForTab(
+ tabId: string,
+ options?: { limit?: number; before?: number },
+): ChunkRow[] {
+ const db = getDatabase();
+ if (!options) {
+ const rows = db
+ .query("SELECT * FROM chunks WHERE tab_id = $tabId ORDER BY seq ASC")
+ .all({ $tabId: tabId }) as Array<Record<string, unknown>>;
+ return rows.map(mapRow);
+ }
+ const { limit, before } = options;
+ if (before !== undefined) {
+ if (limit !== undefined) {
+ const rows = db
+ .query(
+ "SELECT * FROM chunks WHERE tab_id = $tabId AND seq < $before ORDER BY seq DESC LIMIT $limit",
+ )
+ .all({ $tabId: tabId, $before: before, $limit: limit }) as Array<Record<string, unknown>>;
+ return rows.map(mapRow).reverse();
+ }
+ const rows = db
+ .query("SELECT * FROM chunks WHERE tab_id = $tabId AND seq < $before ORDER BY seq DESC")
+ .all({ $tabId: tabId, $before: before }) as Array<Record<string, unknown>>;
+ return rows.map(mapRow).reverse();
+ }
+ if (limit !== undefined) {
+ const rows = db
+ .query("SELECT * FROM chunks WHERE tab_id = $tabId ORDER BY seq DESC LIMIT $limit")
+ .all({ $tabId: tabId, $limit: limit }) as Array<Record<string, unknown>>;
+ return rows.map(mapRow).reverse();
+ }
+ const rows = db
+ .query("SELECT * FROM chunks WHERE tab_id = $tabId ORDER BY seq ASC")
+ .all({ $tabId: tabId }) as Array<Record<string, unknown>>;
+ return rows.map(mapRow);
+}
+
+/**
+ * Derived, grouped view of a tab's full history as messages. Used to
+ * pre-populate the agent's in-memory `ChatMessage[]` history when an Agent is
+ * (re)constructed. Always reads the full log (grouping a partial window would
+ * be lossy for the rebuild path).
+ */
+export function getMessagesForTab(tabId: string): MessageRow[] {
+ return groupRowsToMessages(getChunksForTab(tabId));
+}
+
+export function getTotalChunkCount(tabId: string): number {
+ const db = getDatabase();
+ const row = db
+ .query("SELECT COUNT(*) as count FROM chunks WHERE tab_id = $tabId")
+ .get({ $tabId: tabId }) as { count: number } | null;
+ return row?.count ?? 0;
+}
+
+export function clearChunksForTab(tabId: string): void {
+ const db = getDatabase();
+ db.query("DELETE FROM chunks WHERE tab_id = $tabId").run({ $tabId: tabId });
+}
diff --git a/packages/core/src/db/index.ts b/packages/core/src/db/index.ts
index e63b266..18dd1b5 100644
--- a/packages/core/src/db/index.ts
+++ b/packages/core/src/db/index.ts
@@ -93,16 +93,46 @@ export function getDatabase(): Database {
// Column already exists — ignore
}
- _db.run(`CREATE TABLE IF NOT EXISTS messages (
- id TEXT PRIMARY KEY,
- tab_id TEXT NOT NULL REFERENCES tabs(id),
- seq INTEGER NOT NULL,
- role TEXT NOT NULL,
- content_json TEXT NOT NULL,
- created_at INTEGER NOT NULL
+ // ─── Append-only chunk log (replaces the old `messages` blob table) ──
+ //
+ // A conversation is stored as a flat, append-only stream of chunk rows
+ // keyed by a per-tab monotonic `seq`. "Message" and "turn" are DERIVED
+ // groupings (see db/chunks.ts), never stored containers. This is what
+ // powers per-chunk frontend pagination AND the stable per-step wire
+ // format that fixes Anthropic prompt-cache churn (see plan-chunk-log.md).
+ //
+ // role : 'user' | 'assistant' | 'tool' | 'system'
+ // type : 'text' | 'thinking' | 'tool_call' | 'tool_result' | 'error' | 'system'
+ // step : LLM round-trip index within a turn (user/system rows = 0)
+ // data_json: the type-specific payload (see ChunkData in types)
+ _db.run(`CREATE TABLE IF NOT EXISTS chunks (
+ id TEXT PRIMARY KEY,
+ tab_id TEXT NOT NULL,
+ seq INTEGER NOT NULL,
+ turn_id TEXT NOT NULL,
+ step INTEGER NOT NULL DEFAULT 0,
+ role TEXT NOT NULL,
+ type TEXT NOT NULL,
+ data_json TEXT NOT NULL,
+ created_at INTEGER NOT NULL
)`);
- _db.run(`CREATE INDEX IF NOT EXISTS idx_messages_tab ON messages(tab_id, seq)`);
+ _db.run(`CREATE INDEX IF NOT EXISTS idx_chunks_tab_seq ON chunks(tab_id, seq)`);
+
+ // One-shot migration off the legacy `messages` blob model. Beta software,
+ // no backward compatibility: the old chat history is destroyed (tabs +
+ // messages), while settings / credentials / api_keys / usage_cache /
+ // wake_schedule are preserved. Detect the old schema by the presence of
+ // the `messages` table; once dropped, this branch never runs again.
+ const hasLegacyMessages = _db
+ .query("SELECT name FROM sqlite_master WHERE type='table' AND name='messages'")
+ .get() as { name: string } | null;
+ if (hasLegacyMessages) {
+ // The `messages` table is the only marker that the migration is still
+ // pending, so the drop and both deletes must commit together. Without a
+ // transaction, a crash right after the DROP would strand the legacy tabs
+ // forever because this branch would never be entered again.
+ _db.run("BEGIN");
+ try {
+ _db.run("DROP TABLE IF EXISTS messages");
+ // Clear conversation containers too (fresh slate for the new model).
+ _db.run("DELETE FROM tabs");
+ _db.run("DELETE FROM chunks");
+ _db.run("COMMIT");
+ } catch (err) {
+ try {
+ _db.run("ROLLBACK");
+ } catch {
+ // SQLite may already have rolled the transaction back; keep the
+ // original failure as the error that surfaces.
+ }
+ throw err;
+ }
+ }
_db.run(`CREATE TABLE IF NOT EXISTS settings (
key TEXT PRIMARY KEY,
diff --git a/packages/core/src/db/messages.ts b/packages/core/src/db/messages.ts
deleted file mode 100644
index 7fc6ccf..0000000
--- a/packages/core/src/db/messages.ts
+++ /dev/null
@@ -1,154 +0,0 @@
-import type { Chunk, MessageRole } from "../types/index.js";
-import { getDatabase } from "./index.js";
-
-/**
- * A persisted message row, with `content_json` already parsed into a `Chunk[]`.
- * Mirrors the new schema (no `thinking` column — that lived under the old
- * `content + toolCalls + toolResults + thinking` model).
- */
-export interface MessageRow {
- id: string;
- tabId: string;
- seq: number;
- role: MessageRole;
- chunks: Chunk[];
- createdAt: number;
-}
-
-/**
- * Append a new message to the tab. Caller passes the already-serialized
- * chunk list as `contentJson` (i.e. `JSON.stringify(chunks)`).
- */
-export function appendMessage(
- tabId: string,
- id: string,
- role: MessageRole,
- contentJson: string,
-): void {
- const db = getDatabase();
- const maxSeq = db
- .query("SELECT COALESCE(MAX(seq), -1) as max_seq FROM messages WHERE tab_id = $tabId")
- .get({ $tabId: tabId }) as { max_seq: number };
- const seq = (maxSeq?.max_seq ?? -1) + 1;
- db.query(
- `INSERT INTO messages (id, tab_id, seq, role, content_json, created_at)
- VALUES ($id, $tabId, $seq, $role, $contentJson, $now)`,
- ).run({
- $id: id,
- $tabId: tabId,
- $seq: seq,
- $role: role,
- $contentJson: contentJson,
- $now: Date.now(),
- });
-}
-
-/**
- * Replace the persisted chunks for an existing message. `contentJson` is
- * the already-serialized chunk list.
- */
-export function updateMessage(id: string, contentJson: string): void {
- const db = getDatabase();
- db.query("UPDATE messages SET content_json = $contentJson WHERE id = $id").run({
- $id: id,
- $contentJson: contentJson,
- });
-}
-
-/**
- * Read messages for a tab in seq order (ASC). `content_json` is parsed into
- * `Chunk[]` here so callers don't have to. If a row's JSON is malformed,
- * the message is returned with an empty chunk list rather than throwing.
- *
- * When `options` is omitted, returns ALL messages (backward compatible).
- *
- * When `options.before` is provided, returns messages with `seq < before`,
- * taking the most recent ones first (DESC) up to `options.limit`, then
- * reversing back to ASC before returning.
- *
- * When only `options.limit` is provided, returns the most recent `limit`
- * messages, reversed back to ASC.
- */
-export function getMessagesForTab(
- tabId: string,
- options?: { limit?: number; before?: number },
-): MessageRow[] {
- const db = getDatabase();
-
- const mapRow = (row: Record<string, unknown>): MessageRow => {
- const rawJson = row.content_json as string;
- let chunks: Chunk[];
- try {
- const parsed = JSON.parse(rawJson);
- chunks = Array.isArray(parsed) ? (parsed as Chunk[]) : [];
- } catch {
- chunks = [];
- }
- return {
- id: row.id as string,
- tabId: row.tab_id as string,
- seq: row.seq as number,
- role: row.role as MessageRole,
- chunks,
- createdAt: row.created_at as number,
- };
- };
-
- // Backward-compatible path: no options → ALL messages, seq ASC.
- if (!options) {
- const rows = db
- .query("SELECT * FROM messages WHERE tab_id = $tabId ORDER BY seq ASC")
- .all({ $tabId: tabId }) as Array<Record<string, unknown>>;
- return rows.map(mapRow);
- }
-
- const { limit, before } = options;
-
- // Paginated path: fetch DESC, then reverse to ASC before returning.
- if (before !== undefined) {
- // `seq < before`, DESC, optionally limited.
- if (limit !== undefined) {
- const rows = db
- .query(
- "SELECT * FROM messages WHERE tab_id = $tabId AND seq < $before ORDER BY seq DESC LIMIT $limit",
- )
- .all({ $tabId: tabId, $before: before, $limit: limit }) as Array<Record<string, unknown>>;
- return rows.map(mapRow).reverse();
- }
- const rows = db
- .query("SELECT * FROM messages WHERE tab_id = $tabId AND seq < $before ORDER BY seq DESC")
- .all({ $tabId: tabId, $before: before }) as Array<Record<string, unknown>>;
- return rows.map(mapRow).reverse();
- }
-
- // Only `limit` provided: most recent `limit`, reversed to ASC.
- if (limit !== undefined) {
- const rows = db
- .query("SELECT * FROM messages WHERE tab_id = $tabId ORDER BY seq DESC LIMIT $limit")
- .all({ $tabId: tabId, $limit: limit }) as Array<Record<string, unknown>>;
- return rows.map(mapRow).reverse();
- }
-
- // `options` was provided but empty → same as no options.
- const rows = db
- .query("SELECT * FROM messages WHERE tab_id = $tabId ORDER BY seq ASC")
- .all({ $tabId: tabId }) as Array<Record<string, unknown>>;
- return rows.map(mapRow);
-}
-
-/**
- * Return the total number of persisted messages for a tab.
- * Used by the API to advertise total history size alongside a paginated window.
- */
-export function getTotalMessageCount(tabId: string): number {
- const db = getDatabase();
- const row = db
- .query("SELECT COUNT(*) as count FROM messages WHERE tab_id = $tabId")
- .get({ $tabId: tabId }) as { count: number } | null;
- return row?.count ?? 0;
-}
-
-export function clearMessagesForTab(tabId: string): void {
- const db = getDatabase();
- db.query("DELETE FROM messages WHERE tab_id = $tabId").run({ $tabId: tabId });
-}
diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts
index e33ad2f..9fe7550 100644
--- a/packages/core/src/index.ts
+++ b/packages/core/src/index.ts
@@ -29,16 +29,19 @@ export {
} from "./config/index.js";
// Credentials
export * from "./credentials/index.js";
-// Database
-export { closeDatabase, getDatabase, getDatabasePath } from "./db/index.js";
export {
- appendMessage,
- clearMessagesForTab,
+ appendChunks,
+ clearChunksForTab,
+ explodeTurn,
+ explodeUserText,
+ getChunksForTab,
getMessagesForTab,
- getTotalMessageCount,
+ getTotalChunkCount,
+ groupRowsToMessages,
type MessageRow,
- updateMessage,
-} from "./db/messages.js";
+} from "./db/chunks.js";
+// Database
+export { closeDatabase, getDatabase, getDatabasePath } from "./db/index.js";
export { deleteSetting, getSetting, setSetting } from "./db/settings.js";
// Tabs & Messages
export {
diff --git a/packages/core/src/types/index.ts b/packages/core/src/types/index.ts
index 7e7460b..3fcdb40 100644
--- a/packages/core/src/types/index.ts
+++ b/packages/core/src/types/index.ts
@@ -78,6 +78,85 @@ export interface ChatMessage {
chunks: Chunk[];
}
+// ─── Append-only chunk log (persisted model) ─────────────────────
+//
+// The DB stores a conversation as a flat stream of `ChunkRow`s (see
+// db/chunks.ts). The render-facing `Chunk`/`ChatMessage` shapes above are
+// DERIVED from these rows by grouping (turn_id + step + role). Tool calls
+// and their results are SEPARATE rows linked by `callId`, mapping 1:1 to the
+// Anthropic wire format.
+
+/** Role of a persisted chunk row. `tool` rows hold tool results. */
+export type ChunkRole = "user" | "assistant" | "tool" | "system";
+
+/** Discriminator for a persisted chunk row's payload. */
+export type ChunkType = "text" | "thinking" | "tool_call" | "tool_result" | "error" | "system";
+
+/** Plain-text payload (presumably the payload of `text` rows — confirm in chunks/transform.ts). */
+export interface TextData {
+ text: string;
+}
+/**
+ * Reasoning-text payload (presumably the payload of `thinking` rows — confirm
+ * in chunks/transform.ts).
+ */
+export interface ThinkingData {
+ text: string;
+ /** Optional free-form metadata kept alongside the text; its schema is not fixed here. */
+ metadata?: Record<string, unknown>;
+}
+/** Payload describing one tool invocation; `callId` links it to its result row. */
+export interface ToolCallData {
+ callId: string;
+ name: string;
+ arguments: Record<string, unknown>;
+}
+/** Payload describing the outcome of one tool invocation, linked to its call by `callId`. */
+export interface ToolResultData {
+ callId: string;
+ name: string;
+ result: string;
+ isError: boolean;
+ /** Optional captured stdout/stderr (presumably only set by shell-style tools — verify). */
+ shellOutput?: { stdout: string; stderr: string };
+}
+/** Error payload: a message plus an optional status code (presumably an HTTP status — verify). */
+export interface ErrorData {
+ message: string;
+ statusCode?: number;
+}
+/** System-notice payload: a `SystemChunkKind` tag and its text. */
+export interface SystemData {
+ kind: SystemChunkKind;
+ text: string;
+}
+
+/**
+ * Union of every payload shape a chunk row can carry. The payloads have no
+ * tag of their own; the owning row's `type` field tells which member applies.
+ */
+export type ChunkData =
+ | TextData
+ | ThinkingData
+ | ToolCallData
+ | ToolResultData
+ | ErrorData
+ | SystemData;
+
+/**
+ * A persisted chunk row — the append-only unit of conversation storage and
+ * the unit of frontend pagination. `seq` is per-tab monotonic and is both the
+ * ordering key and the pagination cursor.
+ */
+export interface ChunkRow {
+ id: string;
+ tabId: string;
+ seq: number;
+ /** Identifier shared by the rows of one turn; used when grouping rows into messages. */
+ turnId: string;
+ /** LLM round-trip index within the turn. */
+ step: number;
+ role: ChunkRole;
+ type: ChunkType;
+ /** Type-specific payload; see `ChunkData`. */
+ data: ChunkData;
+ createdAt: number;
+}
+
+/**
+ * A chunk-row draft (no `seq`/`tabId`/`createdAt`/`id` yet) used when
+ * exploding an in-memory turn into rows for persistence.
+ */
+export interface ChunkRowDraft {
+ turnId: string;
+ step: number;
+ role: ChunkRole;
+ type: ChunkType;
+ data: ChunkData;
+}
+
export interface ToolCall {
id: string;
name: string;
diff --git a/packages/core/tests/agent/agent.test.ts b/packages/core/tests/agent/agent.test.ts
index d6daac6..6c2b452 100644
--- a/packages/core/tests/agent/agent.test.ts
+++ b/packages/core/tests/agent/agent.test.ts
@@ -450,10 +450,11 @@ describe("Agent", () => {
expect(toolContent[0]).not.toHaveProperty("result");
});
- it("Anthropic [tool-call, text] split: mixed-order assistant message gets split into [text]+[tool-call]", async () => {
- // Pre-seed an assistant message with chunks in [tool-batch, text] order —
- // which produces [tool-call, text] in the ModelMessage content, a shape
- // Anthropic rejects. Only applies for anthropic / opencode-anthropic provider.
+ it("per-step segmentation: a [tool-batch, text] turn becomes [assistant(tool-call), tool(result), assistant(text)]", async () => {
+ // `toModelMessages` segments a turn at each tool-batch boundary, so the
+ // tool-batch (step 0) and the trailing text (step 1) land in SEPARATE
+ // assistant messages — never a single invalid [tool_use, text] block.
+ // This is the cache-stability fix and is applied for every provider.
const agent = new Agent(makeConfig({ provider: "opencode-anthropic" }));
agent.messages.push({
role: "user",
@@ -490,34 +491,31 @@ describe("Agent", () => {
const callArgs = vi.mocked(streamText).mock.calls.at(-1)?.[0];
const messages = callArgs?.messages as Array<{ role: string; content: unknown }>;
- // After Anthropic structural normalisation, we should have TWO assistant messages:
- // 1st: text-only content
- // 2nd: tool-call-only content
+ // No assistant message may mix tool-call and non-tool-call parts (the
+ // invalid shape Anthropic rejects); segmentation guarantees this.
const assistantMsgs = messages.filter((m) => m.role === "assistant");
- expect(assistantMsgs.length).toBeGreaterThanOrEqual(2);
-
- // Find the text-only assistant message and the tool-call-only assistant message
- const textOnlyMsg = assistantMsgs.find((m) => {
+ for (const m of assistantMsgs) {
const c = m.content as Array<Record<string, unknown>>;
- return Array.isArray(c) && c.every((p) => p.type !== "tool-call");
- });
- const toolOnlyMsg = assistantMsgs.find((m) => {
+ if (!Array.isArray(c)) continue;
+ const hasToolCall = c.some((p) => p.type === "tool-call");
+ const hasNonToolCall = c.some((p) => p.type !== "tool-call");
+ expect(hasToolCall && hasNonToolCall).toBe(false);
+ }
+
+ // The seeded turn yields a tool-call assistant message immediately
+ // followed by its tool-result message (valid tool_use → tool_result).
+ const toolOnlyIdx = messages.findIndex((m) => {
const c = m.content as Array<Record<string, unknown>>;
- return Array.isArray(c) && c.every((p) => p.type === "tool-call");
+ return m.role === "assistant" && Array.isArray(c) && c.some((p) => p.type === "tool-call");
});
-
- // Narrow the optionals — toBeDefined() already verified non-null,
- // but TypeScript needs the explicit assertion via local consts so
- // we can pass them to indexOf without `!`.
- if (!textOnlyMsg || !toolOnlyMsg) throw new Error("type guard");
-
- // Text message comes first (before tool-call message) — Anthropic requires this ordering
- expect(messages.indexOf(textOnlyMsg)).toBeLessThan(messages.indexOf(toolOnlyMsg));
+ expect(toolOnlyIdx).toBeGreaterThanOrEqual(0);
+ expect(messages[toolOnlyIdx + 1]?.role).toBe("tool");
});
- it("Anthropic [tool-call, text] split: openai-compatible provider preserves original order (no split)", async () => {
- // For non-Anthropic providers, the [tool-call, text] split should NOT be applied.
- // (No provider set → defaults to openai-compatible)
+ it("per-step segmentation also applies to the openai-compatible provider", async () => {
+ // Segmentation is provider-agnostic: a [tool-batch, text] turn is split
+ // into separate assistant messages for openai-compatible too, with the
+ // tool result in its own tool message (the standard OpenAI shape).
const agent = new Agent(makeConfig());
agent.messages.push({
role: "user",
@@ -552,13 +550,24 @@ describe("Agent", () => {
const callArgs = vi.mocked(streamText).mock.calls.at(-1)?.[0];
const messages = callArgs?.messages as Array<{ role: string; content: unknown }>;
- // For openai-compatible provider, only ONE assistant message with mixed content
+ // The seeded [tool-batch, text] turn is segmented: a tool-call-only
+ // assistant message, its tool message, and a separate text assistant
+ // message (the new turn's "ok" reply adds one more). No assistant
+ // message mixes tool-call and non-tool-call parts.
const assistantMsgs = messages.filter((m) => m.role === "assistant");
- expect(assistantMsgs).toHaveLength(1);
- const content = assistantMsgs[0]?.content as Array<Record<string, unknown>>;
- // Both tool-call and text parts should be in the same message
- expect(content.some((p) => p.type === "tool-call")).toBe(true);
- expect(content.some((p) => p.type === "text")).toBe(true);
+ for (const m of assistantMsgs) {
+ const c = m.content as Array<Record<string, unknown>>;
+ if (!Array.isArray(c)) continue;
+ expect(c.some((p) => p.type === "tool-call") && c.some((p) => p.type !== "tool-call")).toBe(
+ false,
+ );
+ }
+ expect(messages.some((m) => m.role === "tool")).toBe(true);
+ const toolCallMsg = assistantMsgs.find((m) => {
+ const c = m.content as Array<Record<string, unknown>>;
+ return Array.isArray(c) && c.some((p) => p.type === "tool-call");
+ });
+ expect(toolCallMsg).toBeDefined();
});
it("empty-text-part filter (Anthropic): empty text chunk is not sent", async () => {
@@ -1250,6 +1259,74 @@ describe("Agent", () => {
expect(execCount).toBe(2);
});
+ // ─── Cache stability: per-step wire prefix is immutable ─────────────────────
+
+ it("keeps earlier steps' wire messages byte-identical across requests (cache prefix is stable)", async () => {
+ // A 3-step tool turn. The messages for steps 0 and 1 must serialize
+ // identically in the step-2 request and the step-3 request — that
+ // byte-stability is what lets Anthropic's rolling prompt cache extend
+ // instead of re-writing the whole prefix every step (cache-miss-report.md).
+ // Uses the openai-compatible provider so no cacheControl markers (which
+ // intentionally move each step) obscure the content comparison.
+ // `n` counts streamText invocations so the mock can pick the step to return.
+ let n = 0;
+ // mock.calls accumulates across tests in this file — reset so our
+ // `calls.length` assertions count only this run's requests.
+ vi.mocked(streamText).mockClear();
+ const toolDef = {
+ name: "read_file",
+ description: "reads a file",
+ parameters: z.object({ path: z.string() }),
+ execute: async (args: Record<string, unknown>) => `contents of ${String(args.path)}`,
+ };
+ // One mocked tool-calling step: reasoning, text, then a single read_file
+ // call, finished with the tool-calls finish part.
+ const toolStep = (id: string, path: string) =>
+ makeMockStreamResult([
+ { type: "reasoning-delta", id: `r${id}`, text: `thinking ${id}` },
+ { type: "text-delta", id: `t${id}`, text: `step ${id}` },
+ { type: "tool-call", toolCallId: id, toolName: "read_file", input: { path } },
+ finishToolCalls,
+ ]);
+ // Calls 1-3 each request a tool; the fourth call ends the turn with text.
+ vi.mocked(streamText).mockImplementation(() => {
+ n++;
+ if (n === 1) return toolStep("s0", "a.txt");
+ if (n === 2) return toolStep("s1", "b.txt");
+ if (n === 3) return toolStep("s2", "c.txt");
+ return makeMockStreamResult([{ type: "text-delta", id: "tf", text: "done" }, finishStop]);
+ });
+
+ const agent = new Agent(makeConfig({ tools: [toolDef] }));
+ for await (const _ of agent.run("go")) {
+ /* consume */
+ }
+
+ // 4 streamText calls (steps 0..3). Compare the step-2 request (call idx 2)
+ // and step-3 request (call idx 3).
+ const calls = vi.mocked(streamText).mock.calls;
+ expect(calls.length).toBe(4);
+ const req2 = calls[2]?.[0]?.messages as unknown[];
+ const req3 = calls[3]?.[0]?.messages as unknown[];
+
+ // Step-2 request = [system, user, a(s0), tool(s0), a(s1), tool(s1)] (6).
+ // Step-3 request appends a(s2), tool(s2). The shared 6-message prefix
+ // must be byte-identical.
+ expect(req2).toHaveLength(6);
+ expect(req3).toHaveLength(8);
+ expect(JSON.stringify(req3.slice(0, 6))).toBe(JSON.stringify(req2));
+
+ // And each step really is its own [assistant, tool] pair (not one merged
+ // assistant message with all tool calls bunched together).
+ const roles = (req3 as Array<{ role: string }>).map((m) => m.role);
+ expect(roles).toEqual([
+ "system",
+ "user",
+ "assistant",
+ "tool",
+ "assistant",
+ "tool",
+ "assistant",
+ "tool",
+ ]);
+ });
+
// ─── Usage / cache-rate telemetry ──────────────────────────────────────────
it("emits a usage event from the finish-step part with the cache read/write split", async () => {
diff --git a/packages/core/tests/db/chunks.test.ts b/packages/core/tests/db/chunks.test.ts
new file mode 100644
index 0000000..fe54628
--- /dev/null
+++ b/packages/core/tests/db/chunks.test.ts
@@ -0,0 +1,179 @@
+import { describe, expect, it } from "vitest";
+import { explodeTurn, explodeUserText, groupRowsToMessages } from "../../src/chunks/transform.js";
+import type { Chunk, ChunkRow, ChunkRowDraft } from "../../src/types/index.js";
+
+// These tests cover the pure explode/group transforms — the heart of the flat
+// chunk-log storage model. No DB is required.
+
+/**
+ * Promote drafts to rows with synthetic seq/id/createdAt (as appendChunks would).
+ *
+ * The id is derived from the absolute seq (`startSeq + i`) rather than the
+ * local index, so rows produced by several `toRows` calls with non-overlapping
+ * seq ranges keep distinct ids when concatenated into one log.
+ *
+ * @param drafts - Chunk-row drafts to promote, in log order.
+ * @param tabId - Tab id stamped on every row.
+ * @param startSeq - Seq assigned to the first draft; later drafts count up from it.
+ * @returns One row per draft, in the same order.
+ */
+function toRows(drafts: ChunkRowDraft[], tabId = "tab-1", startSeq = 0): ChunkRow[] {
+ return drafts.map((d, i) => {
+ const seq = startSeq + i;
+ return {
+ id: `c${seq}`,
+ tabId,
+ seq,
+ turnId: d.turnId,
+ step: d.step,
+ role: d.role,
+ type: d.type,
+ data: d.data,
+ createdAt: 1000 + i,
+ };
+ });
+}
+
+// explodeTurn: one in-memory assistant turn (Chunk[]) -> flat row drafts.
+describe("explodeTurn", () => {
+ it("splits a tool-batch into separate tool_call (assistant) and tool_result (tool) rows", () => {
+ const chunks: Chunk[] = [
+ { type: "thinking", text: "hmm", metadata: { anthropic: { signature: "S" } } },
+ { type: "text", text: "let me read" },
+ {
+ type: "tool-batch",
+ calls: [
+ { id: "a1", name: "read_file", arguments: { path: "x" }, result: "X", isError: false },
+ { id: "a2", name: "read_file", arguments: { path: "y" }, result: "Y", isError: false },
+ ],
+ },
+ ];
+ const drafts = explodeTurn("turn-1", chunks);
+
+ // thinking, text, tool_call×2 (assistant), tool_result×2 (tool)
+ expect(drafts.map((d) => `${d.role}/${d.type}`)).toEqual([
+ "assistant/thinking",
+ "assistant/text",
+ "assistant/tool_call",
+ "assistant/tool_call",
+ "tool/tool_result",
+ "tool/tool_result",
+ ]);
+ // All in the same step (one round-trip).
+ expect(drafts.every((d) => d.step === 0)).toBe(true);
+ expect(drafts.every((d) => d.turnId === "turn-1")).toBe(true);
+ });
+
+ it("increments step after each tool-batch (multi-step turn)", () => {
+ const chunks: Chunk[] = [
+ { type: "text", text: "s0" },
+ { type: "tool-batch", calls: [{ id: "a", name: "t", arguments: {}, result: "r" }] },
+ { type: "text", text: "s1" },
+ { type: "tool-batch", calls: [{ id: "b", name: "t", arguments: {}, result: "r" }] },
+ { type: "text", text: "final" },
+ ];
+ const drafts = explodeTurn("turn-1", chunks);
+ // Collect the row types recorded under a given step index.
+ const byStep = (s: number) => drafts.filter((d) => d.step === s).map((d) => d.type);
+ expect(byStep(0)).toEqual(["text", "tool_call", "tool_result"]);
+ expect(byStep(1)).toEqual(["text", "tool_call", "tool_result"]);
+ expect(byStep(2)).toEqual(["text"]); // trailing final-step text, no tool-batch
+ });
+
+ it("omits tool_result rows for calls without a result", () => {
+ // The call carries no `result`, so only the assistant-side tool_call row
+ // is expected.
+ const chunks: Chunk[] = [
+ { type: "tool-batch", calls: [{ id: "a", name: "t", arguments: {} }] },
+ ];
+ const drafts = explodeTurn("turn-1", chunks);
+ expect(drafts.map((d) => d.type)).toEqual(["tool_call"]);
+ });
+});
+
+// groupRowsToMessages: flat rows -> render messages; the inverse of the
+// explode helpers, exercised here on rows built with `toRows`.
+describe("groupRowsToMessages (round-trip)", () => {
+ it("reconstructs a user message then an assistant message with a per-step tool-batch", () => {
+ // One user row at seq 0, followed by the assistant turn's rows from seq 1.
+ const rows = [
+ ...toRows(explodeUserText("turn-1", "hello"), "tab-1", 0),
+ ...toRows(
+ explodeTurn("turn-1", [
+ { type: "text", text: "reading" },
+ {
+ type: "tool-batch",
+ calls: [
+ {
+ id: "a1",
+ name: "read_file",
+ arguments: { path: "x" },
+ result: "X",
+ isError: false,
+ },
+ ],
+ },
+ { type: "text", text: "done" },
+ ]),
+ "tab-1",
+ 1,
+ ),
+ ];
+
+ const msgs = groupRowsToMessages(rows);
+ expect(msgs.map((m) => m.role)).toEqual(["user", "assistant"]);
+ expect(msgs[0]?.chunks).toEqual([{ type: "text", text: "hello" }]);
+
+ const a = msgs[1];
+ if (!a) throw new Error("no assistant message");
+ // reconstructed: text, tool-batch(step0), text(step1)
+ expect(a.chunks.map((c) => c.type)).toEqual(["text", "tool-batch", "text"]);
+ const batch = a.chunks.find((c) => c.type === "tool-batch");
+ if (batch?.type !== "tool-batch") throw new Error("no batch");
+ expect(batch.calls[0]).toMatchObject({
+ id: "a1",
+ name: "read_file",
+ arguments: { path: "x" },
+ result: "X",
+ isError: false,
+ });
+ });
+
+ it("keeps each step's tool calls in its own tool-batch chunk", () => {
+ // Two consecutive tool-batches with no text between them.
+ const rows = toRows(
+ explodeTurn("turn-1", [
+ { type: "tool-batch", calls: [{ id: "a", name: "t", arguments: {}, result: "ra" }] },
+ { type: "tool-batch", calls: [{ id: "b", name: "t", arguments: {}, result: "rb" }] },
+ ]),
+ );
+ const msgs = groupRowsToMessages(rows);
+ expect(msgs).toHaveLength(1);
+ const batches = msgs[0]?.chunks.filter((c) => c.type === "tool-batch") ?? [];
+ expect(batches).toHaveLength(2);
+ });
+
+ it("round-trips a multi-step assistant turn back to its original chunk shape", () => {
+ const original: Chunk[] = [
+ { type: "thinking", text: "plan", metadata: { anthropic: { signature: "S" } } },
+ { type: "text", text: "step0" },
+ {
+ type: "tool-batch",
+ calls: [
+ { id: "a", name: "read_file", arguments: { path: "p" }, result: "R", isError: false },
+ ],
+ },
+ { type: "text", text: "final" },
+ ];
+ const rows = toRows(explodeTurn("turn-1", original));
+ const msgs = groupRowsToMessages(rows);
+ expect(msgs).toHaveLength(1);
+ // explode followed by group must give back exactly the input chunks.
+ expect(msgs[0]?.chunks).toEqual(original);
+ });
+
+ it("tolerates an orphan tool_result whose tool_call was paged out", () => {
+ // A lone tool_result row with no matching tool_call row in the input.
+ const rows = toRows([
+ {
+ turnId: "turn-1",
+ step: 0,
+ role: "tool",
+ type: "tool_result",
+ data: { callId: "z", name: "t", result: "R", isError: false },
+ },
+ ]);
+ const msgs = groupRowsToMessages(rows);
+ expect(msgs).toHaveLength(1);
+ const batch = msgs[0]?.chunks[0];
+ if (batch?.type !== "tool-batch") throw new Error("no batch");
+ expect(batch.calls[0]).toMatchObject({ id: "z", result: "R" });
+ });
+
+ it("breaks the assistant grouping on a user or system row", () => {
+ // Two turns; the second turn's only chunk is a system notice, which is
+ // expected to come back as a `system` message rather than an assistant one.
+ const rows = [
+ ...toRows(explodeUserText("t1", "q1"), "tab", 0),
+ ...toRows(explodeTurn("t1", [{ type: "text", text: "a1" }]), "tab", 1),
+ ...toRows(explodeUserText("t2", "q2"), "tab", 2),
+ ...toRows(explodeTurn("t2", [{ type: "system", kind: "notice", text: "n" }]), "tab", 3),
+ ];
+ const msgs = groupRowsToMessages(rows);
+ expect(msgs.map((m) => m.role)).toEqual(["user", "assistant", "user", "system"]);
+ });
+});
diff --git a/packages/frontend/src/lib/tabs.svelte.ts b/packages/frontend/src/lib/tabs.svelte.ts
index 6e2d157..c57f800 100644
--- a/packages/frontend/src/lib/tabs.svelte.ts
+++ b/packages/frontend/src/lib/tabs.svelte.ts
@@ -230,12 +230,10 @@ export function createTabStore() {
};
const messagesRes = await fetch(`${config.apiBase}/tabs/${agentId}/messages?limit=100`);
- // The backend's `getMessagesForTab` (packages/core/src/db/messages.ts)
- // already parses `content_json` into a `Chunk[]` and serves it as
- // `chunks` over the wire — NOT the raw `contentJson` string. Earlier
- // versions of this client expected `contentJson` and silently dropped
- // every message when JSON.parse(undefined) threw, leaving the UI
- // with empty conversations after a refresh.
+ // `GET /messages` windows the flat chunk log (last N chunks) and
+ // groups the rows into render messages (`groupRowsToMessages` in
+ // packages/core/src/chunks/transform.ts), serving them as `chunks`
+ // per message over the wire — NOT a raw JSON string.
const messagesData = messagesRes.ok
? ((await messagesRes.json()) as {
messages: Array<{
@@ -408,8 +406,15 @@ export function createTabStore() {
const res = await fetch(`${config.apiBase}/tabs/${tabId}/messages?limit=50${beforeParam}`);
if (!res.ok) return;
const data = (await res.json()) as {
- messages?: Array<{ id?: string; role: string; chunks?: Chunk[]; seq?: number }>;
+ messages?: Array<{
+ id?: string;
+ role: string;
+ chunks?: Chunk[];
+ seq?: number;
+ turnId?: string;
+ }>;
total?: number;
+ oldestSeq?: number | null;
};
const rawMessages = data.messages ?? [];
if (rawMessages.length === 0) {
@@ -426,18 +431,43 @@ export function createTabStore() {
chunks: Array.isArray(m.chunks) ? m.chunks : [],
isStreaming: false,
seq: m.seq,
+ ...(m.turnId !== undefined ? { turnId: m.turnId } : {}),
}));
const current = getTabById(tabId);
if (!current) return;
+ // Chunk-granular pagination can split ONE turn across the window
+ // boundary: the oldest message already loaded and the newest message
+ // in this older page may share a turn_id. Merge them (older chunks
+ // first) so the turn renders as one bubble instead of duplicating.
+ const merged = [...current.messages];
+ const lastOlder = older[older.length - 1];
+ const firstCurrent = merged[0];
+ if (
+ lastOlder &&
+ firstCurrent &&
+ lastOlder.turnId !== undefined &&
+ lastOlder.turnId === firstCurrent.turnId &&
+ lastOlder.role === firstCurrent.role
+ ) {
+ older.pop();
+ merged[0] = {
+ ...firstCurrent,
+ id: lastOlder.id,
+ seq: lastOlder.seq,
+ turnId: lastOlder.turnId,
+ chunks: [...lastOlder.chunks, ...firstCurrent.chunks],
+ };
+ }
+
// Avoid duplicating messages we already have loaded.
- const existingIds = new Set(current.messages.map((m) => m.id));
+ const existingIds = new Set(merged.map((m) => m.id));
const toPrepend = older.filter((m) => !existingIds.has(m.id));
- const newOldestSeq = oldestSeqOf(rawMessages);
+ const newOldestSeq = data.oldestSeq ?? oldestSeqOf(rawMessages);
updateTab(tabId, {
- messages: [...toPrepend, ...current.messages],
+ messages: [...toPrepend, ...merged],
oldestLoadedSeq: newOldestSeq ?? current.oldestLoadedSeq,
totalMessages: data.total ?? current.totalMessages,
});
diff --git a/packages/frontend/src/lib/types.ts b/packages/frontend/src/lib/types.ts
index 8c34d69..6e87aec 100644
--- a/packages/frontend/src/lib/types.ts
+++ b/packages/frontend/src/lib/types.ts
@@ -96,6 +96,13 @@ export interface ChatMessage {
isStreaming?: boolean;
debugInfo?: DebugInfo;
seq?: number;
+ /**
+ * turn_id of the chunk rows this message was grouped from (history loaded
+ * from the backend). Used by `loadMoreMessages` to merge a turn that was
+ * split across the chunk-pagination window boundary. Absent for live
+ * (streaming) messages built client-side.
+ */
+ turnId?: string;
}
export type ConnectionStatus = "connecting" | "connected" | "disconnected";