summaryrefslogtreecommitdiffhomepage
path: root/packages/api
diff options
context:
space:
mode:
authorAdam Malczewski <[email protected]>2026-05-30 20:06:31 +0900
committerAdam Malczewski <[email protected]>2026-05-30 20:06:31 +0900
commit0f39b6f78957aacf206012ad2193d9b0c1940c1f (patch)
treeff5f2da8b4f3cdf56cf50d44b8fec75a489ad6fe /packages/api
parent8c58a973b0d021689cebad5c0cc6d56956bbc2f6 (diff)
downloaddispatch-0f39b6f78957aacf206012ad2193d9b0c1940c1f.tar.gz
dispatch-0f39b6f78957aacf206012ad2193d9b0c1940c1f.zip
refactor(chunks): append-only chunk log with per-step cache-stable wire
Replace the message-as-container model with a flat, append-only chunk log. - chunks table (id, tab_id, seq, turn_id, step, role, type, data_json): one row per chunk; tool_call (assistant) and tool_result (tool) are SEPARATE rows linked by callId. Message/turn are derived groupings, not stored. - chunks/transform.ts: DB-free explode (Chunk[] -> rows) / group (rows -> messages), shared by backend and the browser frontend. - Cache fix: toModelMessages segments each turn at tool-batch boundaries into stable [assistant, tool] pairs per step, so earlier steps serialize byte-identically across requests (kills the prompt-cache churn). - agent-manager persists a turn's chunks on seal (once), discarding a failed fallback attempt's partial chunks; rebuilds agent history from the log. - GET /messages windows the log by chunk seq then groups; loadMoreMessages merges a turn split across the window boundary by turnId. - One-shot migration drops the legacy messages table and clears tabs; settings/credentials/keys/usage preserved. Full suite green (317 tests); biome, tsc, and svelte-check clean.
Diffstat (limited to 'packages/api')
-rw-r--r--packages/api/src/agent-manager.ts241
-rw-r--r--packages/api/src/routes/tabs.ts20
-rw-r--r--packages/api/tests/agent-manager.test.ts11
-rw-r--r--packages/api/tests/routes.test.ts20
4 files changed, 146 insertions, 146 deletions
diff --git a/packages/api/src/agent-manager.ts b/packages/api/src/agent-manager.ts
index c873388..f7975d1 100644
--- a/packages/api/src/agent-manager.ts
+++ b/packages/api/src/agent-manager.ts
@@ -3,9 +3,8 @@ import {
type AgentEvent,
type AgentSkillMapping,
type AgentStatus,
+ appendChunks,
appendEventToChunks,
- appendMessage,
- applySystemEvent,
BackgroundShellStore,
BackgroundTranscriptStore,
type Chunk,
@@ -26,6 +25,8 @@ import {
createYoutubeTranscribeTool,
type DispatchConfig,
expandAgentToolNames,
+ explodeTurn,
+ explodeUserText,
GLOBAL_AGENTS_DIR,
getAgentDirPaths,
getClaudeAccountsFromDB,
@@ -45,7 +46,6 @@ import {
type TabStatusSnapshot,
TaskList,
toAvailableAgents,
- updateMessage,
validateConfig,
} from "@dispatch/core";
import type { PermissionManager } from "./permission-manager.js";
@@ -179,13 +179,23 @@ interface TabAgent {
/** Store for transcript requests backgrounded due to user interrupt. */
transcriptStore: BackgroundTranscriptStore;
/**
- * In-flight assistant message chunks for the active turn. `null` when
- * no turn is running. Out-of-band system events (config-reload,
- * cancel, etc.) target this list when present.
+ * In-flight assistant chunks for the active turn. `null` when no turn is
+ * running. Out-of-band system events (config-reload, cancel, etc.) push
+ * onto this list when present; it is exploded into chunk rows when the
+ * turn flushes.
*/
currentChunks: Chunk[] | null;
- /** DB id of the in-flight assistant message (if persisted yet). */
+ /**
+ * Opaque id of the in-flight assistant turn, used as the `currentAssistantId`
+ * in the WS status snapshot so a reconnecting frontend can align its local
+ * streaming message. (No longer a DB row id — the turn is many chunk rows.)
+ */
currentAssistantId: string | null;
+ /**
+ * `turn_id` shared by the current turn's user message and assistant chunk
+ * rows. Set at the start of `processMessage`, cleared when the turn ends.
+ */
+ currentTurnId: string | null;
}
export class AgentManager {
@@ -331,6 +341,7 @@ export class AgentManager {
transcriptStore: new BackgroundTranscriptStore(),
currentChunks: null,
currentAssistantId: null,
+ currentTurnId: null,
};
this.tabAgents.set(tabId, tabAgent);
}
@@ -716,17 +727,14 @@ export class AgentManager {
// 3. Config or skills reload (configWatcher / skillsWatcher
// also null out `tabAgent.agent`).
//
- // Boundary semantics: `processMessage` calls `appendMessage`
- // for the current turn's user message BEFORE calling this
- // function, so the DB ends in `[..., u_current]`. In the
- // fallback retry path (agent-mode automatic model fallback),
- // the previous attempt may also have flushed a partial
- // assistant response, so the DB ends in
- // `[..., u_current, partial_a]`. Either way, we walk
- // backwards to the most recent user-role row and load only
- // strictly-prior rows: `agent.run()` will push the current
- // user message itself at agent.ts:546, so including it here
- // would duplicate it.
+ // Boundary semantics: `processMessage` appends the current turn's
+ // user message (as a chunk row) BEFORE calling this function, so the
+ // grouped history ends in `[..., u_current]`. In the fallback retry
+ // path the previous attempt may also have flushed a partial assistant
+ // turn, so it can end `[..., u_current, partial_a]`. Either way, we
+ // walk backwards to the most recent user-role message and load only
+ // strictly-prior messages: `agent.run()` pushes the current user
+ // message itself, so including it here would duplicate it.
//
// `toModelMessages` already filters out `role === "system"`
// rows and strips `error` / `system` chunks, so it's safe to
@@ -824,54 +832,31 @@ export class AgentManager {
}
/**
- * Persist a system chunk to a tab's message history.
+ * Persist a system chunk (notice / model-changed / config-reload /
+ * cancelled) to a tab's history.
*
* If an assistant turn is in flight (`currentChunks` is non-null), the
- * chunk is appended to the in-flight assistant message's chunk list —
- * the final `appendMessage` / `updateMessage` call at end-of-turn picks
- * it up automatically.
+ * chunk is folded into the in-flight chunk list; it is exploded into a
+ * `system` chunk row when the turn flushes.
*
- * Otherwise we load the tab's persisted messages, run them through
- * `applySystemEvent` (which either appends to an existing trailing
- * `role: "system"` message or creates a new one), then persist the
- * delta via `appendMessage` / `updateMessage`.
+ * Otherwise we append a standalone `system` chunk row immediately. Adjacent
+ * system rows are coalesced back into one system message at group time
+ * (`groupRowsToMessages`).
*/
private routeSystemEventToTab(tabId: string, kind: SystemChunkKind, text: string): void {
const tabAgent = this.tabAgents.get(tabId);
- // Turn in flight → append directly to the in-flight chunk list.
- // The chunk lands on the assistant message when it's persisted at
- // turn-end (or the assistant message is updated mid-turn elsewhere).
+ // Turn in flight → fold into the in-flight chunk list; it is exploded
+ // into chunk rows (including this system chunk) when the turn flushes.
if (tabAgent?.currentChunks) {
tabAgent.currentChunks.push({ type: "system", kind, text });
- if (tabAgent.currentAssistantId) {
- try {
- updateMessage(tabAgent.currentAssistantId, JSON.stringify(tabAgent.currentChunks));
- } catch {
- // Best-effort — the final persistence in processMessage will
- // flush the same chunks again.
- }
- }
return;
}
- // No turn in flight → route via applySystemEvent against the
- // persisted message list. Either appends to a trailing system
- // message or creates a fresh one.
+ // No turn in flight → persist a standalone system chunk row immediately.
try {
- const rows = getMessagesForTab(tabId);
- const messages = rows.map((r) => ({ id: r.id, role: r.role, chunks: r.chunks }));
- const before = messages[messages.length - 1];
- const { messageId } = applySystemEvent(messages, { kind, text });
- const target = messages.find((m) => m.id === messageId);
- if (!target) return;
- if (before && before.id === messageId) {
- // Appended to existing trailing system message.
- updateMessage(messageId, JSON.stringify(target.chunks));
- } else {
- // Newly created system message.
- appendMessage(tabId, messageId, "system", JSON.stringify(target.chunks));
- }
+ const turnId = tabAgent?.currentTurnId ?? crypto.randomUUID();
+ appendChunks(tabId, explodeTurn(turnId, [{ type: "system", kind, text }]));
} catch {
// DB not available (e.g. tab not yet created) — drop silently.
}
@@ -880,22 +865,17 @@ export class AgentManager {
stopTab(tabId: string): void {
const tabAgent = this.tabAgents.get(tabId);
if (tabAgent) {
- // If a turn is in flight, drop a `cancelled` system chunk into
- // the in-flight assistant message so the user sees an explicit
- // "Generation cancelled by user" marker at the cancellation point.
+ // If a turn is in flight, drop a `cancelled` system chunk into the
+ // in-flight chunk list so the user sees an explicit "Generation
+ // cancelled by user" marker at the cancellation point. It is
+ // persisted (as a chunk row) when `processMessage` flushes the
+ // aborted turn.
if (tabAgent.currentChunks) {
tabAgent.currentChunks.push({
type: "system",
kind: "cancelled",
text: "Generation cancelled by user",
});
- if (tabAgent.currentAssistantId) {
- try {
- updateMessage(tabAgent.currentAssistantId, JSON.stringify(tabAgent.currentChunks));
- } catch {
- // best-effort
- }
- }
}
tabAgent.abortController?.abort();
tabAgent.status = "idle";
@@ -1134,13 +1114,12 @@ export class AgentManager {
tabAgent.status = "running";
this.messageCount += 1;
- // Persist user message to DB (once, before any fallback retry)
- appendMessage(
- tabId,
- crypto.randomUUID(),
- "user",
- JSON.stringify([{ type: "text", text: message }]),
- );
+ // Persist the user message as a chunk row (once, before any fallback
+ // retry). The whole turn — this user message plus the assistant's
+ // chunk rows — shares one `turn_id`.
+ const turnId = crypto.randomUUID();
+ tabAgent.currentTurnId = turnId;
+ appendChunks(tabId, explodeUserText(turnId, message));
// Store agent models on the tab if provided (defines fallback order)
if (agentModels) {
@@ -1166,25 +1145,23 @@ export class AgentManager {
currentModelId = entry.model_id || undefined;
allOutput = "";
- // Single ordered chunk list for the assistant turn — replaces the
- // previous (text + toolCalls + thinking) tri-accumulator pattern.
- // Persisted progressively (insert on first chunk, update thereafter)
- // so out-of-band routes (config-reload, cancel) see real DB rows.
+ // Single ordered chunk list accumulating this attempt's assistant
+ // turn (text / thinking / tool-batch / error / system), folded from
+ // the stream via the shared `appendEventToChunks` helper.
const chunks: Chunk[] = [];
const assistantId = crypto.randomUUID();
let assistantPersisted = false;
tabAgent.currentChunks = chunks;
tabAgent.currentAssistantId = assistantId;
+ // Write-on-seal: explode the accumulated turn into flat chunk rows
+ // ONCE, when the turn settles. `explodeTurn` splits each step's
+ // `tool-batch` into separate `tool_call` + `tool_result` rows and
+ // tags every row with `turn_id` + derived `step`.
const flushAssistant = (): void => {
- if (chunks.length === 0) return;
- const json = JSON.stringify(chunks);
- if (!assistantPersisted) {
- appendMessage(tabId, assistantId, "assistant", json);
- assistantPersisted = true;
- } else {
- updateMessage(assistantId, json);
- }
+ if (assistantPersisted || chunks.length === 0) return;
+ appendChunks(tabId, explodeTurn(turnId, chunks));
+ assistantPersisted = true;
};
let attemptError: string | null = null;
@@ -1205,7 +1182,7 @@ export class AgentManager {
});
}
} catch {
- // Best-effort — if this fails, appendMessage will throw and we'll catch it below
+ // Best-effort — if this fails, chunk persistence will throw and we'll catch it below
}
for await (const event of agent.run(message, {
@@ -1237,23 +1214,12 @@ export class AgentManager {
allOutput += event.delta;
}
- if (event.type === "done") {
- // End of turn — flush the accumulated chunks. Reset the
- // in-flight pointers so out-of-band system events route
- // through `applySystemEvent` against the persisted list
- // instead of mutating a stale array.
- flushAssistant();
- chunks.length = 0;
- assistantPersisted = true; // suppress post-loop flush
- tabAgent.currentChunks = null;
- tabAgent.currentAssistantId = null;
- continue;
- }
-
// Route every content-bearing event through the shared helper.
// `appendEventToChunks` ignores lifecycle events (status / done
// / task-list-update / tab-created / message-* / etc), so it's
- // safe to call unconditionally.
+ // safe to call unconditionally. Persistence happens once, after
+ // the loop, so we never write a partial turn that a fallback
+ // retry would then duplicate.
appendEventToChunks(chunks, event);
}
} catch (err) {
@@ -1261,9 +1227,24 @@ export class AgentManager {
attemptError = err instanceof Error ? err.message : String(err);
}
- // Flush any accumulated assistant content from this attempt (covers
- // the abort/error/exception paths where we never saw a `done`).
- flushAssistant();
+ // Decide whether a fallback retry will supersede this attempt.
+ const isRetryable =
+ attemptError !== null &&
+ (attemptError.includes("status=429") ||
+ attemptError.toLowerCase().includes("rate limit") ||
+ attemptError.toLowerCase().includes("rate_limit") ||
+ attemptError.toLowerCase().includes("usage limit") ||
+ attemptError.toLowerCase().includes("exhausted"));
+ const nextEntry = fallbackSequence[fallbackIdx + 1];
+ const willRetry = Boolean(isRetryable && this.modelRegistry && tabAgent.keyId && nextEntry);
+
+ // Persist this attempt's turn — unless a retry will replace it, in
+ // which case the partial (and its error chunk) is discarded so the
+ // next attempt's chunks don't merge with a failed one. On success,
+ // abort, or a final error, the turn is flushed exactly once.
+ if (!willRetry) {
+ flushAssistant();
+ }
tabAgent.currentChunks = null;
tabAgent.currentAssistantId = null;
@@ -1273,43 +1254,27 @@ export class AgentManager {
break;
}
- // Check if error is retryable (rate limit / exhausted key)
- const isRetryable =
- attemptError.includes("status=429") ||
- attemptError.toLowerCase().includes("rate limit") ||
- attemptError.toLowerCase().includes("rate_limit") ||
- attemptError.toLowerCase().includes("usage limit") ||
- attemptError.toLowerCase().includes("exhausted");
-
- if (isRetryable && this.modelRegistry && tabAgent.keyId) {
- this.modelRegistry.markKeyExhausted(tabAgent.keyId, attemptError);
-
- // Try the next entry in the agent's fallback sequence
- const nextIdx = fallbackIdx + 1;
- const nextEntry = fallbackSequence[nextIdx];
- if (nextIdx < maxFallbackAttempts && nextEntry) {
- const fallbackMsg =
- `Key "${tabAgent.keyId}" rate limited. ` +
- `Falling back to "${nextEntry.key_id}" (model: ${nextEntry.model_id})...`;
- console.warn(`[dispatch] ${fallbackMsg}`);
- // Persist the notice + model-change as system chunks. We're
- // between turns here (just flushed the previous assistant
- // message), so the helper routes them into a `role: "system"`
- // message via `applySystemEvent`.
- this.emit({ type: "notice", message: fallbackMsg }, tabId);
- this.routeSystemEventToTab(tabId, "notice", fallbackMsg);
- this.emit(
- { type: "model-changed", keyId: nextEntry.key_id, modelId: nextEntry.model_id },
- tabId,
- );
- this.routeSystemEventToTab(
- tabId,
- "model-changed",
- `Switched to ${nextEntry.model_id} (${nextEntry.key_id})`,
- );
- tabAgent.agent = null;
- continue;
- }
+ if (willRetry && nextEntry && tabAgent.keyId) {
+ this.modelRegistry?.markKeyExhausted(tabAgent.keyId, attemptError);
+ const fallbackMsg =
+ `Key "${tabAgent.keyId}" rate limited. ` +
+ `Falling back to "${nextEntry.key_id}" (model: ${nextEntry.model_id})...`;
+ console.warn(`[dispatch] ${fallbackMsg}`);
+ // Persist the notice + model-change as standalone system chunk
+ // rows (no turn in flight now — currentChunks was just cleared).
+ this.emit({ type: "notice", message: fallbackMsg }, tabId);
+ this.routeSystemEventToTab(tabId, "notice", fallbackMsg);
+ this.emit(
+ { type: "model-changed", keyId: nextEntry.key_id, modelId: nextEntry.model_id },
+ tabId,
+ );
+ this.routeSystemEventToTab(
+ tabId,
+ "model-changed",
+ `Switched to ${nextEntry.model_id} (${nextEntry.key_id})`,
+ );
+ tabAgent.agent = null;
+ continue;
}
// All fallbacks exhausted or non-retryable error
@@ -1319,6 +1284,8 @@ export class AgentManager {
this.emit({ type: "status", status: "error" }, tabId);
break;
}
+ // Turn fully settled — clear the shared turn id.
+ tabAgent.currentTurnId = null;
// Resolve completion promise for child agents
if (processError === null) {
diff --git a/packages/api/src/routes/tabs.ts b/packages/api/src/routes/tabs.ts
index afa5735..e9265ec 100644
--- a/packages/api/src/routes/tabs.ts
+++ b/packages/api/src/routes/tabs.ts
@@ -2,10 +2,11 @@ import {
archiveTab,
createTab,
deleteSetting,
- getMessagesForTab,
+ getChunksForTab,
getSetting,
getTab,
- getTotalMessageCount,
+ getTotalChunkCount,
+ groupRowsToMessages,
listOpenTabs,
setSetting,
updateTabModel,
@@ -65,6 +66,11 @@ tabsRoutes.get("/:id", (c) => {
return c.json(tab);
});
+// Conversation history for a tab, paginated at CHUNK granularity. The flat
+// chunk log is windowed by `limit`/`before` (both chunk-`seq` cursors) so a
+// single huge turn never dumps in full, then grouped into render messages.
+// `before` is the oldest chunk seq the client already holds. This is what
+// powers per-chunk frontend pagination / memory control.
tabsRoutes.get("/:id/messages", (c) => {
const id = c.req.param("id");
const limitRaw = c.req.query("limit");
@@ -78,9 +84,13 @@ tabsRoutes.get("/:id/messages", (c) => {
...(before !== undefined && Number.isFinite(before) ? { before } : {}),
}
: undefined;
- const messages = getMessagesForTab(id, options);
- const total = getTotalMessageCount(id);
- return c.json({ messages, total });
+ const chunks = getChunksForTab(id, options);
+ const messages = groupRowsToMessages(chunks);
+ // `oldestSeq` is the chunk-seq cursor the client pages backward from; null
+ // when the window is empty.
+ const oldestSeq = chunks.length > 0 ? (chunks[0]?.seq ?? null) : null;
+ const total = getTotalChunkCount(id);
+ return c.json({ messages, total, oldestSeq });
});
tabsRoutes.patch("/:id", async (c) => {
diff --git a/packages/api/tests/agent-manager.test.ts b/packages/api/tests/agent-manager.test.ts
index ba14cad..6b016db 100644
--- a/packages/api/tests/agent-manager.test.ts
+++ b/packages/api/tests/agent-manager.test.ts
@@ -259,8 +259,15 @@ vi.mock("@dispatch/core", () => ({
getSetting(_key: string) {
return null;
},
- appendMessage() {},
- updateMessage() {},
+ appendChunks() {
+ return [];
+ },
+ explodeUserText() {
+ return [];
+ },
+ explodeTurn() {
+ return [];
+ },
getMessagesForTab(tabId: string) {
return fakeMessagesByTab.get(tabId) ?? [];
},
diff --git a/packages/api/tests/routes.test.ts b/packages/api/tests/routes.test.ts
index eba2226..f4de845 100644
--- a/packages/api/tests/routes.test.ts
+++ b/packages/api/tests/routes.test.ts
@@ -181,11 +181,27 @@ vi.mock("@dispatch/core", () => ({
getSetting(_key: string) {
return null;
},
- appendMessage() {},
- updateMessage() {},
+ appendChunks() {
+ return [];
+ },
+ explodeUserText() {
+ return [];
+ },
+ explodeTurn() {
+ return [];
+ },
getMessagesForTab() {
return [];
},
+ getChunksForTab() {
+ return [];
+ },
+ groupRowsToMessages() {
+ return [];
+ },
+ getTotalChunkCount() {
+ return 0;
+ },
appendEventToChunks(_chunks: unknown[], _event: unknown) {
// no-op stub
},