diff options
| author | Adam Malczewski <[email protected]> | 2026-05-27 18:35:18 +0900 |
|---|---|---|
| committer | Adam Malczewski <[email protected]> | 2026-05-27 18:35:18 +0900 |
| commit | ca6ee91c5e1167b1929eedbb96c76dfa24e7d026 (patch) | |
| tree | bc23acac2e7caaf2e59eacbc21bfc9b41f3c1458 /packages/api | |
| parent | da57842686ebfd157396551fc76d0c18f7676335 (diff) | |
| download | dispatch-ca6ee91c5e1167b1929eedbb96c76dfa24e7d026.tar.gz dispatch-ca6ee91c5e1167b1929eedbb96c76dfa24e7d026.zip | |
refactor: ChatMessage.chunks[] union — interleaved thinking, tool batching, error/system chunks
Diffstat (limited to 'packages/api')
| -rw-r--r-- | packages/api/src/agent-manager.ts | 221 | ||||
| -rw-r--r-- | packages/api/tests/agent-manager.test.ts | 15 | ||||
| -rw-r--r-- | packages/api/tests/routes.test.ts | 15 |
3 files changed, 184 insertions, 67 deletions
diff --git a/packages/api/src/agent-manager.ts b/packages/api/src/agent-manager.ts index 487b09f..0f06683 100644 --- a/packages/api/src/agent-manager.ts +++ b/packages/api/src/agent-manager.ts @@ -3,9 +3,12 @@ import { type AgentEvent, type AgentSkillMapping, type AgentStatus, + appendEventToChunks, appendMessage, + applySystemEvent, BackgroundShellStore, BackgroundTranscriptStore, + type Chunk, type ClaudeAccount, clearSpillForTab, configToRuleset, @@ -23,6 +26,7 @@ import { createYoutubeTranscribeTool, type DispatchConfig, getClaudeAccountsFromDB, + getMessagesForTab, getSetting, loadConfig, loadSkills, @@ -32,7 +36,9 @@ import { refreshAccountCredentialsAsync, resolveApiKey, type SkillDefinition, + type SystemChunkKind, TaskList, + updateMessage, validateConfig, } from "@dispatch/core"; import type { PermissionManager } from "./permission-manager.js"; @@ -160,6 +166,14 @@ interface TabAgent { shellStore: BackgroundShellStore; /** Store for transcript requests backgrounded due to user interrupt. */ transcriptStore: BackgroundTranscriptStore; + /** + * In-flight assistant message chunks for the active turn. `null` when + * no turn is running. Out-of-band system events (config-reload, + * cancel, etc.) target this list when present. + */ + currentChunks: Chunk[] | null; + /** DB id of the in-flight assistant message (if persisted yet). */ + currentAssistantId: string | null; } export class AgentManager { @@ -222,9 +236,10 @@ export class AgentManager { for (const tabAgent of this.tabAgents.values()) { tabAgent.agent = null; } - // Emit config-reload to all tabs + // Emit config-reload to all tabs (and persist as a system chunk) for (const tabId of this.tabAgents.keys()) { this.emit({ type: "config-reload" }, tabId); + this.routeSystemEventToTab(tabId, "config-reload", "Configuration reloaded"); } }); @@ -234,9 +249,10 @@ export class AgentManager { for (const tabAgent of this.tabAgents.values()) { tabAgent.agent = null; } - // Emit config-reload to all tabs + // Emit config-reload to all tabs (and persist as a system chunk) for (const tabId of this.tabAgents.keys()) { this.emit({ type: "config-reload" }, tabId); + this.routeSystemEventToTab(tabId, "config-reload", "Skills reloaded"); } }); } @@ -297,6 +313,8 @@ export class AgentManager { queueListeners: [], shellStore: new BackgroundShellStore(), transcriptStore: new BackgroundTranscriptStore(), + currentChunks: null, + currentAssistantId: null, }; this.tabAgents.set(tabId, tabAgent); } @@ -671,9 +689,80 @@ export class AgentManager { } } + /** + * Persist a system chunk to a tab's message history. + * + * If an assistant turn is in flight (`currentChunks` is non-null), the + * chunk is appended to the in-flight assistant message's chunk list — + * the final `appendMessage` / `updateMessage` call at end-of-turn picks + * it up automatically. + * + * Otherwise we load the tab's persisted messages, run them through + * `applySystemEvent` (which either appends to an existing trailing + * `role: "system"` message or creates a new one), then persist the + * delta via `appendMessage` / `updateMessage`. + */ + private routeSystemEventToTab(tabId: string, kind: SystemChunkKind, text: string): void { + const tabAgent = this.tabAgents.get(tabId); + + // Turn in flight → append directly to the in-flight chunk list. + // The chunk lands on the assistant message when it's persisted at + // turn-end (or the assistant message is updated mid-turn elsewhere). + if (tabAgent?.currentChunks) { + tabAgent.currentChunks.push({ type: "system", kind, text }); + if (tabAgent.currentAssistantId) { + try { + updateMessage(tabAgent.currentAssistantId, JSON.stringify(tabAgent.currentChunks)); + } catch { + // Best-effort — the final persistence in processMessage will + // flush the same chunks again. + } + } + return; + } + + // No turn in flight → route via applySystemEvent against the + // persisted message list. Either appends to a trailing system + // message or creates a fresh one. + try { + const rows = getMessagesForTab(tabId); + const messages = rows.map((r) => ({ id: r.id, role: r.role, chunks: r.chunks })); + const before = messages[messages.length - 1]; + const { messageId } = applySystemEvent(messages, { kind, text }); + const target = messages.find((m) => m.id === messageId); + if (!target) return; + if (before && before.id === messageId) { + // Appended to existing trailing system message. + updateMessage(messageId, JSON.stringify(target.chunks)); + } else { + // Newly created system message. + appendMessage(tabId, messageId, "system", JSON.stringify(target.chunks)); + } + } catch { + // DB not available (e.g. tab not yet created) — drop silently. + } + } + stopTab(tabId: string): void { const tabAgent = this.tabAgents.get(tabId); if (tabAgent) { + // If a turn is in flight, drop a `cancelled` system chunk into + // the in-flight assistant message so the user sees an explicit + // "Generation cancelled by user" marker at the cancellation point. + if (tabAgent.currentChunks) { + tabAgent.currentChunks.push({ + type: "system", + kind: "cancelled", + text: "Generation cancelled by user", + }); + if (tabAgent.currentAssistantId) { + try { + updateMessage(tabAgent.currentAssistantId, JSON.stringify(tabAgent.currentChunks)); + } catch { + // best-effort + } + } + } tabAgent.abortController?.abort(); tabAgent.status = "idle"; tabAgent.agent = null; @@ -891,15 +980,28 @@ export class AgentManager { currentKeyId = entry.key_id || undefined; currentModelId = entry.model_id || undefined; allOutput = ""; - let assistantText = ""; - let assistantThinking = ""; - const assistantToolCalls: Array<{ - id: string; - name: string; - arguments: Record<string, unknown>; - result?: string; - isError?: boolean; - }> = []; + + // Single ordered chunk list for the assistant turn — replaces the + // previous (text + toolCalls + thinking) tri-accumulator pattern. + // Persisted progressively (insert on first chunk, update thereafter) + // so out-of-band routes (config-reload, cancel) see real DB rows. + const chunks: Chunk[] = []; + const assistantId = crypto.randomUUID(); + let assistantPersisted = false; + tabAgent.currentChunks = chunks; + tabAgent.currentAssistantId = assistantId; + + const flushAssistant = (): void => { + if (chunks.length === 0) return; + const json = JSON.stringify(chunks); + if (!assistantPersisted) { + appendMessage(tabId, assistantId, "assistant", json); + assistantPersisted = true; + } else { + updateMessage(assistantId, json); + } + }; + let attemptError: string | null = null; try { @@ -925,11 +1027,17 @@ export class AgentManager { message, reasoningEffort ? { reasoningEffort } : undefined, )) { - // Stop processing if the tab was aborted (closed/stopped) + // Stop processing if the tab was aborted (closed/stopped). + // stopTab() already injected a `cancelled` system chunk into + // `chunks` before flipping the abort flag, so we just need + // to flush and exit. if (tabAgent.abortController?.signal.aborted) break; if (event.type === "error") { attemptError = event.error; + // Record the error as a chunk so it's part of the + // persisted turn history. + appendEventToChunks(chunks, event); break; } @@ -938,68 +1046,41 @@ export class AgentManager { } this.emit(event, tabId); - // Accumulate content for DB persistence + // For diagnostics / child agent result harvesting, keep a + // flat string copy of plain text output. if (event.type === "text-delta") { - assistantText += event.delta; allOutput += event.delta; - } else if (event.type === "reasoning-delta") { - assistantThinking += event.delta; - } else if (event.type === "tool-call") { - assistantToolCalls.push({ - id: event.toolCall.id, - name: event.toolCall.name, - arguments: event.toolCall.arguments, - }); - } else if (event.type === "tool-result") { - const tc = assistantToolCalls.find((t) => t.id === event.toolResult.toolCallId); - if (tc) { - tc.result = event.toolResult.result; - tc.isError = event.toolResult.isError; - } - } else if (event.type === "done") { - // Persist assistant message to DB - const contentSegments: Array<Record<string, unknown>> = []; - if (assistantText) contentSegments.push({ type: "text", text: assistantText }); - for (const tc of assistantToolCalls) { - contentSegments.push({ type: "tool-call", ...tc }); - } - if (contentSegments.length > 0) { - appendMessage( - tabId, - crypto.randomUUID(), - "assistant", - JSON.stringify(contentSegments), - assistantThinking || undefined, - ); - } - // Reset for next turn - assistantText = ""; - assistantThinking = ""; - assistantToolCalls.length = 0; } + + if (event.type === "done") { + // End of turn — flush the accumulated chunks. Reset the + // in-flight pointers so out-of-band system events route + // through `applySystemEvent` against the persisted list + // instead of mutating a stale array. + flushAssistant(); + chunks.length = 0; + assistantPersisted = true; // suppress post-loop flush + tabAgent.currentChunks = null; + tabAgent.currentAssistantId = null; + continue; + } + + // Route every content-bearing event through the shared helper. + // `appendEventToChunks` ignores lifecycle events (status / done + // / task-list-update / tab-created / message-* / etc), so it's + // safe to call unconditionally. + appendEventToChunks(chunks, event); } } catch (err) { console.error(`[dispatch] processMessage error for tab ${tabId}:`, err); attemptError = err instanceof Error ? err.message : String(err); } - // Flush any accumulated assistant content from this attempt - if (assistantText || assistantToolCalls.length > 0) { - const contentSegments: Array<Record<string, unknown>> = []; - if (assistantText) contentSegments.push({ type: "text", text: assistantText }); - for (const tc of assistantToolCalls) { - contentSegments.push({ type: "tool-call", ...tc }); - } - if (contentSegments.length > 0) { - appendMessage( - tabId, - crypto.randomUUID(), - "assistant", - JSON.stringify(contentSegments), - assistantThinking || undefined, - ); - } - } + // Flush any accumulated assistant content from this attempt (covers + // the abort/error/exception paths where we never saw a `done`). + flushAssistant(); + tabAgent.currentChunks = null; + tabAgent.currentAssistantId = null; // No error — success if (!attemptError) { @@ -1024,11 +1105,21 @@ export class AgentManager { `Key "${tabAgent.keyId}" rate limited. ` + `Falling back to "${nextEntry.key_id}" (model: ${nextEntry.model_id})...`; console.warn(`[dispatch] ${fallbackMsg}`); + // Persist the notice + model-change as system chunks. We're + // between turns here (just flushed the previous assistant + // message), so the helper routes them into a `role: "system"` + // message via `applySystemEvent`. this.emit({ type: "notice", message: fallbackMsg }, tabId); + this.routeSystemEventToTab(tabId, "notice", fallbackMsg); this.emit( { type: "model-changed", keyId: nextEntry.key_id, modelId: nextEntry.model_id }, tabId, ); + this.routeSystemEventToTab( + tabId, + "model-changed", + `Switched to ${nextEntry.model_id} (${nextEntry.key_id})`, + ); tabAgent.agent = null; continue; } diff --git a/packages/api/tests/agent-manager.test.ts b/packages/api/tests/agent-manager.test.ts index a1f2d75..28fa044 100644 --- a/packages/api/tests/agent-manager.test.ts +++ b/packages/api/tests/agent-manager.test.ts @@ -13,7 +13,10 @@ vi.mock("@dispatch/core", () => ({ yield { type: "text-delta", delta: "world" } as const; yield { type: "done", - message: { role: "assistant", content: "Hello world" }, + message: { + role: "assistant", + chunks: [{ type: "text", text: "Hello world" }], + }, } as const; yield { type: "status", status: "idle" } as const; } @@ -178,6 +181,16 @@ vi.mock("@dispatch/core", () => ({ return null; }, appendMessage() {}, + updateMessage() {}, + getMessagesForTab() { + return []; + }, + appendEventToChunks(_chunks: unknown[], _event: unknown) { + // no-op stub; chunk accumulation isn't exercised in these unit tests + }, + applySystemEvent(_messages: unknown[], _event: unknown) { + return { messageId: "mock-system-msg" }; + }, BackgroundShellStore: class MockBackgroundShellStore { has() { return false; diff --git a/packages/api/tests/routes.test.ts b/packages/api/tests/routes.test.ts index 69a6676..49bae49 100644 --- a/packages/api/tests/routes.test.ts +++ b/packages/api/tests/routes.test.ts @@ -14,7 +14,10 @@ vi.mock("@dispatch/core", () => ({ yield { type: "text-delta", delta: "world" } as const; yield { type: "done", - message: { role: "assistant", content: "Hello world" }, + message: { + role: "assistant", + chunks: [{ type: "text", text: "Hello world" }], + }, } as const; yield { type: "status", status: "idle" } as const; } @@ -179,6 +182,16 @@ vi.mock("@dispatch/core", () => ({ return null; }, appendMessage() {}, + updateMessage() {}, + getMessagesForTab() { + return []; + }, + appendEventToChunks(_chunks: unknown[], _event: unknown) { + // no-op stub + }, + applySystemEvent(_messages: unknown[], _event: unknown) { + return { messageId: "mock-system-msg" }; + }, BackgroundShellStore: class MockBackgroundShellStore { has() { return false; |
